计算机系统应用教程网站

网站首页 > 技术文章 正文

RabbitMQ如何保证消息不被重复消费?

btikc 2025-02-07 16:40:31 技术文章 11 ℃ 0 评论

RabbitMQ作为目前使用比较广泛的消息中间件系统,通过高效的消息队列机制支持了分布式应用之间的消息互传,但是作为一个消息系统来讲,如何能够确保消息不被重复消费导致数据一致性等问题的发生就成了关键,尤其是在一些对于数据一致性和事务管理性较强的场景中,重复消息消费成为了重中之重需要解决的问题。

??在RabbitMQ中默认就提供了一些防止消息重复消费的机制,但是如何结合实际的情况正确的使用这些机制,确保消息能够正常准确的消费,就成了开发者需要关注的问题。下面我们就来详细介绍一下RabbitMQ中如何避免消息重复消费?

消息重复消费的原因

??在RabbitMQ消息队列管理机制中可能会出现如下的一些情况导致消息出现被重复消费的问题。

  • 消费者崩溃或重启:当消费者正在处理消息的过程中如果RabbitMQ出现故障或重启的情况,就可能会导致消息还没有来得及确认正常收到并处理,这种情况下,消息就会重新投递到消费者中,但是这个时候可能由于系统间的调用,消息消费者已经将相关的消息进行了处理,只是还未来得及通知,这样就导致消息的重复消费。
  • 网络问题:如果消息在传递过程中出现了网络中断的情况,消费者未能将正确确认ACK消息反馈到RabbitMQ中,那么RabbitMQ会认为消息消费失败并重新投递。
  • 消息未确认(ACK):如果因为消费者的故障没有来的及向RabbitMQ发送ACK消息确认信息,那么RabbitMQ可能会重新将该消息投递给其他消费者,这时候就会由其他消费者来进行处理,导致消费者系统中的数据不一致。
  • 消息的重入:在某些场景中,如果消息被错误地重新推送到队列中,也会导致重复消费。

??其实上面的种种原因导致的最终结果就是RabbitMQ没有收到确认消息,那么我们的解决方案也是要从解决消息确认机制上来入手。

如何保证消息不被重复消费?

??为了避免出现消息重复消费的问题,RabbitMQ提供了一些默认的支持机制,如下所示。

消息确认机制(ACK)

??根据上面的介绍,我们知道消息重复消费的一大半的问题都是由于无法收到确认消息,而RabbitMQ提供这个消息确认机制的目的就是能够保证消息被正常消费,并且告知RabbitMQ消息正常消费了,但是消费者如果出现了处理失败或者是奔溃的情况就不会发送确认机制,并且RabbitMQ也可能因为某些原因无法接收到确认消息,这种情况下,RabbitMQ就会将消息重复投递到队列中被其他消费者消费。

??在RabbitMQ中,提供了两种消息确认机制,如下所示。

  • 自动确认(auto-ack):消费者接收消息后会自动向RabbitMQ发送ACK,不需要手动确认。
  • 手动确认(manual-ack):消费者处理完消息后需要显式地调用channel.basicAck()方法,向RabbitMQ确认消息已经被成功处理。

??为了解决消息确认机制异常问题,这里我们建议通过手动的方式来进行确认。尤其是在消息处理较为复杂或可能失败的场景下。这是避免重复消费的一个重要机制,如下所示。

# 手动确认消息
def callback(ch, method, properties, body):
    try:
        # 处理消息
        process_message(body)
        # 手动确认消息
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        # 处理失败,可以选择不确认或进行nack(负确认)
        ch.basic_nack(delivery_tag=method.delivery_tag)

??消费者通过basic_ack()方法来显式确认消息已成功处理。如果处理过程中发生异常,消费者则可以选择使用basic_nack()方法将消息标记为未成功消费,并可能重新投递消息,这样可以有效的保证消息的确认机制是正常执行,如果因为处理异常导致的错误则直接反馈处理失败,但是这种情况下无法保证因为网络链路导致确认消息收不到的情况。

