计算机系统应用教程网站

网站首页 > 技术文章 正文

RabbitMQ 如何实现数据100%不丢失

btikc 2024-10-02 12:21:06 技术文章 16 ℃ 0 评论

文中代码主要以golang为示例,其他语言实现方式请自行Google

RabbitMQ一条消息从生产端到消费端共经3个步骤:

  1. 生产端发送消息到RabbitMQ;
  2. RabbitMQ发送消息到消费端;
  3. 消费端消费这条消息。

如图所示,在每个步骤中都可能会发生消息丢失的情况,因而我们需要有措施来保障系统的可靠性。(磁盘损坏等极端情况就不在我们的考虑范围啦……)

一、生产端投递消息可靠性

生产端投递消息丢失的原因有很多,例如消息在传输过程中发生网络故障导致丢失,或者消息投递到RabbitMQ时RabbitMQ挂了,从而引发消息丢失,此类情况我们根本无法预估并知晓。
因而,我们可以利用RabbitMQ的一些机制来处理

1.事务消息机制

事务处理方案所说能够保障消息的强一致性,但因此造成的性能损耗较大,面对大量消息推送时性能问题严重,故而我们可以考虑轻量级解决方案:confirm消息确认机制

2.confirm消息确认机制

生产端投递的消息投递到RabbitMQ后,RabbitMQ将发送一个确认消息给到生产端,让生产端知晓我已收到消息,否则这条消息就可能丢失了,需要生产端再次发起消息投递。

开启确认机制:

err = channel.Confirm(false)

以confirm模式发送消息示例:

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "rmq/db/rmq"
)

var (
    channel *amqp.Channel
    err     error
    queue   amqp.Queue
    conn    *amqp.Connection
)

func main() {
    conn, err = rmq.GetConn()

    defer conn.Close()

    channel, err = conn.Channel()

    if err != nil {
        fmt.Printf("error: %s \n", err.Error())
        return
    }

    defer channel.Close()

    err = channel.Confirm(false)

    if err != nil {
        fmt.Printf("error: %s \n", err.Error())
        return
    }

    queue, err = channel.QueueDeclare("confirm:message", false, false, false, false, nil)
    if err != nil {
        fmt.Printf("error: %s \n", err.Error())
        return
    }

    confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 1))

    defer confirmOne(confirms)
    err = channel.Publish("", queue.Name, false, false, amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte("confirm message"),
    })

    if err != nil {
        fmt.Printf("error: %s \n", err.Error())
        return
    }

    fmt.Println("消息发送成功")

}

func confirmOne(confirms <-chan amqp.Confirmation) {
    if confirmed := <-confirms; confirmed.Ack {
        fmt.Printf("带标志的消息投递确认: %d", confirmed.DeliveryTag)
    } else {
        fmt.Printf("消息投递确认: %d", confirmed.DeliveryTag)
    }
}

采用此种方案就可以让生产端感知到消息是否投递到RabbitMQ中。

二、消息持久化

RabbitMQ收到消息后是暂存到内存当中,此时若RabbitMQ挂了,重启服务将会导致数据丢失,所以我们应当将相关数据持久化到硬盘中,这样RabbitMQ重启后依然可以到硬盘中取数据恢复。
消息达到RabbitMQ后先到exchange交换机,然后路由到queue队列,最后发送给消费端。

因而需要对exchange、queue、message都进行持久化:
exchange持久化:

// 第三个参数true表示这个exchange持久化
channel.ExchangeDeclare("exchange", "direct", true, false, false, true, nil)

queue持久化:

// 第二个参数true标识这个queue持久化
channel.QueueDeclare("confirm:message", true, false, false, false, nil)

message持久化:

err = ch.Publish("exchange","queue name",false,false,amqp.Publishing{
                    DeliveryMode: amqp.Persistent, // 消息持久化
                    ContentType:  "text/plain",
                    Body:         []byte(msgBody),
                })

注意:如果需要消息持久化,Queue也是需要设定为持久化才有效。
这样,如果RabbitMQ收到消息后挂掉了,重启后会自行恢复消息。
思考一下,基于以上几种机制,我们还是不能完全保证消息可靠性投递到RabbitMQ中,比如极端情况,RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂掉了,此时消息依然是丢失了;或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障导致生产端没有收到确认消息,导致生产端不知道RabbitMQ是否收到消息,依然不好处理接下来的业务。

因而我们需要在上述机制的基础上,考虑一些消息补偿机制,以应对部分极端情况,比如消息入库

三、消息入库

我们可以考虑将要发送的消息保存到数据库中,标注一个状态字段status=0,标识生产端将消息发送给RabbitMQ但还没收到确认回复。在生产端收到RabbitMQ确认回复后,将status设为1,表示RabbitMQ已收到消息。
考虑到前面提到的极端情况,我们可以在生产端开设一个定时器,定时检索消息表,将
status=0并且超过固定期限后还没收到确认的消息内容取出重发(此时消费端要考虑消息重复情况,提前做好幂等性设置),并设定重发最大次数,超限做单独的特殊处理。

到此,消息就可以可靠性投递到RabbitMQ中,生产端也可以正常感知了。

四、消费端信息不丢失

正常情况下,以下三种情况会导致消息丢失

  • 在RabbitMQ将消息发出后,消费端还没有接收到消息前发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失;
  • 在RabbitMQ将消息发出后,消费端还没有接收到消息前消费端挂了,此时消息会丢失;
  • 消费端准备接收到消息后,但在处理消息过程中发生异常或宕机,消息会丢失。
    综合上述三种情况,都是因为RabbitMQ的自动ack机制,即RabbitMQ默认在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时,RabbitMQ也没有该消息了。
    因此就需要将自动ack机制改为手动ack机制。
    消费端手动确认消息:
// 接收消息
messages,err := r.channel.Consume(
    r.QueueName, // 队列名称
    "", // 区分消费者
    false, // 设置关闭自动应答,每条消息必须手动ack
    false,
    false,
    false,
    nil
)

if err != nil{
    fmt.Println(err)
}

// 消费消息
channel := make(chan bool)

go func(){
    for k := range messages{
        // true 表示确认所有未确认消息
        // false 表示确认当前消息
        k.Ack(false)
    }
}()
// 阻塞主程序
<-channel

当autoAck设置为false时,对于RabbitMQ服务端而言,队列中的消息就分为两类:

  • 等待投递给消费端的消息
  • 已经投递给消费端,等待消费端发回确认信号的消息

如果RabbitMQ一直没有接收到消费端的确认信号,且消费端已经断开链接或宕机,此时RabbitMQ会将此消息重新放入队列,等待下次投递。故而消费端也需要做好幂等性设置,确保消息重复处理机制。
综合以上方案,即可保证生产端-RabbitMQ-消费端全链路数据不丢失啦!

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表