在之前的分享中,我们介绍了关于RabbitMQ消息的发送过程,并且了解了RabbitMQ作为一个开源的消息中间件在分布式系统中承担的角色,提供的各种消息传递模式,可以帮助系统提升性能保证系统的高效稳定运行,那么下面我们就来详细的介绍一下RabbitMQ的消息的接收过程。
概念回顾
??在之前的介绍中,我们提到了RabbitMQ中有如下的几个基本的组件。
- Producer(生产者):消息的发送者,负责将消息发布到 RabbitMQ 中。
- Consumer(消费者):消息的接收者,负责从队列中消费消息。
- Queue(队列):用于存储消息的缓冲区,消费者从队列中取出消息进行处理。
- Exchange(交换机):接收生产者发送的消息,并根据特定的路由规则将消息转发到一个或多个队列。
- Binding(绑定):交换机和队列之间的关系,确定哪些消息会路由到哪些队列。
??这些组件共同支撑了整个的RabbitMQ消息发送机制。并且在之前的介绍中我们介绍了如何将消息发送到交换机,并且通过交换机绑定的路由发送到消费者的队列中的过程。下面我们来介绍一下消费者如何接收消息。
RabbitMQ消息接收流程
生产者发送消息到交换机
??在消息传递的过程中,RabbitMQ会先通过API客户端调用将消息发送到指定的交换机中,然后交换机会根据配置的路由规则绑定来将消息转发到符合条件的队列中。根据路由规则的不同RabbitMQ也给出了不同的交换机实现,如下所示。
- direct:消息直接按照路由键(Routing Key)转发到绑定的队列。
- fanout:消息会广播到所有绑定的队列。
- topic:消息会根据路由键的模式匹配,选择符合条件的队列。
- headers:根据消息头部的内容进行路由。
??对于交换机的详细内容,在之前的博客中我们介绍过,这里不做过多的介绍了。
队列存储消息
??当消息被发送到交换机中的时候,就会被某个路由规则匹配然后路由到相应的队列中,然后消息就会被存储到该队列中,直到消费者将消息从队列中取出。
??对于队列这种数据结构想必大家应该不陌生,具有先进先出等特点,并且在RabbitMQ中我们可以通过持久化技术来保证进入到消息队列中的消息不会因为重启而丢失。
消费者订阅队列
??在消费者处理消息之前,首先需要连接到RabbitMQ中,然后要订阅某个消息队列的消息,在订阅消息队列的时候,消费者会告诉RabbitMQ一个规则,当消息进入之后会自动将消息推送到指定的消息者中,一般情况下,消费者的消息订阅流程如下所示。
- 建立连接:消费者通过客户端工具与RabbitMQ建立连接。
- 声明队列:接下来消费者需要声明自己要接收消息的队列是哪一个。如果该队列已经存在,那么RabbitMQ会将直接使用该队列;如果队列不存在,则会创建一个新的队列。
- 绑定队列:然后,消费者会与一个特定规则的队列进行绑定,这样才能保证接收到队列中的消息。
消费者获取消息
??在消费者进行消息接收处理的时候,可以通过异步的方式进行处理,也可以通过同步的方式进行处理,如下所示。
- 同步消费:消费者会轮询队列,直到有消息到达时才会返回并处理消息。这种方式对消息的实时性要求较高,但可能会浪费一定的计算资源。
- 异步消费:当然RabbitMQ也会主动将消息推送给消费者,消费者可以注册一个回调函数来处理接收到的消息。这种方式适合于高并发场景,能够提高资源利用率。
??当然在消费者接收到了消息之后,需要确认消息是否收到,然后通知RabbitMQ从队列中移除对应的消息。这里RabbitMQ提供了两种消息确认机制,分别是手动确认和自动确认,如下所示。
- 自动确认:这种方式,消费者一旦接收到消息,RabbitMQ就会立即认为消息被成功消费。从而删除队列中的消息,但是这种方式带来的弊端就是消费者如果在处理消息的过程出现异常的话就会导致消息丢失。
- 手动确认:这种方式则是需要消费者在处理完消息后,通过手动确认的方式通知RabbitMQ消息已经成功消费。只有在收到确认后,RabbitMQ才会从队列中删除该消息,这样可以有效的保证了消息不回被丢失,但是要损耗一定的性能
消费者处理消息
??当消息被消息消费者接收之后,会对消息进行后续的处理,这就是消费者的内部处理逻辑了。如果消息处理成功,可以通过手动确认的方式通知RabbitMQ删除该条消息,如果处理失败那么就会触发重试机制或者是多次处理失败之后会将消息放入到死信队列中。
??一旦消息被成功处理或者消费者发送了确认消息,那么RabbitMQ就会立即从队列中删除该条消息,否则就会出现消费者没有及时处理消息,导致消息被重新投递到了其他消费者造成消息重复消费,在某些情况下,可能因为消费者的故障导致消息无法正常消费,这种情况下RabbitMQ会尝试重试,或者是将消息推送到其他消费者,或者是将消息投递到死信队列中,确保消息最终是被正常处理的。
总结
??通过对消息接收过程的了解,可以帮助我们更好的设计和优化RabbitMQ的操作过程,如果你有更复杂的需求,可以利用消息确认、死信队列、消息持久化等功能来满足不同场景的需求实现,如果你对RabbitMQ还有其他问题,欢迎留言讨论!
本文暂时没有评论,来添加一个吧(●'◡'●)