网站首页 > 技术文章 正文
文中代码主要以golang为示例,其他语言实现方式请自行Google
RabbitMQ一条消息从生产端到消费端共经3个步骤:
- 生产端发送消息到RabbitMQ;
- RabbitMQ发送消息到消费端;
- 消费端消费这条消息。
如图所示,在每个步骤中都可能会发生消息丢失的情况,因而我们需要有措施来保障系统的可靠性。(磁盘损坏等极端情况就不在我们的考虑范围啦……)
一、生产端投递消息可靠性
生产端投递消息丢失的原因有很多,例如消息在传输过程中发生网络故障导致丢失,或者消息投递到RabbitMQ时RabbitMQ挂了,从而引发消息丢失,此类情况我们根本无法预估并知晓。
因而,我们可以利用RabbitMQ的一些机制来处理
1.事务消息机制
事务处理方案所说能够保障消息的强一致性,但因此造成的性能损耗较大,面对大量消息推送时性能问题严重,故而我们可以考虑轻量级解决方案:confirm消息确认机制
2.confirm消息确认机制
生产端投递的消息投递到RabbitMQ后,RabbitMQ将发送一个确认消息给到生产端,让生产端知晓我已收到消息,否则这条消息就可能丢失了,需要生产端再次发起消息投递。
开启确认机制:
err = channel.Confirm(false)
以confirm模式发送消息示例:
package main
import (
"fmt"
"github.com/streadway/amqp"
"rmq/db/rmq"
)
var (
channel *amqp.Channel
err error
queue amqp.Queue
conn *amqp.Connection
)
func main() {
conn, err = rmq.GetConn()
defer conn.Close()
channel, err = conn.Channel()
if err != nil {
fmt.Printf("error: %s \n", err.Error())
return
}
defer channel.Close()
err = channel.Confirm(false)
if err != nil {
fmt.Printf("error: %s \n", err.Error())
return
}
queue, err = channel.QueueDeclare("confirm:message", false, false, false, false, nil)
if err != nil {
fmt.Printf("error: %s \n", err.Error())
return
}
confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 1))
defer confirmOne(confirms)
err = channel.Publish("", queue.Name, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte("confirm message"),
})
if err != nil {
fmt.Printf("error: %s \n", err.Error())
return
}
fmt.Println("消息发送成功")
}
func confirmOne(confirms <-chan amqp.Confirmation) {
if confirmed := <-confirms; confirmed.Ack {
fmt.Printf("带标志的消息投递确认: %d", confirmed.DeliveryTag)
} else {
fmt.Printf("消息投递确认: %d", confirmed.DeliveryTag)
}
}
采用此种方案就可以让生产端感知到消息是否投递到RabbitMQ中。
二、消息持久化
RabbitMQ收到消息后是暂存到内存当中,此时若RabbitMQ挂了,重启服务将会导致数据丢失,所以我们应当将相关数据持久化到硬盘中,这样RabbitMQ重启后依然可以到硬盘中取数据恢复。
消息达到RabbitMQ后先到exchange交换机,然后路由到queue队列,最后发送给消费端。
因而需要对exchange、queue、message都进行持久化:
exchange持久化:
// 第三个参数true表示这个exchange持久化
channel.ExchangeDeclare("exchange", "direct", true, false, false, true, nil)
queue持久化:
// 第二个参数true标识这个queue持久化
channel.QueueDeclare("confirm:message", true, false, false, false, nil)
message持久化:
err = ch.Publish("exchange","queue name",false,false,amqp.Publishing{
DeliveryMode: amqp.Persistent, // 消息持久化
ContentType: "text/plain",
Body: []byte(msgBody),
})
注意:如果需要消息持久化,Queue也是需要设定为持久化才有效。
这样,如果RabbitMQ收到消息后挂掉了,重启后会自行恢复消息。
思考一下,基于以上几种机制,我们还是不能完全保证消息可靠性投递到RabbitMQ中,比如极端情况,RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂掉了,此时消息依然是丢失了;或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障导致生产端没有收到确认消息,导致生产端不知道RabbitMQ是否收到消息,依然不好处理接下来的业务。
因而我们需要在上述机制的基础上,考虑一些消息补偿机制,以应对部分极端情况,比如消息入库。
三、消息入库
我们可以考虑将要发送的消息保存到数据库中,标注一个状态字段status=0,标识生产端将消息发送给RabbitMQ但还没收到确认回复。在生产端收到RabbitMQ确认回复后,将status设为1,表示RabbitMQ已收到消息。
考虑到前面提到的极端情况,我们可以在生产端开设一个定时器,定时检索消息表,将status=0并且超过固定期限后还没收到确认的消息内容取出重发(此时消费端要考虑消息重复情况,提前做好幂等性设置),并设定重发最大次数,超限做单独的特殊处理。
到此,消息就可以可靠性投递到RabbitMQ中,生产端也可以正常感知了。
四、消费端信息不丢失
正常情况下,以下三种情况会导致消息丢失
- 在RabbitMQ将消息发出后,消费端还没有接收到消息前发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失;
- 在RabbitMQ将消息发出后,消费端还没有接收到消息前消费端挂了,此时消息会丢失;
- 消费端准备接收到消息后,但在处理消息过程中发生异常或宕机,消息会丢失。
综合上述三种情况,都是因为RabbitMQ的自动ack机制,即RabbitMQ默认在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时,RabbitMQ也没有该消息了。
因此就需要将自动ack机制改为手动ack机制。
消费端手动确认消息:
// 接收消息
messages,err := r.channel.Consume(
r.QueueName, // 队列名称
"", // 区分消费者
false, // 设置关闭自动应答,每条消息必须手动ack
false,
false,
false,
nil
)
if err != nil{
fmt.Println(err)
}
// 消费消息
channel := make(chan bool)
go func(){
for k := range messages{
// true 表示确认所有未确认消息
// false 表示确认当前消息
k.Ack(false)
}
}()
// 阻塞主程序
<-channel
当autoAck设置为false时,对于RabbitMQ服务端而言,队列中的消息就分为两类:
- 等待投递给消费端的消息
- 已经投递给消费端,等待消费端发回确认信号的消息
如果RabbitMQ一直没有接收到消费端的确认信号,且消费端已经断开链接或宕机,此时RabbitMQ会将此消息重新放入队列,等待下次投递。故而消费端也需要做好幂等性设置,确保消息重复处理机制。
综合以上方案,即可保证生产端-RabbitMQ-消费端全链路数据不丢失啦!
猜你喜欢
- 2024-10-02 「2021最新版」RabbitMQ面试题总结,每道题都很经典
- 2024-10-02 RabbitMQ如何保证消息不丢失 运城丢失女孩最新消息
- 2024-10-02 用 RabbitMQ 延迟队列,实现消息延迟推送
- 2024-10-02 RabbitMQ如何保证消息不丢失? 山西丢失孩子最新消息
- 2024-10-02 超详细的RabbitMQ入门,看这篇就够了
- 2024-10-02 3分钟阅读技术干货,一步一步的理解RabbitMQ
- 2024-10-02 周日福利--消息队列学习必备宝典(RabbitMQ实战指南)
- 2024-10-02 C# 消息队列之RabbitMQ rabbitmq消息队列类型
- 2024-10-02 RabbitMQ 持久化和权重分配消息 rabbitmq的持久化和确认机制
- 2024-10-02 RabbitMQ如何防止消息丢失和重复消费
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)