网站首页 > 技术文章 正文
继面试被问到如何保证RabbitMQ消息不丢失该怎么回答?我感觉是时候回顾下MQ的基础理论知识了,因为这些基础面试的时候也会经常被问到。上一期我们总结了下MQ基础理论(一)消息通信基本概念,我们来继续学习消息的持久化以及代码实现RabbitMQ通信。在正常生产环境运维过程中无法避免RabbitMQ服务器重启,那么,如果RabbitMQ重启之后,那些队列和交换器就会都消失了(随同里面都消息)。原因是每个队列和交换器都durable默认为false,他决定了RabbitMQ重启或宕机后需要重新建立交换器和队列,那么当我们将durable属性设置为true后,交换器和队列是不会重建了,但是消息却还会消失。
能从AMQP服务器崩溃中恢复的消息,我们称之为消息的的持久化。在发布消息前,通过把他的投递模式选项设置为2来把消息标记为持久化。Java中设置方式为channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); 但是消息持久化会带来严重的性能问题,极大影响RabbitMQ的吞吐量,所以要适当取舍。
和消息持久化相关的一个概念就是AMQP事务。设置消息持久化确实很好,但是发布完持久化的消息并不会给生产者返回持久化信息,那么如果消息在持久化之前服务器宕机了,那么还是避免不了会发生消息丢失问题。这时就是事务发挥作用的时候了。但是事务仍然会降低大约2~10倍的吞吐量,而且会使生产者和应用程序产生同步,而我们使用RabbitMQ通信的目的就是要避免同步。所以,RabbitMQ团队有更好的方案来保证消息投递:发送方确认模式。当然你需要告诉Rabbit将信道设置成confirm模式。
下面我们用Java程序来介绍下RabbitMQ消息通信的事务模式和发送方确认模式
事务使用
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
- channel.txSelect()声明启动事务;
- channel.txComment()提交事务;
- channel.txRollback()回滚事务;
/**
* @Description:RabbitMq生产者、消费者测试类
* @Auther: 头条号【Java思享汇】
* @Date: 2020/6/30 14:57
*/
public class RabbitMqTest {
//消息队列名称
private final static String QUEUE_NAME = "hello";
@Test
public void send() throws Exception{
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建消息通道
Channel channel = connection.createChannel();
try{
//声明事务
channel.txSelect();
//生成一个消息队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 0; i < 5; i++) {
String message = "Hello World RabbitMQ count: " + i;
//发布消息,第一个参数表示路由(Exchange名称),未""则表示使用默认消息路由
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" RabbitMQ-send " + message + "'");
}
System.out.println(" RabbitMQ-send End......... ");
//事务提交
channel.txCommit();
}catch (Exception e){
//事务回滚
channel.txRollback();
}finally {
//关闭消息通道和连接
channel.close();
connection.close();
}
}
}
事务性能测试:
事务模式,结果如下:
- 事务模式,发送10w条数据,执行花费时间:11104ms
- 事务模式,发送10w条数据,执行花费时间:9827ms
- 事务模式,发送10w条数据,执行花费时间:12238ms
非事务模式,结果如下:
- 非事务模式,发送10w条数据,执行花费时间:6659ms
- 非事务模式,发送10w条数据,执行花费时间:4568ms
- 非事务模式,发送10w条数据,执行花费时间:4447ms
从上面可以看出,非事务模式的性能是事务模式的性能高2-3倍,我的MAC电脑测试是这样的结果,不同的电脑配置会有差异,但结论是一样的,事务模式的性能要差很多,那有没有既能保证消息的可靠性又能兼顾性能的解决方案呢?那就是接下来要讲的Confirm发送方确认模式。
扩展知识
我们知道,消费者可以使用消息自动或手动发送来确认消费消息,那如果我们在消费者模式中使用事务(当然如果使用了手动确认消息,完全用不到事务的),会发生什么呢?
消费者模式使用事务
假设消费者模式中使用了事务,并且在消息确认之后进行了事务回滚,那么RabbitMQ会产生什么样的变化?结果分为两种情况:
- autoAck=false手动应对的时候是支持事务的,也就是说即使你已经手动确认了消息已经收到了,但在确认消息会等事务的返回解决之后,在做决定是确认消息还是重新放回队列,如果你手动确认现在之后,又回滚了事务,那么以事务回滚为主,此条消息会重新放回队列;
- autoAck=true如果自定确认为true的情况是不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了;
Confirm发送方确认模式
Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。
Confirm的三种实现方式:
- channel.waitForConfirms()普通发送方确认模式;
- channel.waitForConfirmsOrDie()批量确认模式;
- channel.addConfirmListener()异步监听发送方确认模式;
方式一:普通Confirm模式
@Test
public void sendConfirm() throws Exception{
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建消息通道
Channel channel = connection.createChannel();
try{
//生成一个消息队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//开启发送方确认
channel.confirmSelect();
for (int i = 0; i < 5; i++) {
String message = "Hello World RabbitMQ count: " + i;
System.out.println("RabbitMQ-send " + message + "'");
//发布消息,第一个参数表示路由(Exchange名称),未""则表示使用默认消息路由
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//发送消息确认
if(channel.waitForConfirms()){
System.out.println(" RabbitMQ-confirm " + message + "'");
}
}
System.out.println(" RabbitMQ-send End......... ");
}catch (Exception e){
}finally {
//关闭消息通道和连接
channel.close();
connection.close();
}
}
方式二:批量Confirm模式
@Test
public void sendConfirm() throws Exception{
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建消息通道
Channel channel = connection.createChannel();
try{
//生成一个消息队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//开启发送方确认
channel.confirmSelect();
for (int i = 0; i < 5; i++) {
String message = "Hello World RabbitMQ count: " + i;
System.out.println("RabbitMQ-send " + message + "'");
//发布消息,第一个参数表示路由(Exchange名称),未""则表示使用默认消息路由
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
//发送消息确认
channel.waitForConfirmsOrDie();
System.out.println(" RabbitMQ-send End......... ");
}catch (Exception e){
System.out.println(" RabbitMQ-send error......... ");
}finally {
//关闭消息通道和连接
channel.close();
connection.close();
}
}
使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未被确认就会抛出IOException异常。
方式三:异步Confirm模式
@Test
public void sendConfirm3() throws Exception{
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建消息通道
Channel channel = connection.createChannel();
try{
//生成一个消息队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//开启发送方确认
channel.confirmSelect();
for (int i = 0; i < 100; i++) {
String message = "Hello World RabbitMQ count: " + i;
System.out.println("RabbitMQ-send " + message + "'");
//发布消息,第一个参数表示路由(Exchange名称),未""则表示使用默认消息路由
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
//异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未确认消息,标识:" + deliveryTag);
}
});
System.out.println(" RabbitMQ-send End......... ");
Thread.sleep(5000);
}catch (Exception e){
System.out.println(" RabbitMQ-send error......... ");
}finally {
//关闭消息通道和连接
channel.close();
connection.close();
}
}
异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可,以上异步返回的信息如下:
可以看出,代码是异步执行的,消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认。
总结
综合总体测试情况来看:Confirm批量确定和Confirm异步模式性能相差不大,Confirm模式要比事务快10倍左右。最后,附上消费消息代码,其中确认模式为手动确认。
@Test
public void consumer() throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建消息信道
Channel channel = connection.createChannel();
//消息队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//消费者用于获取消息信道绑定的消息队列中的信息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
System.out.println("RabbitMQ-consumer '" + message);
System.out.println("RabbitMQ-consumer:consumerTag '" + consumerTag+",envelope="+envelope+",properties="+properties);
} finally {
System.out.println("RabbitMQ-consumer End...... ");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
不断分享开发过程用到的技术和面试经常被问到的问题,如果您也对IT技术比较感兴趣可以「关注」我
猜你喜欢
- 2024-10-17 一次教会你如何解决RabbitMQ消息丢失问题
- 2024-10-17 RabbitMQ消息可靠性分析和应用 rabbitmq消息数据类型
- 2024-10-17 基于本地消息表实现MQ最终一致性 本地消息表(异步确保)
- 2024-10-17 MQ怎么确保不丢数据 mq防止数据丢失
- 2024-10-17 Java面试必备!RabbitMQ 常用知识点总结,纯手绘23张图带你拿下
- 2024-10-17 SpringBoot+RabbitMQ 实现 RPC 调用
- 2024-10-17 RabbitMQ消息更多细节 rabbitmq消息堆积怎么解决
- 2024-10-17 springboot+rabbitmq+消息发送确认
- 2024-10-17 Rabbitmq消费端实战 rabbitmq官网
- 2024-10-17 每日学习~RabbitMQ消息应答机制 rabbit mq五种消息模型
你 发表评论:
欢迎- 11-19零基础学习!数据分析分类模型「支持向量机」
- 11-19机器学习 | 算法笔记(三)- 支持向量机算法以及代码实现
- 11-19我以前一直没有真正理解支持向量机,直到我画了一张图
- 11-19研一小姑娘分享机器学习之SVM支持向量机
- 11-19[机器学习] sklearn支持向量机
- 11-19支持向量机
- 11-19初探支持向量机:用大白话解释、原理详解、Python实现
- 11-19支持向量机的核函数
- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)