网站首页 > 技术文章 正文
要保证消息的可靠性,就需要保证消息在流转的每一个阶段都能够保证可靠性
从图上我们大概可以看出来一个消息会经历四个节点,只有保证这四个节点的可靠性才能保证整个系统的可靠性。
1. 生产者发出后保证到达了MQ。
2. MQ收到消息保证分发到了消息对应的Exchange。
3. Exchange分发消息入队之后保证消息的持久性。
4. 消费者收到消息之后保证消息的正确消费。
生产者发送消息到MQ Broker可靠性保障
这是消息流转的第一个过程,我们的生产者发送消息之后可能由于网络闪断等各种原因导致我们的消息并没有发送到MQ之中,但是这个时候我们生产端又不知道我们的消息没有发出去,这就会造成消息的丢失。
针对这个问题,RabbitMQ提供了两种解决方式:
- 引入了事务机制实现
- 通过发送方确认机制(publisher confirm)
这个机制很好理解,就是消息发送到MQ那端之后,MQ会回一个确认收到的消息给我们。
事务机制
RabbitMQ客户端中与事务机制 相关的方法有三个∶ channel.txSelect、channel.txCommit和 channel.txRollback。channel.txSelect 用于将当前的信道设置成事务模式channel.txCommit 用于提交事务,channel.txRollback 用于事务回滚。在通过 channel.txSelect 方法开启事务之后,我们便可以发布消息给 RabbitMQ 了,如果事务提交成功,则消息一定到达了RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channe1.txRollback 方法来实现事务回滚。注意这里的 RabbitMQ 中的事务机制与大多数数据库中的事务概念并不相同,需要注意区分。
正常的事务提交过程:
channel. txSelect ();
channel.basicPublish (EXCHANGE NAME, ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,"transaction messages".getBytes ());
channel.txCommit ();
上面代码对应的 AMQP协议流转过程如图所示:
开启事务机制与不开启相比多了四个步骤∶
* 客户端发送 Tx.Select,将信道置为事务模式
* Broker 回复 Tx.Select-Ok,确认已将信道置为事务模式
* 在发送完消息之后,客户端发送Tx.Commit提交事务
* Broker回复Tx.Commit-Ok,确认事务提交
事务回滚过程:
try{
channel.txSelect ();
channel.basicPublish (exchange,routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
int result = 1 / 0;
channel.txCommit ();)
}catch (Exception e) {
e.printStackTrace ();
channel.txRollback ();
}
其 AMQP协议流转过程如图:
如果要发送多条消息,则将 channel.basicPublish 和 channel.txCommit 等方法包裹进循环内即可.
channel.txSelect();
for (int i = 0; i < LOOP_TIMES; i++) {
try {
channel.basicPublish("exchange", "routingKey", null, ("messages" + i).getBytes());
channel.txCommit();
}catch(IOException e) {
e.printStackTrace();
channel.txRollback();
}
}
事务确实能够解决消息发送方和 RabbitMQ之间消息确认的问题,只有消息成功被RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制会"吸干"RabbitMQ 的性能.
发送方确认机制
采用事务机制实现会严重降低 RabbitMQ 的消息吞吐量,于是引入了一种轻量级的方式——发送方确认(publisher confirm)机制
生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从1开始),一旦消息被投递到所有匹配的队列之后, RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外 RabbitMO也可以设置 channel.basicAck 方法中的multiple 参数,表示到这个序号之前的所有消息都已经得到了处理,可以参考下图
注意:
事务机制和普通confirm方式的吞吐量都很低,在实际生产环境中推荐使用批量confirm和异步confirm
MQ接收失败或者路由失败
生产者的发送消息处理好了之后,我们就可以来看看MQ端的处理,MQ可能出现两个问题:
消息找不到对应的Exchange。
找到了Exchange但是找不到对应的Queue。
这两种情况都可以用RabbitMQ提供的mandatory参数来解决,它会设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。
MQ Broker宕机
假如MQ突然宕机了或者被关闭了,这种问题就必须要对消息做持久化,以便MQ重新启动之后消息还能重新恢复过来。
消息的持久化要做,但是不能只做消息的持久化,还要做队列的持久化和Exchange的持久化。
这些都是MQ宕机引起的问题,如果出现服务器宕机或者磁盘损坏则上面的手段统统无效,必须引入镜像队列,做异地多活来抵御这种不可抗因素。(不放在一个篮子里)
消费者无法正常消费
打开手动消息确认之后,只要我们这条消息没有成功消费,无论中间是出现消费者宕机还是代码异常,只要连接断开之后这条信息还没有被消费那么这条消息就会被重新放入队列再次被消费。
当然这也可能会出现重复消费的情况,不过在分布式系统中**幂等性**是一定要做的,所以一般重复消费都会被接口的幂等给拦掉。
所谓幂等性就是:一个操作多次执行产生的结果与一次执行产生的结果一致。
猜你喜欢
- 2024-10-17 一次教会你如何解决RabbitMQ消息丢失问题
- 2024-10-17 RabbitMQ消息可靠性分析和应用 rabbitmq消息数据类型
- 2024-10-17 基于本地消息表实现MQ最终一致性 本地消息表(异步确保)
- 2024-10-17 MQ怎么确保不丢数据 mq防止数据丢失
- 2024-10-17 Java面试必备!RabbitMQ 常用知识点总结,纯手绘23张图带你拿下
- 2024-10-17 SpringBoot+RabbitMQ 实现 RPC 调用
- 2024-10-17 RabbitMQ消息更多细节 rabbitmq消息堆积怎么解决
- 2024-10-17 springboot+rabbitmq+消息发送确认
- 2024-10-17 Rabbitmq消费端实战 rabbitmq官网
- 2024-10-17 每日学习~RabbitMQ消息应答机制 rabbit mq五种消息模型
你 发表评论:
欢迎- 11-19零基础学习!数据分析分类模型「支持向量机」
- 11-19机器学习 | 算法笔记(三)- 支持向量机算法以及代码实现
- 11-19我以前一直没有真正理解支持向量机,直到我画了一张图
- 11-19研一小姑娘分享机器学习之SVM支持向量机
- 11-19[机器学习] sklearn支持向量机
- 11-19支持向量机
- 11-19初探支持向量机:用大白话解释、原理详解、Python实现
- 11-19支持向量机的核函数
- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)