网站首页 > 技术文章 正文
背景
目前公司项目对微服务之间调用都是基于MQ实现,后续项目会有越来越多的服务,考虑后续服务增多,服务之间出现的分布式事务是需要面对和解决的问题
案例分析
场景 | 是否一致性 |
订单处理成功,然后突然宕机,事务未提交,消息没有发送出去 | 一致性 |
订单处理成功,由于网络原因或者MQ宕机,消息没有发送出去,事务回滚 | 一致性 |
订单处理成功,消息发送成功,但是MQ由于其他原因,导致消息存储失败,事务回滚 | 一致性 |
订单处理成功,消息存储成功,但是MQ处理超时,从而ACK确认失败,导致发送方本地事务回滚 | 不一致 |
订单处理成功,MQ发送成功,而ACK确认失败,导致订单事务提交成功,MQ丢失 | 不一致 |
结论
从上面的情况分析,我们可以看到无法保证业务处理与消息发送两边的一致性,原因就在于:远程(RCP)调用,结果最终可能为成功、失败、超时;而对于超时的情况,处理方最终的结果可能是成功,也可能是失败,调用方是无法知晓的。
解决方案,利用本地消息表
最核心做法就是在执行业务操作的时候,记录一条消息数据到DB(每个服务本地创建一张消息表),并且消息数据的记录与业务数据的记录必须在同一个事务内完成,这是该方案的前提核心保障。
在记录消息数据后,可以通过定时任务轮询消息表去投递MQ,也可以立即调用发送MQ方法,然后这个过程中可能存在消息投递失败的可能,可用定时任务补偿
核心:1每个服务创建一张通用消息表
2业务逻辑和记录消息在同一个事务
RabbitMQ实现方式
1.通用本地消息表
2.实现ConfirmCallback
实现ConfirmCallback接口后,rabbitMQ的ack结果会回调到confirm方法
@Component
@Slf4j
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MessageRecordRepository messageRecordRepository;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback
rabbitTemplate.setReturnCallback(this); //指定 ReturnCallback
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(correlationData==null){
log.error("correlationData为空,ack结果:{},失败原因:{},消息唯一标识:{}",ack,cause,correlationData);
return ;
}
String messageId = correlationData.getId();
MessageRecord messageRecord = messageRecordRepository.getById(messageId);
if(messageRecord==null){
log.error("messageRecord为空,ack结果:{},失败原因:{},消息唯一标识:{}",ack,cause,correlationData);
return ;
}
if(!ack){
Integer count = null;
if(Objects.nonNull(messageRecord.getRetriesCount())){
count +=messageRecord.getRetriesCount();
}
messageRecordRepository.updateById(MessageRecord.builder()
.id(Long.valueOf(messageId))
.status(MqStateEnum.UN_ACK.getCode())
.retriesCount(count)
.build());
log.error("ack失败,失败原因:{},消息唯一标识:{}",cause,correlationData.getId());
}else{
messageRecordRepository.updateById(MessageRecord.builder()
.id(Long.valueOf(messageId))
.status(MqStateEnum.ACK.getCode())
.build());
log.info("ack结果成功,消息唯一标识:{}",correlationData.getId());
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息主体 message:{},交换器exchange:{},routing:{},replyText:{}",message,exchange,routingKey,replyText);
}
}
3.具体代码使用
- 具体使用场景
- 预发送方法
- 提交MQ方法
优点
1.解决微服务调用事务最终一致性问题
2.可方便线上排查BUG,查询消息表可知消息有无发送,是否消费成功
3.能保证事务提交后再发MQ,避免发送方没来得及提交事务,异步的MQ消费太快出现数据不一致问题
方案缺点
1.消息表记录在DB,性能瓶颈依赖了数据库性能
2.使用方式变得稍微繁琐,需要调用两次才能发送MQ
3.重试次数多少?业务怎么去定义重试次数、重试太多怎么告警
补偿机制
可用定时任务补偿ack失败的消息,目前业务MQ的吞吐量不高,定时任务的重试机制可以考虑先不做,后续线上出错可hotfix补充
后续扩展
本地消息表的性能存在瓶颈,后续业务复杂或数据量大的情况下可采用消息服务的形式,通过部署多个消息服务节点解决性能问题。即将本地消息的实现抽出到一个公共消息服务。
猜你喜欢
- 2024-10-17 一次教会你如何解决RabbitMQ消息丢失问题
- 2024-10-17 RabbitMQ消息可靠性分析和应用 rabbitmq消息数据类型
- 2024-10-17 MQ怎么确保不丢数据 mq防止数据丢失
- 2024-10-17 Java面试必备!RabbitMQ 常用知识点总结,纯手绘23张图带你拿下
- 2024-10-17 SpringBoot+RabbitMQ 实现 RPC 调用
- 2024-10-17 RabbitMQ消息更多细节 rabbitmq消息堆积怎么解决
- 2024-10-17 springboot+rabbitmq+消息发送确认
- 2024-10-17 Rabbitmq消费端实战 rabbitmq官网
- 2024-10-17 每日学习~RabbitMQ消息应答机制 rabbit mq五种消息模型
- 2024-10-17 一文搞懂消息推送技术选型 消息推送的几种实现方式
你 发表评论:
欢迎- 11-19零基础学习!数据分析分类模型「支持向量机」
- 11-19机器学习 | 算法笔记(三)- 支持向量机算法以及代码实现
- 11-19我以前一直没有真正理解支持向量机,直到我画了一张图
- 11-19研一小姑娘分享机器学习之SVM支持向量机
- 11-19[机器学习] sklearn支持向量机
- 11-19支持向量机
- 11-19初探支持向量机:用大白话解释、原理详解、Python实现
- 11-19支持向量机的核函数
- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)