网站首页 > 技术文章 正文
优化级队列、死信队列、延迟队列
优先级队列
使用场景:VIP用户 电商平台大客户订单
RabbitMQ的优先级队列可以让优先级高的消息先被消费者消费,优先级低的消息后被消费者消费。优先级的最大值为255,最小值为0(默认值),值越大,优先级越高,优先级越高,越先被消费者消费。
注意:优先级设置的过多,会使用更多的Erlang进程来消耗更多的CPU资源,因此,推荐优先级的值介于1和10之间。
RabbitMQ不支持通过策略的方式设置队列的优先级!
可以通过 x-max-priority 参数来实现
延迟队列
使用场景:
1. 订单在半小时之内未支付则自动取消
2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有 消息的最大存活时间,单位是毫秒。
死信队列
在定义业务队列的时候,要考虑指定一个死信交换机,死信交换机可以和任何一个普通的队列进行绑定,然后在业务队列出现死信的时候就会将数据发送到死信队列。
死信队列实际上就是一个普通的队列,只是这个队列跟死信交换机进行了绑定,用来存放死信消息而已。
同一个项目的死信交换机可以共用一个,然后每个业务队列分配一个单独的routeKey, 死信队列只不过是绑定在死信交换机上的队列。
RabbitMQ 中有一种交换器叫 DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它会被重新发送到另外一个交换器中,这个交换器就是 DLX,绑定在 DLX 上的队列就称之为死信队列。
消息进入到死信队列的情况:
消息或者队列TTL过期
队列达到最大长度 (最先入队的消息会被发送到死信队列)
消费者拒绝消息(Basic.reject 或者 Basic.Nack)并且requeue=false
死信的处理方式大致有以下几种:
丢失,如果消息不重要
记录进入死信队列,然后做后续的业务分析或者处理
通过死信队列,有负责监听死信的应用程序进行处理
重试模式
当消费者出现异常后,消息会不断 requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力
本地重试:我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。重试达到最大次数后,Spring会返回reject,消息会被丢弃
失败策略
重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐!!!)
惰性队列
在RabbitMQ中,惰性队列是一个不会主动推送消息给消费者的队列,直到消费者开始消费为止。这是通过消费者请求队列时的预取参数(prefetch count)来控制的。
设置消费者预取参数可以控制消费者在同一时间内能处理的消息数量。如果设置为0,则表示消费者在消息到达时就会立即处理它们,这就是一种非惰性模式。如果预取参数大于0,则表示消费者会等待更多的消息到达后再一次性获取处理,这就是一种惰性模式。
如何保证消息不丢失
丢失可能产生的原因:
生产者发送的消息未送达exchange
消息到达exchange后未到达queue
MQ宕机,queue将消息丢失
consumer接收到消息后未消费就宕机
RabbitMQ分别给出了解决方案
- 生产者发送确认机制 confirm机制
- mq持久化 exchange + queue + message 持久化 都是默认持久化的
- 消费者消费确认机制 手动ack manual
- 失败重试机制
生产者
通过使用channel.confirmSelect方法开启confirm模式,在生产者开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;
Exchange路由到队列失败
RabbitMQ中在发送消息时提供了mandatory参数。如果mandatory为true时,Exchange根据自身的类型和RoutingKey无法找到对应的Queue,它将不会丢掉该消息,而是会将消息返回给生产者。
RabbitMq自身问题导致的消息丢失问题解决方案
1. 需要开启rabbitMQ的持久化机制,即把消息持久化到硬盘上,这样即使rabbitMQ挂掉在重启后仍然可以从硬盘读取消息;
通过将durable的值设置为true来保证持久化。持久化必须满足以下三个条件:
Exchange 设置持久化
Queue 设置持久化
Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息
2. 要保证rabbitMQ的高可用就要配合HAP ROXY做镜像集群模式
3. 消息补偿机制
持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,这种情况下消息仍然会丢失。
为了避免上面这个问题,我们可以让生产端首先将业务数据以及消息数据入库,需要在同一个事务中,消息数据入库失败,则整体回滚
1. 消息表的主键:消息id ,消息状态,重试次数,创建时间
2. 然后根据消息表中消息状态,失败则进行消息补偿措施,重新发送消息处理 【注意消费端的幂等性问题】
消费者
可以通过手动ack来保证消费者主动的控制ack行为,这样我们可以避免业务异常导致消息丢失的情况。
RabbitMQ是阅后即焚机制,确认消息被消费者消费后会立即删除。
RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。回执分为3种类型:ack、nack、reject。如果消费者返回ack,MQ会把消息从队列删除;如果返回nack或者reject,如果requeue是true的话,MQ会把消息重新投递给消费者,如果requeue是false的话,MQ会把消息删掉。
总结
生产端:对生产的消息进行状态标记,开启confirm机制,依据mq的响应来更新消息状态,使用定时任务重新投递超时的消息,多次投递失败进行报警。
mq自身:开启持久化,并在落盘后再进行ack。如果是镜像部署模式,需要在同步到多个副本之后再进行ack。
消费端:开启手动ack模式,在业务处理完成后再进行ack,并且需要保证幂等。
如何处理重复消息
正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除 。
但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。
针对以上问题,一个解决思路是:
保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息消费的幂等性;
在写入消息队列的数据做唯一标识,消费消息时,根据唯一标识判断是否消费过;
如何防止消息被重复投递
在生产消息时,我们给每个消息都设置一个消息id,作为去重的依据,避免消息重复地进入队列。
如何保证消息的有序性
1. 所有需要保证有序性的消息被分配到同一个队列中
2. 同一队列所有消息必须由同一个消费者进行消费
生产者把一组有序的消息放到一个队列当中;而消费者一次性消费整个队列当中的消息。
核心思路就是根据业务数据关键值划分成多个消息集合,而且每个消息集合中的消息数据都是有序的,每个消息集合有自己独立的一个consumer。多个消息集合的存在保证了消息消费的效率,每个有序的消息集合对应单个的consumer也保证了消息消费时的有序性。
猜你喜欢
- 2024-10-17 一次教会你如何解决RabbitMQ消息丢失问题
- 2024-10-17 RabbitMQ消息可靠性分析和应用 rabbitmq消息数据类型
- 2024-10-17 基于本地消息表实现MQ最终一致性 本地消息表(异步确保)
- 2024-10-17 MQ怎么确保不丢数据 mq防止数据丢失
- 2024-10-17 Java面试必备!RabbitMQ 常用知识点总结,纯手绘23张图带你拿下
- 2024-10-17 SpringBoot+RabbitMQ 实现 RPC 调用
- 2024-10-17 RabbitMQ消息更多细节 rabbitmq消息堆积怎么解决
- 2024-10-17 springboot+rabbitmq+消息发送确认
- 2024-10-17 Rabbitmq消费端实战 rabbitmq官网
- 2024-10-17 每日学习~RabbitMQ消息应答机制 rabbit mq五种消息模型
你 发表评论:
欢迎- 11-19零基础学习!数据分析分类模型「支持向量机」
- 11-19机器学习 | 算法笔记(三)- 支持向量机算法以及代码实现
- 11-19我以前一直没有真正理解支持向量机,直到我画了一张图
- 11-19研一小姑娘分享机器学习之SVM支持向量机
- 11-19[机器学习] sklearn支持向量机
- 11-19支持向量机
- 11-19初探支持向量机:用大白话解释、原理详解、Python实现
- 11-19支持向量机的核函数
- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)