网站首页 > 技术文章 正文
在目前开源的RockeMQ版本中,并不支持发送任意时间精度的延迟消息。有很多同学应该也会碰到这样的问题:如果要发送任意时间精度的延迟消息,该如何做?碰到这个问题其实就要从MQ的实现原理上找切入点,今天通过查资料,把这种操作简单落地了。
常见的解决方案
1.定时扫描
事先记录事件的触发时间点,定时任务不停查数据库对比触发时间。这种方式不实时,随着定时任务的执行频率变高,触发实时性会有所提升,但是频繁地扫描增加了数据库的压力,也是最简单的做法。
2.jdk的解决方案
jdk为我们提供的定时器Timer,延时队列DelayQueue。这种方式在单机,对可靠性要求不高的环境下是可以使用的,任务和队列都存在于jvm内存中,所以不支持分布式的环境,系统突然宕机后也无法恢复。
3.消息中间件的延时消息
生产者投递延时消息,消费者在规定时间后才能消费此消息,这样对于我们业务开发而言,只需要关注刚刚过期的那一条消息即可。
public class DelayProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
try {
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicA-test",// topic
"TagA",// tag
(new Date() + "Hello RocketMQ ,QuickStart 11" + i)
.getBytes()// body
);
//1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。
// level=0,表示不延时。level=1,表示 1 级延时,对应延时 1s。level=2 表示 2 级延时,对应5s,以此类推
msg.setDelayTimeLevel(2);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
具体的做法是:
- 生产延迟消息:延迟消息由两部分组成--该笔消息的订单号key+业务数据value;
- 存储消息:当把延迟消息组装好之后,把该消息(key,value)放入redis中并设置一定的超时时间同时存入时间轮数据结构中;
- 取出消息:当该消息在时间轮数据结构中到期时,取出key,然后根据这个key去redis中取value;
- 通过RocketMQ的生产者线程,把消息发送出去,若发送成功,则把redis中该key删除;若是发送失败,则记录日志,人工补偿;
用到的几个重要的技术模块主要有三个:
1. HashedWheelTimer:存储消息的key,key到期时,自动弹出---起到一个定时器的作用;
2. Redis:将完整的延迟消息存储到内存中时,还把数据持久化到硬盘,当redis重启时,基本不丢数据;
3. RocketMQ:发送延迟消息;
这里有几个问题需要注意:
1. 当系统突然宕机,服务器重启后,时间轮HashedWheelTimer中的key都将消失,并且很难恢复,此时丢失的key对应在Redis中的value只能等待时间到期,这种情况怎么办,即数据丢失问题?也可以不使用Redis存储完整的消息,把完整的消息直接放入时间轮数据结构中或放入延迟队列DelayQueue中;用这种方式也会存在数据丢失的问题:即系统突然宕机,服务器重启后,未到期的数据都将丢失,因为对数据没有进行持久化;
2. 当key从HashedWheelTimer中取出后,根据该key在Redis中没取到数据,这种情况该怎么办,即数据不一致的问题?
3. 当消息到期后,用RocketMQ发送时,发送好几次都失败了,这时候除了记录日志,人工进行补偿之外,还有什么好的解决方案?--解决办法之一是:把这些发送失败的消息,存入数据库表中;然后启动一个定时任务,定时把发送失败的消息,通过RocketMQ再次发送出去,若发送成功,将该消息从数据库中删除;若这次还是发送失败,则下次定时任务执行时,再继续尝试发送。
这里的HashedWheelTimer可以用Delayqueue代替,它两相比较而言,HashedWheelTimer的时间复杂度比Delayqueue要好些。
解决思路后期其实可以重点放在不实用第三方的中间件上面,就是用默认的延时等级排列组合出你想要的任意时长,这也是一个重要的解决思路。
参考1:https://zhuanlan.zhihu.com/p/99373081
参考2:https://www.jianshu.com/p/2838890f3284
猜你喜欢
- 2025-01-23 记录:RocketMQ在使用上的一些排坑和优化
- 2025-01-23 消息队列选型(RabbitMq、RocketMq、Kafaka)
- 2025-01-23 Modbus新手教程(modbusrtu)
- 2025-01-23 性能调优篇:困扰我半年之久的RocketMQ timeout exception 被破解了
- 2025-01-23 RocketMQ - 如何实现顺序消息(rocketmq+-+如何实现顺序消息功能)
- 2025-01-23 10 张图 | 一文带你用 Mac M1 跑 RocketMQ
- 2025-01-23 RocketMQ - RocketMQ集群可视化的监控和管理
- 2025-01-23 RocketMQ中的线程池是如何创建的?
- 2025-01-23 手把手教你,从零开始搭建Spring Cloud Alibaba!这份笔记太牛了
- 2025-01-23 RocketMQ每秒要写入几十万并发,是怎么实现的?
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)