计算机系统应用教程网站

网站首页 > 技术文章 正文

基于本地消息表实现MQ最终一致性 本地消息表(异步确保)

btikc 2024-10-17 08:46:52 技术文章 6 ℃ 0 评论

背景

目前公司项目对微服务之间调用都是基于MQ实现,后续项目会有越来越多的服务,考虑后续服务增多,服务之间出现的分布式事务是需要面对和解决的问题

案例分析

场景

是否一致性

订单处理成功,然后突然宕机,事务未提交,消息没有发送出去

一致性

订单处理成功,由于网络原因或者MQ宕机,消息没有发送出去,事务回滚

一致性

订单处理成功,消息发送成功,但是MQ由于其他原因,导致消息存储失败,事务回滚

一致性

订单处理成功,消息存储成功,但是MQ处理超时,从而ACK确认失败,导致发送方本地事务回滚

不一致

订单处理成功,MQ发送成功,而ACK确认失败,导致订单事务提交成功,MQ丢失

不一致

结论

从上面的情况分析,我们可以看到无法保证业务处理与消息发送两边的一致性,原因就在于:远程(RCP)调用,结果最终可能为成功、失败、超时;而对于超时的情况,处理方最终的结果可能是成功,也可能是失败,调用方是无法知晓的。

解决方案,利用本地消息表

最核心做法就是在执行业务操作的时候,记录一条消息数据到DB(每个服务本地创建一张消息表),并且消息数据的记录与业务数据的记录必须在同一个事务内完成,这是该方案的前提核心保障

在记录消息数据后,可以通过定时任务轮询消息表去投递MQ,也可以立即调用发送MQ方法,然后这个过程中可能存在消息投递失败的可能,可用定时任务补偿

核心:1每个服务创建一张通用消息表

2业务逻辑和记录消息在同一个事务

RabbitMQ实现方式

1.通用本地消息表

2.实现ConfirmCallback

实现ConfirmCallback接口后,rabbitMQ的ack结果会回调到confirm方法

@Component
@Slf4j
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MessageRecordRepository messageRecordRepository;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallback
        rabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(correlationData==null){
            log.error("correlationData为空,ack结果:{},失败原因:{},消息唯一标识:{}",ack,cause,correlationData);
            return ;
        }
        String messageId = correlationData.getId();
        MessageRecord messageRecord = messageRecordRepository.getById(messageId);
        if(messageRecord==null){
            log.error("messageRecord为空,ack结果:{},失败原因:{},消息唯一标识:{}",ack,cause,correlationData);
            return ;
        }
        if(!ack){
            Integer count = null;
            if(Objects.nonNull(messageRecord.getRetriesCount())){
                count +=messageRecord.getRetriesCount();
            }
            messageRecordRepository.updateById(MessageRecord.builder()
                    .id(Long.valueOf(messageId))
                    .status(MqStateEnum.UN_ACK.getCode())
                    .retriesCount(count)
                    .build());
            log.error("ack失败,失败原因:{},消息唯一标识:{}",cause,correlationData.getId());
        }else{
            messageRecordRepository.updateById(MessageRecord.builder()
                    .id(Long.valueOf(messageId))
                    .status(MqStateEnum.ACK.getCode())
                    .build());
            log.info("ack结果成功,消息唯一标识:{}",correlationData.getId());
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息主体 message:{},交换器exchange:{},routing:{},replyText:{}",message,exchange,routingKey,replyText);
    }


}

3.具体代码使用

  1. 具体使用场景
  1. 预发送方法
  1. 提交MQ方法

优点

1.解决微服务调用事务最终一致性问题

2.可方便线上排查BUG,查询消息表可知消息有无发送,是否消费成功

3.能保证事务提交后再发MQ,避免发送方没来得及提交事务,异步的MQ消费太快出现数据不一致问题

方案缺点

1.消息表记录在DB,性能瓶颈依赖了数据库性能

2.使用方式变得稍微繁琐,需要调用两次才能发送MQ

3.重试次数多少?业务怎么去定义重试次数、重试太多怎么告警

补偿机制

可用定时任务补偿ack失败的消息,目前业务MQ的吞吐量不高,定时任务的重试机制可以考虑先不做,后续线上出错可hotfix补充

后续扩展

本地消息表的性能存在瓶颈,后续业务复杂或数据量大的情况下可采用消息服务的形式,通过部署多个消息服务节点解决性能问题。即将本地消息的实现抽出到一个公共消息服务。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表