引言
大家在使用RabbitMQ的过程中一定考虑过一个问题,生产者生产的消息是否成功发送到了MQ服务器?消费者消费到消息并处理业务逻辑如何告诉服务器消费成功了,如果业务逻辑处理失败了想把消息放回服务器又该怎么办呢?今天这篇文章会带着你一一解决这些问题!
一、生产与消费确认模式
确认模式分别为生产者发送消息是否得到保证,消费者消费消息之后如何进行回应,即无论生产消息或者消费消息我们都希望可以得到一个具体的反馈。
生产者确认
Confirm模式:即及时应答模式,生产者发送一条,MQ服务器进行响应表示收到确认。其特点是异步,服务器确认之前不影响其他消息的生产。
TX事务模式:基于AMPQ协议;可以让信道设置成一个带事务的信道,分为三步:开启事务、提交事务、事务回滚。其特点是同步模式,事务提交之前不能继续发送消息
消费者确认
自动确认:表示消费到一条消息立即给服务器表示已经成功消费到了消息。其特点是消费确认了服务器就会将消息移除;设置代码为autoAck为true即可
手动确认:消费消息后会将消费结果反馈给服务器,其特点是服务器会根据消费结果进行删除,消费一条对应删除一条;如果消费失败则可以反馈给服务器将消息重返回队列,也可以直接删除。
二、生产者代码示例
下面根据具体的代码示例说明生产者分别有哪些方案可以确保消息成功发送!
Confirm模式
配置文件
#生产者发送消息的确认方式:correlated表示触发回调,以确定是否成功发送到了交换机
publisher-confirm-type: correlated
#交换机信息是否成功发送到了队列
publisher-returns: true
确认配置代码
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// 消息是否成功发送到 Exchange
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("【发送消息到 Exchange】- 成功了。correlationData:${}#34;, correlationData);
} else {
log.info("【发送消息到 Exchange】- 失败了。correlationData ID:${}$,returned:${}$,cause:${}#34;, correlationData, null != correlationData ? correlationData.getReturnedMessage() : "空", cause);
}
});
// 触发 setReturnCallback 回调必须设置 mandatory=true, 否则 Exchange 没有找到 Queue 就会丢弃掉消息, 而不会触发回调
rabbitTemplate.setMandatory(true);
// 消息是否从 Exchange 路由到 Queue, 注意: 这是一个失败回调, 只有消息从 Exchange 路由到 Queue 失败才会回调这个方法
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
log.info("【发送消息到 queue】- 失败了,返回消息回调:{} 应答代码:{} 回复文本:{} 交换器:{} 路由键:{}", message, replyCode, replyText, exchange, routingKey);
});
return rabbitTemplate;
}
测试(修改一个不存在的队列名称)
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot1.queue","mq hello");
测试结果如下:
测试(修改一个不存在的交换机)
rabbitTemplate.convertAndSend("Bad.EXCHANGE_NAME","boot.queue","mq hello");
测试结果如下:
TX事务模式
通过手动添加事务的模式进行对生产者生产消息进行控制,达到消息有效确认的效果。
代码示例
public void SendMessageByTransaction() throws IOException, TimeoutException {
final ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
final Connection connection = connectionFactory.createConnection();
Channel channel = connection.createChannel(true);
try {
channel.txSelect();
channel.basicPublish(RabbitMQConfig.EXCHANGE_NAME, "boot.haha", MessageProperties.TEXT_PLAIN, "tx to mq".getBytes());
int i = 1 / 0;
channel.txCommit();
log.error("消息生产成功,事务提交");
} catch (Exception e) {
log.error("消息生产失败,消息回滚 {}",e.getMessage());
channel.txRollback();
}finally {
channel.close();
connection.close();
}
}
测试出现异常信息为消息回滚 / by zero,则消息生产失败进行其他逻辑处理否则消息提交成功,从而达到消息确认的目的。
三、消费者示例
消息确认
消息消费确认分为自动确认与手动确认分别如下:
自动确认:该模式只需要配置acknowledge-mode:auto即可完成,该模式是服务器认为消费者已经消费后立即将消息移除掉,此模式均为默认设置。
手动确认:
配置文件设置
listener:
simple:
# ACK 模式,此处选择手动 ACK
acknowledge-mode: manual
# 决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为 true,重新放回队列。这里设置为 false,如果多次重试还是异常就转发到死信队列
default-requeue-rejected: false
代码示例
@RabbitListener(queues = "boot_queue")
public void listenerQueue(Message message, Channel channel) throws Exception{
try {
log.info("成功接收消息并处理逻辑正确则反馈给服务器");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("主动抛出异常");
throw new Exception();
}catch (Exception e){
log.info("捕捉异常信息并将消息重回队列");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
测试(主动抛出异常)
测试正确结果
总结
关于生产者消息的确认中,事务提交模式虽然能百分之百提交,但具体消息是否真正到了队列是不可知的而且消耗性能;而Confirm模式则能很好的捕捉消息去向。关于消费者确认则根据自动与手动的区别得出结论,自动虽然能少写很多代码,但是需要在确认消息之前做一些逻辑处理是不能的,而手动确认则更加灵活。
本文暂时没有评论,来添加一个吧(●'◡'●)