计算机系统应用教程网站

网站首页 > 技术文章 正文

新手如何使用RocketMQ发送任意时长延迟消息?

btikc 2025-01-23 15:32:49 技术文章 12 ℃ 0 评论

在目前开源的RockeMQ版本中,并不支持发送任意时间精度的延迟消息。有很多同学应该也会碰到这样的问题:如果要发送任意时间精度的延迟消息,该如何做?碰到这个问题其实就要从MQ的实现原理上找切入点,今天通过查资料,把这种操作简单落地了。



常见的解决方案

1.定时扫描

事先记录事件的触发时间点,定时任务不停查数据库对比触发时间。这种方式不实时,随着定时任务的执行频率变高,触发实时性会有所提升,但是频繁地扫描增加了数据库的压力,也是最简单的做法。

2.jdk的解决方案

jdk为我们提供的定时器Timer,延时队列DelayQueue。这种方式在单机,对可靠性要求不高的环境下是可以使用的,任务和队列都存在于jvm内存中,所以不支持分布式的环境,系统突然宕机后也无法恢复。

3.消息中间件的延时消息

生产者投递延时消息,消费者在规定时间后才能消费此消息,这样对于我们业务开发而言,只需要关注刚刚过期的那一条消息即可。

public class DelayProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        try {
            for (int i = 0; i < 3; i++) {
                Message msg = new Message("TopicA-test",// topic
                        "TagA",// tag
                        (new Date() + "Hello RocketMQ ,QuickStart 11" + i)
                                .getBytes()// body
                );
                //1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。
                // level=0,表示不延时。level=1,表示 1 级延时,对应延时 1s。level=2 表示 2 级延时,对应5s,以此类推
                msg.setDelayTimeLevel(2);
 
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
       producer.shutdown();
    }
}

具体的做法是:

  1. 生产延迟消息:延迟消息由两部分组成--该笔消息的订单号key+业务数据value;
  2. 存储消息:当把延迟消息组装好之后,把该消息(key,value)放入redis中并设置一定的超时时间同时存入时间轮数据结构中;
  3. 取出消息:当该消息在时间轮数据结构中到期时,取出key,然后根据这个key去redis中取value;
  4. 通过RocketMQ的生产者线程,把消息发送出去,若发送成功,则把redis中该key删除;若是发送失败,则记录日志,人工补偿;

用到的几个重要的技术模块主要有三个:

1. HashedWheelTimer:存储消息的key,key到期时,自动弹出---起到一个定时器的作用;

2. Redis:将完整的延迟消息存储到内存中时,还把数据持久化到硬盘,当redis重启时,基本不丢数据;

3. RocketMQ:发送延迟消息;

这里有几个问题需要注意:

1. 当系统突然宕机,服务器重启后,时间轮HashedWheelTimer中的key都将消失,并且很难恢复,此时丢失的key对应在Redis中的value只能等待时间到期,这种情况怎么办,即数据丢失问题?也可以不使用Redis存储完整的消息,把完整的消息直接放入时间轮数据结构中或放入延迟队列DelayQueue中;用这种方式也会存在数据丢失的问题:即系统突然宕机,服务器重启后,未到期的数据都将丢失,因为对数据没有进行持久化;

2. 当key从HashedWheelTimer中取出后,根据该key在Redis中没取到数据,这种情况该怎么办,即数据不一致的问题?

3. 当消息到期后,用RocketMQ发送时,发送好几次都失败了,这时候除了记录日志,人工进行补偿之外,还有什么好的解决方案?--解决办法之一是:把这些发送失败的消息,存入数据库表中;然后启动一个定时任务,定时把发送失败的消息,通过RocketMQ再次发送出去,若发送成功,将该消息从数据库中删除;若这次还是发送失败,则下次定时任务执行时,再继续尝试发送。

这里的HashedWheelTimer可以用Delayqueue代替,它两相比较而言,HashedWheelTimer的时间复杂度比Delayqueue要好些。


解决思路后期其实可以重点放在不实用第三方的中间件上面,就是用默认的延时等级排列组合出你想要的任意时长,这也是一个重要的解决思路。


参考1:https://zhuanlan.zhihu.com/p/99373081

参考2:https://www.jianshu.com/p/2838890f3284

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表