消息去重(Idempotency)

??为了避免重复消费消息导致对业务系统逻辑产生的影响,在系统设计的时候,可能通过幂等性(Idempotent)处理来保证消息正常消费,所谓的幂等性就是指同一条消息被多次消费,处理的结果也不发生变化。

??在RabbitMQ中可以通过如下的方式来实现幂等性。

  • 数据库去重:在处理消息时,可以在数据库中根据消息的唯一标识符(如消息ID)检查是否已经处理过。如果已经处理过,则跳过该消息,如果没有处理,那么就是正常的处理逻辑
  • 缓存去重:当然除了数据库之外,我们还可以通过使用缓存(如Redis)记录已处理的消息ID,当相同消息ID再次出现时,直接忽略。

??通过这种方式可以有效的保证消息在重复消费的过程中不会影响业务系统本身的逻辑,并且在网络异常的情况下也可以保证消息正常消费,如下所示,是一个基于Redis的消息幂等处理操作,但是这种情况会引入新的Redis的相关的问题。有兴趣的读者可以自己研究一下。

import redis

# 创建一个 Redis 客户端连接
r = redis.Redis(host='localhost', port=6379, db=0)

def callback(ch, method, properties, body):
    message_id = properties.message_id
    # 检查消息是否已经处理过
    if r.get(message_id):
        # 如果已经处理过,跳过消息
        return
    try:
        # 处理消息
        process_message(body)
        # 将消息ID标记为已处理
        r.set(message_id, 1, ex=3600)  # 设置过期时间,避免长期占用内存
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        ch.basic_nack(delivery_tag=method.delivery_tag)

死信队列机制

??在RabbitMQ中提供了一个死信队列的机制,它允许系统将处理失败或者是处理超时的消息发送到一个指定的死信队列中,然后通过死信队列来避免消息的重复消费,出现异常之后可由人工干预消息的处理,不会影响到系统逻辑本身,如下所示。

??在消费者处理消息失败时,可以将消息发送到一个死信队列中,以便进行重试、日志记录或报警处理。

channel.basic_publish(
    exchange='',
    routing_key='dlx_queue',
    body='Failed message',
    properties=pika.BasicProperties(
        delivery_mode=2  # 持久化消息
    )
)

消息持久化(Persistence)

??在RabbitMQ中,还提供了消息持久化机制,也就是说在消息传递的过程中,消息可以存储到磁盘上进行持久化的存储,这样可以防止RabbitMQ宕机之后因为重启而造成的消息丢失的情况,如果消息丢失,消费者就无法正常的处理消息,就有可能会导致消息重新投递的情况,但是这种重新投递应该算是消息的正常消费。

??所以在消费者需要这种处理机制的时候,可以在发送消息的时候设置对应的持久化标识,然后通过这个标识来对消息进行持久化处理,来保证消息正常消费,如下所示。

channel.basic_publish(
    exchange='',
    routing_key='my_queue',
    body='Message content',
    properties=pika.BasicProperties(
        delivery_mode=2  # 持久化消息
    )
)

消息超时和死信队列结合

??RabbitMQ还支持了消息超时机制,我们可以通过x-message-ttl属性来设置消息的有效时间。也就是说,如果消息在队列中存在的时间超过了这个时间限制,那么就会被丢弃或者转发到死信队列中。然后我们可以结合死信队列,来防止因为消息超时未被消费而导致的重复消费问题。

{
  "x-message-ttl": 60000,   // 设置消息在队列中存活的时间为60秒
  "x-dead-letter-exchange": "dlx_exchange"
}

总结

??上面我们介绍了几种避免消息重复消费的机制,在实际开发中,我们可以结合多种情况来共同的处理RabbitMQ消息重复消费的问题,这样可以有效的避免出现消息重复消费的问题,保证了消息传递的可靠性以及数据系统的一致性。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表