哈喽大家好,我是阿Q!
上文,我们已经完成了SpringBoot快速集成RabbitMQ的小Demo,本文我们来聊一下RabbitMQ为了防止消息丢失,增加的消息确认机制:生产者消息确认机制和消费者消息确认机制。
确认机制
一、生产者消息确认机制
- 在yml中增加配置信息
spring:
rabbitmq:
#确认消息已发送到交换机(Exchange)
publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
spring.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果
- 增加回调
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启 Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);
System.out.println("ConfirmCallback: "+"确认情况:"+ack);
System.out.println("ConfirmCallback: "+"原因:"+cause);
}
});
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("ReturnCallback: "+"消息:"+returned.getMessage());
System.out.println("ReturnCallback: "+"回应码:"+returned.getReplyCode());
System.out.println("ReturnCallback: "+"回应信息:"+returned.getReplyText());
System.out.println("ReturnCallback: "+"交换机:"+returned.getExchange());
System.out.println("ReturnCallback: "+"路由键:"+returned.getRoutingKey());
}
});
return rabbitTemplate;
}
- confirm机制是只保证消息到达exchange,并不保证消息可以路由到正确的queue
- 当前的exchange不存在或者指定的路由key路由不到才会触发return机制
大家可以自行演示以下情况的执行结果:
- 不存在交换机和队列
- 存在交换机,不存在队列
- 消息推送成功
二、消费者消息的确认机制
默认情况下如果一个消息被消费者正确接收则会从队列中移除。如果一个队列没被任何消费者订阅,那么这个队列中的消息会被缓存,当有消费者订阅时则会立即发送,进而从队列中移除。
消费者消息的确认机制可以分为以下3种:
- 自动确认
AcknowledgeMode.NONE 默认为自动确认,不管消费者是否成功处理了消息,消息都会从队列中被移除。
- 根据情况确认
AcknowledgeMode.AUTO 根据方法的执行情况来决定是否确认还是拒绝(是否重新入队列)
- 如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认
- 当抛出AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且消息不会重回队列
- 当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认
- 其他的异常,则消息会被拒绝,并且该消息会重回队列,如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的。可以通过 setDefaultRequeueRejected(默认是true)去设置
可能造成消息丢失,一般是需要我们在try-catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
- 手动确认
AcknowledgeMode.MANUAL对于手动确认,也是我们工作中最常用到的,它的用法如下:
/*
* 肯定确认
* deliveryTag:消息队列数据的唯一id
* multiple:是否批量
* true :一次性确认所有小于等于deliveryTag的消息
* false:对当前消息进行确认;
*/
channel.basicAck(long deliveryTag, boolean multiple);
/*
* 否定确认
* multiple:是否批量
* true:一次性拒绝所有小于deliveryTag的消息
* false:对当前消息进行确认;
* requeue:被拒绝的是否重新入列,
* true:就是将数据重新丢回队列里,那么下次还会消费这消息;
* false:就是拒绝处理该消息,服务器把该消息丢掉即可。
*/
channel.basicNack(long deliveryTag, boolean multiple, boolean requeue);
/*
* 用于否定确认,但与basicNack相比有一个限制,一次只能拒绝单条消息
*/
channel.basicReject(long deliveryTag, boolean requeue);
手动确认
在yml配置中开启手动确认模式
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
或者在代码中开启
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private MQReciever mqReciever;//消息接收处理类
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//并发使用者的数量
container.setConcurrentConsumers(1);
//消费者人数上限
container.setMaxConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
//设置一个队列,此处支持设置多个
container.setQueueNames("directQueue");
container.setMessageListener(mqReciever);
return container;
}
}
消息消费类
@Component
@RabbitListener(queues = "directQueue")//监听队列名称
public class MQReciever implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String msg = message.toString();
String[] msgArray = msg.split("'");//可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据
System.out.println("消费的消息内容:"+msgArray[1]);
System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue());
//业务处理
......
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//拒绝重新入队列
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
}
无ack:效率高,存在丢失大量消息的风险;有ack:效率低,不会丢消息。
作者:阿Q说代码
链接:https://juejin.cn/post/7063259228898590750
本文暂时没有评论,来添加一个吧(●'◡'●)