网站首页 > 技术文章 正文
前言
共享订阅是MQTT 5.0协议引入的新特性,相当于订阅端的负载均衡功能。
一般的非共享订阅的消息发布流程是这样的:
在这种结构下,如果订阅节点发生故障,就会导致发布者的消息丢失(QoS 0)或者堆积在Server中(QoS 1,2)。一般情况下,解决这个问题的办法都是直接增加订阅节点,但这样又产生了大量的重复消息,不仅浪费性能,在某些业务场景下,订阅节点还需要自行去重,进一步增加了业务的复杂度。
其次,当发布者的生产能力较强时,可能会出现订阅者的消费能力无法及时跟上的情况,此时只能由订阅者自行实现负载均衡来解决,又一次增加了用户的开发成本。
协议规范
现在,在MQTT 5.0协议中,你可以通过共享订阅特性解决上面提到的问题。当你使用共享订阅时,消息的流向就会变为:
同非共享订阅一样,共享订阅包含一个主题过滤器和订阅选项,唯一的区别在于共享订阅的主题过滤器格式必须是$share/{ShareName}/{filter}这种形式。这几个的字段的含义分别是:
- $share 前缀表明这将是一个共享订阅
- {ShareName}是一个不包含"/","+"以及"#"的字符串。订阅会话通过使用相同的 {ShareName}表示共享同一个订阅,匹配该订阅的消息每次只会发布给其中一个会话
- {filter}即非共享订阅中的主题过滤器
需要注意的是,如果服务端正在向其选中的订阅端发送QoS 2消息,并且在分发完成之前网络中断,服务端会在订阅端重新连接时继续完成该消息的分发。如果订阅端的会话在其重连之前终止,服务!端将丢弃该消息而不尝试发送给其他订阅端。如果是QoS 1消息,服务端可以等订阅端重新连接之后继续完成分发,也可以在订阅端断开连接时就立即尝试将消息分发给其他订阅端,MQTT协议没有强制规定,因此需要视服务器的具体实现而定。但如果在等待订阅端重连期间其会话终止,服务端则会将消息尝试发送给其他订阅端。
共享策略
虽然共享订阅使得订阅端能够负载均衡地消费消息,但MQTT协议并没有规定Server应当使用什么负载均衡策略。作为参考,EMQX提供了random,round_robin,sticky,hash四种策略供用户自行选择。
- random:在所有共享订阅会话中随机选择一个发送消息
- round_robin:按照订阅顺序轮流选择
- sticky:使用random策略随机选择一个订阅会话,持续使用至该会话取消订阅或断开连接再重复这一流程
- hash:对发送者的Client ID进行hash操作,根据hash结果选择订阅会话
效果演示
最后,我们通过一个综合性的示例来演示共享订阅的效果。
服务端使用emqx-v3.2.4,客户端使用emqtt,emqx的共享订阅分发策略为默认的random:broker.shared_subscription_strategy=random
使用./emqxstart 启动emqx,然后使用emqtt启动三个订阅客户端,分别订阅 $share/a/topic, $share/a/topic, $share/b/topic
启动一个发布客户端,向topic主题发布消息。
$share/a/topic 与$share/b/topic属于不同的会话组,非共享订阅主题topic会在所有的会话组中进行负载均衡。客户端sub3因为组内只有自己一个会话,所以收到了所有消息,而客户端sub1与sub2则是遵循我们配置的random策略随机接收消息。
猜你喜欢
- 2024-10-12 EMQ百万级MQTT消息服务(小技巧) mqtt消息级别
- 2024-10-12 开源物联网MQTT 5.0服务器——EMQX服务集群搭建教程
- 2024-10-12 EMQX+阿里云飞天洛神云网络NLB:MQTT消息亿级并发 千万级吞吐性能
- 2024-10-12 QUIC 协议:特性、应用场景及其对物联网/车联网的影响
- 2024-10-12 Nordic助力智能插头以蜂窝物联网和低功耗蓝牙提供电器远程能源管理
- 2024-10-12 Windows环境下安装配置Mosquitto服务及入门操作介绍
- 2024-10-12 EMQX——架构设计解析 im 架构设计
- 2024-10-12 两款常用的 MQTT 调试工具 mqtt客户端调试工具
- 2024-10-12 物联网MQTT在电动车共享充电项目中的应用
- 2024-10-12 MQTT消息服务器 emqttd mqtt的消息类型有几种?
你 发表评论:
欢迎- 最近发表
-
- 在 Spring Boot 项目中使用 activiti
- 开箱即用-activiti流程引擎(active 流程引擎)
- 在springBoot项目中整合使用activiti
- activiti中的网关是干什么的?(activiti包含网关)
- SpringBoot集成工作流Activiti(完整源码和配套文档)
- Activiti工作流介绍及使用(activiti工作流会签)
- SpringBoot集成工作流Activiti(实际项目演示)
- activiti工作流引擎(activiti工作流引擎怎么用)
- 工作流Activiti初体验及在数据库中生成的表
- Activiti工作流浅析(activiti6.0工作流引擎深度解析)
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)