计算机系统应用教程网站

网站首页 > 技术文章 正文

详细介绍一下RabbitMQ消息的发送过程?

btikc 2025-02-07 16:40:17 技术文章 12 ℃ 0 评论

RabbitMQ是一个遵循AMQP协议的开源消息代理系统,被广泛使用在分布式系统中来进行系统解耦、异步通信、负载均衡以及延迟消息处理等场景。下面我们就来详细介绍一下RabbitMQ中的消息是如何发送的,通过对消息发送的流程来了解一下RabbitMQ的工作原理,同时这个问题也是在RabbitMQ中面试官常问到的面试题之一。

RabbitMQ架构

??在介绍RabbitMQ消息发送机制之前,首先先来了解一下RabbitMQ的基础架构。在RabbitMQ架构中包含了如下的一些核心组件。

  • 生产者(Producer):负责发送消息的客户端。
  • 交换机(Exchange):接收生产者发送的消息,并将消息路由到一个或多个队列。
  • 队列(Queue):存储消息的容器。消费者从队列中获取消息。
  • 消费者(Consumer):从队列中获取消息并进行处理的客户端。

??在RabbitMQ中,交换机的作用就是负责消息的路由,而队列则是用来进行消息的存储,交换机与队列之间通过绑定关系来实现连接,根据交换机的类型不同,对应的路由规则也有所不同,这个在之前介绍交换机的时候,我们也详细介绍过,这里就不做过多的赘述了。

消息发送的过程

??了解完RabbitMQ的基础架构之后,接下来我们就来介绍一下RabbitMQ的消息发送过程。

生产者连接RabbitMQ服务

??想要实现消息的发送,首先消息生产者要与RabbitMQ的服务建立起连接,当连接建立之后,生产者就可以通过Channel通道来实现消息的发送机制,这里我们以Python代码为例来做演示介绍。如下所示。

import pika

# 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 确保交换机存在
channel.exchange_declare(exchange='logs', exchange_type='fanout')

??在上面的实现中,生产者连接到了本地的RabbitMQ服务上,然后创建了一个名为log的交换机,并且交换机的类型为fanout,也就是说这是一个用来进行广播的交换机,当消息发送之后,所有的监听队列都可以接受到消息。

生产者将消息发送到交换机

??配置RabbitMQ的连接以及交换机信息之后,接下来,我们就可以通过生产者将消息发送到指定的交换机中,然后交换机会根据路由规则将消息路由到合适的队列中,如下所示。

# 发送消息到交换机
channel.basic_publish(exchange='logs', routing_key='', body='Hello, RabbitMQ!')

??发送了一个消息到logs交换机中,而这个交换机是fanout类型的交换机,也就是说会忽略路由规则将消息广播的所有绑定了该交换机的队列中,也可以看到routing_key 参数为空,也是因为 fanout 类型的交换机会忽略路由键,直接将消息发送到所有绑定的队列。

交换机将消息路由到队列

??当生产者将消息发送到到交换机上的时候,交换机会根据类型和绑定的路由规则对消息进行路由,例如在 fanout 交换机类型中定义的规则就是将消息将广播到所有绑定到该交换机的队列中,假设这里有两个队列queue1queue2分别绑定到交换机 logs,那么当消息进入到该交换机的时候,消息就会被同时发送到这两个队列中。

消费者从队列中获取消息

??消费者通过连接到对应的服务器以及队列中,来监听获取生产者的消息,如下所示。

# 声明队列
channel.queue_declare(queue='queue1')

# 绑定队列到交换机
channel.queue_bind(exchange='logs', queue='queue1')

# 回调函数,处理消息
def callback(ch, method, properties, body):
    print(f"Received {body}")

# 消费消息
channel.basic_consume(queue='queue1', on_message_callback=callback, auto_ack=True)

# 开始消费
channel.start_consuming()

??在上面的实现中首先声明了queue1的队列,然后将其绑定到了logs交换机上,消费者通过basic_consume方法开始监听并且消费队列中的消息,当消息队列中有新消息进入的时候,就会触发回调函数callback(),来进行消息的处理。

消息确认与处理

??最终当消费者确认收到了消息之后,如果消息处理成功,就可以通过消息确认机制将相关的消息通知给RabbitMQ,这里RabbitMQ提供了如下的两种消息确认机制。

  • 自动确认(Auto Acknowledge):消费者接收到消息后,RabbitMQ会自动认为消息被处理成功,消息会从队列中移除。
  • 手动确认(Manual Acknowledge):消费者在成功处理消息后,显式地向RabbitMQ发送确认消息,告知RabbitMQ消息已被成功处理。

??如下所示是一个手动确认的实现逻辑。

# 手动确认
channel.basic_ack(delivery_tag=method.delivery_tag)

??通过这种方式可以在消息处理出现错误的时候保证消息不回丢失,对于未确认的消息会被重新进行投递处理。

消息的持久化与可靠性

??上面也提到了,如果消息没有被正常处理的时候,RabbitMQ默认提供的是非持久化的消息处理,也就是说当RabbitMQ重启服务之后这些没有被处理的消息就会丢失,如果要保证消息的持久化和可靠性,就需要通过如下的配置来开启消息持久化的支持。

  • 交换机持久化:在声明交换机时,设置 durable=True,确保交换机在 RabbitMQ 重启时不会丢失。
  • 队列持久化:在声明队列时,设置 durable=True,确保队列在 RabbitMQ 重启时不会丢失。
  • 消息持久化:在发送消息时,设置 delivery_mode=2,表示消息会被持久化到磁盘。

??如下所示,有了这些配置支持之后,就可以保证消息不丢失。

channel.basic_publish(exchange='logs', routing_key='', body='Hello, RabbitMQ!', 
                      properties=pika.BasicProperties(
                          delivery_mode=2  # 使消息持久化
                      ))

消息确认和异常处理

??通过上面的介绍,我们也知道了,在实际应用场景中,很难保证消息的发送以及消费不会出现异常,例如可能会出现RabbitMQ 服务器不可用、网络故障等,当然上我们也提到了通过持久化等方式来保证消息的可靠性,但是有些异常是在业务处理中产生的,所以就需要RabbitMQ支持异常处理和重试机制,上面我们提到了一个消息发送确认机制,可以通过publisher confirms 特性,确保消息已经成功发送到 RabbitMQ。

??当然在实际使用场景中,消费者在处理消息的时候,如果系统发生了错误,那么可以通过重试机制将消息重新方卉到队列中进行再次消费,或者是经过多次重试消息还是会出错,那么我们就可以配置一个RabbitMQ的死信队列来处理这些不可被处理的消息。

总结

??在RabbitMQ消息发送的过程中,生产者将消息发送到交换机,交换机根据路由规则将消息投递到一个或多个队列,消费者从队列中获取消息并进行处理。消息的持久化、确认机制和异常处理都能够确保消息传递的可靠性。

??掌握RabbitMQ的消息发送过程,可以有效的帮助我们理解RabbitMQ的工作原理,可以帮助我们在实际工作中对RabbitMQ消息处理进行性能优化、提高消息传递的可靠性。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表