🗒️RabbitMQ队列使用

发布于2018-09-15
😀
RabbitMQ是一个开源的消息中间件系统,它实现了高级消息队列协议(AMQP),用于支持分布式应用程序之间的异步通信。
 
 

基本概念和介绍

在RabbitMQ中,有一些重要的主要概念,包括Exchange、Queue、Binding、Routing Key和Producer/Consumer。以下是这些主要概念的解释:
  1. Exchange(交换器)
      • Exchange是消息的分发中心,用于将消息路由到一个或多个Queue中。
      • Exchange根据消息的Routing Key来决定将消息发送到哪个Queue。
      • RabbitMQ提供了不同类型的Exchange,包括Direct、Fanout、Topic和Headers,每种类型有不同的路由规则。
  1. Queue(队列)
      • Queue是消息的终端,它是消息的最终目的地,消息将存储在队列中,等待被消费。
      • 消费者从队列中获取消息并进行处理。
      • 可以有多个Queue,每个Queue可以绑定到一个或多个Exchange上。
  1. Binding(绑定)
      • Binding是Exchange和Queue之间的连接。它定义了如何将消息从Exchange路由到Queue。
      • Binding通常包括Exchange的名称、Queue的名称和可选的Routing Key。
  1. Routing Key(路由键)
      • Routing Key是在消息发布时与消息关联的关键字。
      • Exchange使用Routing Key来决定将消息路由到哪个Queue。
      • Routing Key的格式取决于Exchange的类型。对于Direct Exchange,Routing Key通常是一个字符串,而对于Topic Exchange,它可以包含通配符。
  1. Producer(生产者)
      • 生产者是负责发布消息到Exchange的应用程序或组件。
      • 生产者将消息发送到特定的Exchange,并根据Exchange的规则和Routing Key来决定消息的路由方式。
  1. Consumer(消费者)
      • 消费者是负责从Queue中获取消息并处理它们的应用程序或组件。
      • 消费者订阅特定的Queue,并等待从该Queue接收消息。
RabbitMQ的这些主要概念构成了一个消息传递系统,它允许应用程序之间通过消息进行异步通信。消息可以在生产者和消费者之间进行分布,经过Exchange和Queue的中间处理,以满足不同的消息传递需求。不同类型的Exchange和Routing Key允许实现多样化的消息路由和过滤机制。
 

安装

 
notion image
RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang,双击otp\_win64\_21.1.exe直接安装,选择默认配置即可,如果不安装Erlang或安装错误而直接安装RabbitMQ会弹出错误提示
 
然后打开浏览器,可以通过访问
http://localhost:15672
进行测试,默认的登陆账号为:guest,密码为:guest。
 
 

简单体验

添加一个队列
选择Queues页面,打开“Add a new queue”,为队列命名,如“MQ\_Test”,其它选择可以默认,然后点击“Add queue”按钮,
notion image
 
发送消息测试
在“All queues”列表中选择刚刚创建的队列“test”,会显示队列的信息页面,主要显示了该队列的网络状态以及速率监控等,然后选择“Publish message”,在“Payload”中输入”这是一个简单的消息”,
notion image
 
然后点击“Publish message”按钮,就可以发送消息,发送完之后在“Overview”中显示了实时的网络状态。
 
然后选择“Get messages”下拉框,会弹出接收消息的显示界面,点击“Get message”按钮,接收到来自服务器的消息,也就是刚刚发送的“这是一个简单的消息”,
 
notion image
 
 
下面可以看到我们刚刚发送的消息
notion image
 
 

消息确认

RabbitMQ支持在消费者确认收到消息后再删除消息的机制,这被称为消息确认(Message Acknowledgment)。消息确认是保证消息被可靠地处理的一种方式,它可以防止消息丢失或重复处理。
在RabbitMQ中,有两种主要的消息确认模式:
  1. 自动消息确认(Auto Acknowledgment):
      • 默认情况下,消费者会自动确认消息。
      • 当消费者收到消息并处理完毕后,RabbitMQ会立即将消息从队列中删除。
      • 这种方式适用于那些不需要确保消息可靠处理的情况,因为一旦消息被投递给消费者,它会被立即删除,即使消费者未完成处理也不会重新投递。
  1. 显式消息确认(Explicit Acknowledgment):
      • 在显式消息确认模式下,消费者可以选择何时确认消息的处理。
      • 消费者可以在成功处理消息后发送确认,以告知RabbitMQ可以安全删除该消息。
      • 如果消费者在处理消息时发生错误或崩溃,消息将不会被确认,RabbitMQ将重新投递消息,确保它被可靠地处理。
显式消息确认对于确保消息不会丢失且仅在成功处理后才被删除的场景非常有用。它允许消费者完全控制消息的处理过程,同时提供了消息处理的可靠性。
在大多数RabbitMQ客户端库中,包括RabbitMQ官方的客户端库,都提供了显式消息确认的机制,允许消费者在成功处理消息后发送确认,或者在处理失败时拒绝消息。这提供了更精细的消息处理控制。
 

javascript

import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class MessageHandler { @RabbitListener(queues = "your_queue") public void handleMessage(Message message) { try { // 处理消息 processMessage(message.getBody()); // 显式确认消息 // 注意:需要在@RabbitListener方法的参数列表中包含Channel参数 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 处理错误情况 System.err.println("Error: " + e.getMessage()); // 拒绝消息 // 注意:需要在@RabbitListener方法的参数列表中包含Channel参数 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } private void processMessage(byte[] body) { // 实际消息处理逻辑 } }
JavaScript
 
 

简单API一览

javascript

package top.shusheng007.stream.mq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue myQueue(){ return new Queue("ss007",true); } @Bean public Queue topicQueue1(){ return new Queue("topicQueue1",true); } @Bean public Queue topicQueue2(){ return new Queue("topicQueue2",true); } @Bean public TopicExchange topicExchange(){ return new TopicExchange("topicExchange"); } @Bean public Binding topicBinding1(){ return BindingBuilder.bind(topicQueue1()).to(topicExchange()) .with("ss007.王二狗.觉醒"); } @Bean public Binding topicBinding2(){ return BindingBuilder.bind(topicQueue2()).to(topicExchange()) .with("ss007.*.觉醒"); } }
JavaScript
 
定义生产者

javascript

@RequiredArgsConstructor @Service public class SendService { private final RabbitTemplate rabbitTemplate; private final Queue myQueue; public void sendMsg(String msg){ rabbitTemplate.convertAndSend(myQueue.getName(),msg); } public void sendTopicMsg(String msg,String route){ rabbitTemplate.convertAndSend("topicExchange",route,msg); } }
JavaScript
 
定义消费者
 

javascript

@Slf4j @Service public class QueueConsumer { @RabbitListener(queues = {"ss007"}) public void receive(@Payload String fileBody) { log.info("ss007队列:" + fileBody); } @RabbitListener(queues = {"topicQueue1"}) public void receiveTopic1(@Payload String fileBody) { log.info("topic1队列:" + fileBody); } @RabbitListener(queues = {"topicQueue2"}) public void receiveTopic2(@Payload String fileBody) { log.info("topic2队列:" + fileBody); } }
JavaScript
 
 
💡
Good job ~
 
ElasticSearch使用Git 操作技巧
Loading...
©2021-2025 Arterning.
All rights reserved.