前言
- 之前我们有提到如何保证rabbitmq消息不丢失。分别从三个角度解析了。分别是发送方、rabbitmq、消费方。
- 当时有关消费方只是简单带过了介绍。今天我们从一个使用场景来分析下消费者确认消费带来的坑
发送消息
- 这里我们还是继续沿用之前的发送逻辑。
public Map<String, Object> sendMessage(Map<String, Object> params) throws UnsupportedEncodingException {
Map<String, Object> resultMap = new HashMap<String, Object>(){
{
put("code", 200);
}
};
String msg = "";
Integer index = 0;
if (params.containsKey("msg")) {
msg = params.get("msg").toString();
}
if (params.containsKey("index")) {
index = Integer.valueOf(params.get("index").toString());
}
if (index != 0) {
//这里开始模拟异常出现。消息将会丢失
int i = 1 / 0;
}
Map<String, Object> map = new HashMap<>();
map.put("msg", msg);
Message message= MessageBuilder.withBody(JSON.toJSONString(map).getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_JSON)
.build();
CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(RabbitConfig.TOPICEXCHANGE, "zxh", message,data);
return resultMap;
}
复制代码
- 首先我们在发送里面还是会保留异常情况,这是为了之前测试发送消息确认的操作。笔者这里偷个懒就没有删除 。 本次我们在调用接口会始终保证消息投递的准确性。因为我们的重点是消费者
- 因为rabbitmq有三种确认机制acknowledge-mode ; 分别是manual、auto、none; manual就是需要我们手动确认,auto标识自动确认消息,none就是不作为
@RabbitListener(queues = RabbitConfig.QUEUEFIRST)
@Async("asyncExecutor")
public void handler(Message msg, Channel channel) {
//channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);
byte[] body = msg.getBody();
String messages = new String(body);
JSONObject json = (JSONObject) JSONObject.parse(messages);
if ("1".equals(json.getString("msg"))) {
try {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
if ("2".equals(json.getString("msg"))) {
throw new RuntimeException("异常。。。。。");
}
log.info(RabbitConfig.QUEUEFIRST+"队列中消费的信息:"+msg);
}
复制代码
- 在接受消息上我们根据发送过来的消息做了处理,当收到消息体为1的我们会进行消息确认,如何消息体是2则会抛出异常也就是不进行消息确认。只要我们消息不确认那么rabbitmq就会保存消息并尝试再次发送给消费者。
场景描述
- 深夜突然接到电话说线上数据不同了。上面我们说的是mq的消费问题,聪明的读者肯定知道这个问题肯定是mq的消费问题。而对于我来说一开始很闷逼的。项目上线已经三天了,为什么偏偏是这个时候功能不正常了呢?
- 于是我打开项目开始线上定位,首先看了下日志一看才发现项目在处理mq的那段逻辑在疯狂的报错。心中开始窃喜这么容易就找到问题所在了。但是随着问题的深入发现报错并不是导致线上故障的根本原因。因为线上的现象是数据无法同步。而同步的关键是监听mq的消息从而实现同步。但是现在的问题是通过日志看根本就无法接受到消息,而且进入mq后台页面看到相关的队列全部堵塞在那里了。
- 这里我们稍微总结下:mq处理逻辑疯狂报错mq无法接受其它数据
问题剖析
- 因为这是上线三天后造成的,所以我很肯定是业务逻辑是正常的,否则根本就无法通过测试。那么我该考虑的是为什么队列对堵塞呢?而本次上线的确对mq做了稍微的改动,就是增加了消息的手动确认。
Unexpected exception occurred invoking async method: public void xxxxxxxxxxxxxxxx(org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)
java.lang.IllegalStateException: Channel closed; cannot ack/nack
复制代码
- 这是线上报错之一,另外一个报错是业务里的报错和我们无关。通过上面的报错信息我们能够摘取到以下async exceptionchannel cloesed , cannot ack/nack
- 一个是异步处理中出现异常,导致async exception ; 因为异步处理出错当到达确认时候就会出现not ack
- 这里贴出一个博主针对not ack的解决方案,针对我们本次场景并不适用
- 我们在发送msg=2的时候就会发生异常。这个其实还是很好处理的,将mq接受消息的地方做个兼容处理也就是全局捕获异常。
- 到了这里我们只是在处理上面第一个问题----mq处理疯狂报错;很明显这并没有解决我们的根本问题--为什么mq消息发生拥堵
mq批处理设置
消费者在开启acknowledge的情况下,对接收到的消息可以根据业务的需要异步对消息进行确认。
然而在实际使用过程中,由于消费者自身处理能力有限,从rabbitmq获取一定数量的消息后,希望rabbitmq不再将队列中的消息推送过来,当对消息处理完后(即对消息进行了ack,并且有能力处理更多的消息)再接收来自队列的消息。在这种场景下,我们可以通过设置basic.qos信令中的prefetch_count来达到这种效果
- 我们可以看到mq的默认prefetch_count是250 。 这个设置也是我们本次mq拥堵的根本原因。
- 也就是说在我们遇到一次报错导致没有有效消息确认时,我们消费者和mq之间channel就会占用一条消息。知道250条数据都被无法确认的消息沾满后,正常的数据也就不会在下发到该消费者了。因为prefetch_count是消费者的上限。
- 这就造成了死循环了。250条我们无法确认的消息无法消费,我们就无法获取新的消息。
- 这也就是了为什么系统刚上线的时候没有问题,因为刚开始我们可以接受消息没有确认并不影响我们处理正确数据只是效率慢了一点。随着时间的推移慢慢的错误数据越来越多导致我们最终拥堵。
解决办法
- 处理掉异常业务,并且捕获异常保证消息确认
- 消息确认最好使用自动确认或者将消息确认放在前面。
总结
- 当rabbitmq要将队列中的一条消息投递给消费者时,会遍历该队列上的消费者列表,选一个合适的消费者,然后将消息投递出去。其中挑选消费者的一个依据就是看消费者对应的channel上未ack的消息数是否达到设置的prefetch_count个数,如果未ack的消息数达到了prefetch_count的个数,则不符合要求。当挑选到合适的消费者后,中断后续的遍历
原文链接:https://juejin.cn/post/7056574438849904654
本文暂时没有评论,来添加一个吧(●'◡'●)