计算机系统应用教程网站

网站首页 > 技术文章 正文

MQTT原理学习总结 mqtt协议原理图

btikc 2024-10-11 11:32:41 技术文章 7 ℃ 0 评论

简介

MQTT(消息队列遥测传输)英文全称: Message Queue Telemetry Transport

MQTT是一种轻量级的发布-订阅消息传递协议,它可能最适合各种物联网设备。你可以在此处找到有关MQTT的更多信息。

MQTT 协议可以为大量的低功率、工作网络环境不可靠的物联网设备提供通讯保障。而它的应用范围也不仅如此,在移动互联网领域也大有作为:很多 Android App 的推送功能,都是基于 MQTT 实现的,也有一些 IM的实现,是基于 MQTT 的。

特点

简单的来说 MQTT 协议有以下特性:

  • 基于 TCP 协议的应用层协议;
  • 采用 C/S 架构;
  • 使用订阅/发布模式,将消息的发送方和接受方解耦;
  • 提供 3 种消息的 QoS(Quality of Service): 至多一次,最少一次,只有一次;
  • 收发消息都是异步的,发送方不需要等待接收方应答。

Client和Broker建立连接


CONNECT

连接的建立由 Client 端发起,Client 端首先向 Broker 发送一个 CONNECT 数据包,CONNECT 数据包包含以下内容(这里我们略过 Fixed header)。

可变头(Variable header)

在 CONNECT 数据包可变头中,含有以下信息。

  • 协议名称(Protocol Name):值固定为字符 “MQTT”。
  • 协议版本(Protocol Level):对 MQTT 3.1.1 来说,值为 4。
  • 用户名标识(User Name Flag):消息体中是否有用户名字段,1bit,0 或者 1。
  • 密码标识(Password Flag):消息体中是否有密码字段,1bit,0 或者 1。
  • 遗愿消息 Retain 标识(Will Retain):标识遗愿消息是否是 Retain 消息,1bit,0 或者 1。
  • 遗愿消息 QOS 标识(Will QOS):标识遗愿消息的 QOS,2bit,0、1 或者 2。
  • 遗愿标识(Will Flag):标识是否使用遗愿消息,1bit,0 或者 1。
  • 会话清除标识(Clean Session):标识 Client 是否建立一个持久化的会话,1bit,0 或者 1,当 CleanSession 的标识设为 0 时,代表 Client 希望建立一个持久会话的连接,Broker 将存储该 Client 订阅的主题和未接受的消息,否则 Broker 不会存储这些数据,同时在建立连接时清除这个 Client 之前存在的持久化会话所保存的数据。
  • 连接保护(Keep Alive): 设置一个单位为秒的时间间隔,Client 和 Broker 之间在这个时间间隔之内需要至少有一次消息交互,否则 Client 和 Broker 会认为它们之间的连接已经断开。

消息体(Payload)

CONNECT 数据包的消息体中包含以下数据。

  • 客户端标识符(Client Identifier):Client Identifier 是用来标识 Client 身份的字段,在 MQTT 3.1.1 的版本中,这个字段的长度是 1 到 23 个字节,而且只能包含数字和 26 个字母(包括大小写),Broker 通过这个字段来区分不同的 Client。所以在连接的时候,Client 应该保证它的 Identifier 是唯一的,通常我们可以使用比如 UUID,唯一的设备硬件标识,或者 Android 设备的 DEVICE_ID 等作为 Client Identifier 的取值来源。 MQTT 协议中要求 Client 连接时必须带上 Client Identifier,但是也允许 Broker 在实现时 Client Identifier为空,这时 Broker 会为 Client 分配一个内部唯一的 Identifier。如果你需要使用持久化会话,那就必须自己为Client 设定一个唯一的 Identifier。
  • 用户名(Username):如果可变头中的用户名标识设为 1,那么消息体中将包含用户名字段,Broker 可以使用用户名和密码来对接入的 Client 进行验证,只允许已授权的 Client 接入。注意不同的 Client 需要使用不同的 Client Identifier,但它们可以使用同样的用户名和密码进行连接。
  • 密码(Password):如果可变头中的密码标识设为 1,那么消息体中将包含密码字段。
  • 遗愿主题(Will Topic):如果可变头中的遗愿标识设为 1,那么消息体中将包含遗愿主题,当 Client 非正常地中断连接的时候,Broker 将向指定的遗愿主题中发布遗愿消息。
  • 遗愿消息(Will Message):如果可变头中的遗愿标识设为 1,那么消息体中将包含遗愿消息,当 Client 非正常的中断连接的时候,Broker 将向指定的遗愿主题中发布由该字段指定的内容。

CONNACK

当 Broker 收到 Client 的 CONNECT 数据包之后,将检查并校验 CONNECT 数据包的内容,之后回复 Client一个 CONNACK 数据包。

CONNACK 数据包包含以下内容(这里我们略过 Fixed header)。

可变头(Variable header)

CONNACK 数据包的可变头中,含有以下信息。

  • 会话存在标识(Session Present Flag):用于标识在 Broker 上,是否已存在该 Client(用 Client Identifier区分)的持久性会话,1bit,0 或者 1。当 Client 在连接时设置 Clean Session=1,则 CONNACK 中的Session Present Flag 始终为 0;当 Client 在连接时设置 Clean Session=0,那么就有两种情况——如果Broker 上面保存了这个 Client 之前留下的持久性会话,那么 CONNACK 中的 Session Present Flag 值为 1;如果 Broker 没有保存该 Client 的任何会话数据,那么 CONNACK 中的 Session Present Flag 值为 0。Session Present Flag 这个特性是在 MQTT 3.1.1 版本中新加入的,之前的版本中并没有这个标识。
  • 连接返回码(Connect Return code):用于标识 Client 是 Broker 的连接是否建立成功,连接返回码有以下一些值:

Return Code连接状态0连接已建立1连接被拒绝,不允许的协议版本2连接被拒绝,Client Identifier 被拒绝3连接被拒绝,服务器不可用4连接被拒绝,错误的用户名或密码5连接被拒绝,未授权

这里重点讲一下 Return Code 4 和 5。

Return Code 4 在 MQTT 协议中的含义是 Username 和 Password 的格式不正确,但是在大部分的 Broker 实现中,在使用错误的用户名密码时,得到的返回码也是 4。所以这里我们认为 4 就是代表错误的用户名或密码。

Return Code 5 一般在 Broker 不使用用户名和密码而使用 IP 地址或者 Client Identifier 进行验证的时候使用,来标识 Client 没有通过验证。

注意: Return Code 2 代表的是 Client Identifier 格式不规范,比如长度超过 23 个字符,包含了不允许的字符等(部分 Broker 的实现在协议标准上做了扩展,比如允许超过 23 个字符的 Client Identifer 等)。

消息体(Payload)

CONNACK 没有消息体。

当 Client 向 Broker 发送 CONNECT 数据包并获得 Return Code 为 0 的 CONNACK 包后,就代表连接建立 成功,可以发布和接受消息了 .

关闭连接

Client主动关闭

向 Broker 发送一个 DISCONNECT 数据包就可以了。 DISCONNECT 数据包没有可变头(Variable header)和消息体(Payload)。在 Client 发送完DISCONNECT 之后,就可以关闭底层的 TCP 连接了,不需要等待 Broker 的回复(Broker 也不会对DISCONNECT 数据包回复)。

为什么需要在关闭 TCP 连接之前,发送一个和 Broker 没有交互的 DISCONNECT数据包,而不是直接关闭底层的 TCP 连接?这涉及到 MQTT 协议的一个特性,Broker 需要判断 Client 是否正常地断开连接:

当 Broker 收到 Client 的 DISCONNECT 数据包的时候,它认为 Client 是正常地断开连接,那么它会丢弃当前连接指定的遗愿消息(Will Message)。如果 Broker 检测到 Client 连接丢失,但又没有收到DISCONNECT 消息包,它会认为 Client 是非正常断开连接,就会向在连接的时候指定的遗愿主题(WillTopic)发布遗愿消息(Will Message)

Broker主动关闭

MQTT 协议规定 Broker 在没有收到 Client 的 DISCONNECT 数据包之前都应该保持和 Client 连接,只有Broker 在 Keep Alive 的时间间隔里,没有收到 Client 的任何 MQTT 数据包的时候会主动关闭连接。一些Broker 的实现在 MQTT 协议上做了一些拓展,支持 Client 的连接管理,可以主动地断开和某个 Client 的连接。

Broker 主动关闭连接之前不会向 Client 发送任何 MQTT 数据包,直接关闭底层的 TCP 连接就完事了。

代码实战

建立持久化会话

 var mqtt = require('mqtt')
 var client = mqtt.connect('mqtt://iot.eclipse.org', {
     clientId: "mqtt_sample_id_1",
     clean: false //持久会话
 })
 
 client.on('connect', function (connack) {
     console.log(`return code: ${connack.returnCode}, sessionPresent:${connack.sessionPresent}`)
     client.end()
 })

建立非持久化会话

 var mqtt = require('mqtt')
 var client = mqtt.connect('mqtt://iot.eclipse.org', {
         clientId: "mqtt_sample_id_1",
         clean: true //非持久会话
    })
 ?
 client.on('connect', function (connack) {
     console.log(`return code: ${connack.returnCode}, sessionPresent:${connack.sessionPresent}`)
     client.end()
 })

持久会话和非持久会话的就在clean的值是false还是true。

订阅发布模型

一个典型的 MQTT 消息发送与接收的流程如下:

  1. ClientA 连接到 Broker;
  2. ClientB 连接到 Broker,并订阅主题 Topic1;
  3. ClientA 发送给 Broker 一条消息,主题为 Topic1;
  4. Broker 收到 ClientA 的消息,发现 ClientB 订阅了 Topic1,然后将消息转发到 ClientB;
  5. ClientB 从 Broker 接收到该消息

和传统的队列有点不同,如果 ClientB 在 ClientA 发布消息之后再订阅 Topic1,ClientB 不会收到该条消息。

MQTT 通过订阅与发布模型对消息的发布者和订阅者进行解耦,发布者在发布消息时并不需要订阅方也连接到 Broker,只要订阅方之前订阅过相应主题,那么它在连接到 Broker 之后就可以收到发布方在它离线期间发布的 消息。我们称这种消息为离线消息。

接收离线的消息需要 Client 使用持久化会话,且发布时消息的 QoS 不小于1。

Publisher 和 Subscriber

Publisher 和 Subscriber 是相对于 Topic 来说的身份,如果一个 Client 向某个 Topic 发布消息,那么它就是Publisher;如果一个 Client 订阅了某个 Topic,那么它就是 Subscriber。在上面的例子中,ClientA 是Publisher, ClientB 是 Subscriber。

Sender 和 Receiver

Sender 和 Receiver 是相对于消息传输方向的身份,仍然是上面的例子: 当 ClientA 发布消息时,它发送给 Broker 一条消息,那么 ClientA 是 Sender,Broker 是 Receiver; 当 Broker 转发消息给 ClientB 时,Broker 是 Sender,ClientB 是 Receiver。

Publisher/Subscriber、Sender/Receiver 这两组概念最大的区别就是,Publisher 和 Subscriber 只可能是Client。而 Sender/Receiver 有可能是 Client 和 Broker。解释清楚这两个不同的概念之后,我们接下来看一下 PUBLISH 消息包

发布

PUBLISH 数据包是用于在 Sender 和 Receiver 之间传输消息数据的,也就是说,当 Publisher 要向某个Topic 发布一条消息的时候,Publisher 会向 Broker 发送一个 PUBLISH 数据包;当 Broker 要将一条消息转发给订阅了某条主题的 Subscriber 时,Broker 也会向 Subscriber 发送一条 PUBLISH 数据包。

  • QoS:2bit,0、1 或者 2,代表 PUBLISH 消息的 QoS level
  • 主题名称(Topic Name):主题名称是一个 UTF-8 编码的字符串,用来命名该消息发布到哪一个主题,Topic Name 可以是长度大于等于 1 任何一个字符串(可包含空格),但是在实际项目中,我们最好还是遵循以下一些最优方法 :
  1. 主题名称应该包含层级,不同的层级用 / 划分,比如,2 楼 201 房间的温度感应器可以用这个主
  2. 题:“home/2ndfloor/201/temperature”。
  3. 主题名称开头不要使用 /,例如:“/home/2ndfloor/201/temperature”。
  4. 不要在主题中使用空格。
  5. 只使用 ASCII 字符。
  6. 主题名称在可读的前提下尽量短。
  7. 主题是大小写敏感的,“Home” 和 “home” 是两个不同的主题。
  8. 可以将设备的唯一标识加到主题中,比如:“warehouse/shelf/shelf1_ID/status”。
  9. 主题尽量精确,不要使用泛用的主题,例如在 201 房间有三个传感器,温度、亮度和湿度,那么你应该使用
  10. 三个主题名
  11. 称:“home/2ndfloor/201/temperature”、“home/2ndfloor/201/brightness”和“home/2ndfloor/
  12. 201/humidity”,而不是让三个传感器都使用“home/2ndfloor/201”。
  13. 以 $ 开头的主题属于 Broker 预留的系统主题,通常用于发布 Broker 的内部统计信息,比如
  14. ,应用程序不要使用? 开头的主题收发数据。

订阅

订阅主题的流程如下:

  1. Client 向 Broker 发送一个 SUBSCRIBE 数据包,其中包含了 Client 想要订阅的主题以及其他一些参数;
  2. Broker 收到 SUBSCRIBE 数据包后,向 Client 发送一个 SUBACK 数据包作为应答

Subscribe

  • 订阅列表(List of Subscriptions):SUBSCRIBE 的消息体中包含 Client 想要订阅的主题列表,列表中的每一项由订阅主题名和对应的 QoS 组成。主题名中可以包含通配符,单层通配符“+”和多层通配符“#”。使用包含通配符的主题名可以订阅满足匹配条件的所有主题。为了和 PUBLISH 中的主题区分,我们叫SUBSCRIBE 中的主题名为主题过滤器(Topic Filter)。

单层通配符“+”:就如之前我们讲的,MQTT 的主题是具有层级概念的,不同的层级之间用“/”分割,**“+”可以用来指代任意一个层级。

多层通配符“#”:“#”和“+”的区别在于,“#”可以用来指定任意多个层级,但是“#”必须是 TopicFilter 的最后一个字符,同时它必须跟在“/”后面,除非 Topic Filter 只包含“#”这一个字符

  • SUBSCRIBE 数据包中 QoS 代表针对某一个或者一组主题,Client 希望 Broker 在发送来自这些主题的消息给它时,消息使用的 QoS 级别

SUBACK

返回码(return codes):SUBBACK 数据包包含了一组返回码,返回码的数量和顺序和 SUBSCRIBE 数据包 的订阅列表对应,用于标识订阅类别中的每一个订阅项的订阅结果。

返回码含义0订阅成功, 最大可用QoS为01订阅成功,最大可用QoS为12订阅成功, 最大可用QoS为2128订阅失败

返回码 0~2 代表订阅成功,同时 Broker 授予 Subscriber 不同的 QoS 等级,这个等级可能会和 Subscriber在 SUBSCRIBE 数据包中要求的不一样。

返回码 128 代表订阅失败,比如 Client 没有权限订阅某个主题,或者要求订阅的主题格式不正确等。


订阅:代码实战

如果你建立的是持久会话的连接,那么有可能 Broker 已经保存你在之前的连接时订阅的主题,你就没有必要再发起 SUBSCRIBE 请求 了,这个小优化在网络带宽或者设备处理能力较差的情况尤为重要 .

 var mqtt = require('mqtt')
 var client = mqtt.connect('mqtt://iot.eclipse.org', {
     clientId: "mqtt_sample_subscriber_id_1",
     clean: false
 })
 client.on('connect', function (connack) {
     if(connack.returnCode == 0) {
         if (connack.sessionPresent == false) {
         console.log("subscribing")
         client.subscribe("home/2ndfloor/201/temperature", {qos: 1 }, 
             function (err, granted) {
                 if (err != undefined) {
                     console.log("subscribe failed")
                 } else {
                      console.log(`subscribe succeeded with ${granted[0].topic}, qos:  ${granted[0].qos}`)
                 }
         })
     }
     }else {
         console.log(`Connection failed: ${connack.returnCode}`)
     }
 })
     
 client.on("message", function (_, message, _) {
     var jsonPayload = JSON.parse(message.toString())
     console.log(`current temperature is ${jsonPayload.current}`)
 }


取消订阅

Subcriber 也可以取消对某些主题的订阅,取消订阅的流程如下:

  1. Client 向 Broker 发送一个 UNSUBSCRIBE 数据包,其中包含了 Client 想要取消订阅的主题;
  2. Broker 收到 UNSUBSCRIBE 数据包后,向 Client 发送一个 UNSUBACK 数据包作为应答。

取消订阅代码实践

代码很简单,在建立连接之后取消对之前订阅的主题

 var mqtt = require('mqtt')
 var client = mqtt.connect('mqtt://iot.eclipse.org', {
     clientId: "mqtt_sample_subscriber_id_1",
     clean: false
 })
 client.on('connect', function (connack) {
     if (connack.returnCode == 0) {
         console.log("unsubscribing")
         client.unsubscribe("home/2ndfloor/201/temperature", function (err) {
             if (err != undefined) {
             console.log("unsubscribe failed")
             } else {
             console.log("unsubscribe succeeded")
             }   
             client.end()        
         })
     } else {
         console.log(`Connection failed: ${connack.returnCode}`)
     }
 })

服务质量QoS(Quality of Service)

MQTT设计了一套 保证消息稳定传输的机制,包括消息应答,存储和重传。在这套机制下,提供了三种不同层次QoS:

QoS0,最多一次,至多一次; QoS1,最少一次,至少一次; QoS2,恰好一次,确保只有一次。

QoS是消息的发送方(Sender)和接受方(Receiver)之间达成的一个协议: QoS0代表,发送者发送的一条消息,接收者最多能收到一次,也就是说发送者尽力向接收者发送消息,如果发送失败,也就算了; QoS1代表,发送者发送的一条消息,接收者至少能收到一次,也就是说发送者向接收者发送消息,如果发送失败,会继续重试,直到接收者收到消息为止,但是因为重传的原因,Receiver有可能会收到重复的消息; QoS2代表,发送者发送的一条消息,接收者确保能收到而且只收到一次,也就是说发送者尽力向接收者发送 消息,如果发送失败,会继续重试,直到接收者收到消息为止,同时保证会因为消息重传而收到重复的消 息


要注意的是,QoS 是 Sender 和 Receiver 之间达成的协议,不是 Publisher 和 Subscriber 之间达成的协议。也就是说 Publisher 发布一条 QoS1 的消息,只能保证 Broker 能至少收到一次这个消息;至于对应的Subscriber 能否至少收到一次这个消息,还要取决于 Subscriber 在 Subscribe 的时候和 Broker 协商的 QoS等级。

在 MQTT 协议中,从 Broker 到 Subscriber 这段消息传递的实际 QoS 等于:Publisher 发布消息时指定的 QoS 等级和 Subscriber 在订阅时与 Broker 协商的 QoS 等级,这两个QoS 等级中的最小那一个。

公式:

 Actual Subscribe QoS = MIN(Publish QoS, Subscribe QoS)  

保留消息

保留消息让我们来看一下这个场景:

你有一个温度传感器,它每三个小时向一个主题发布当前的温度。那么问题来了,有一个新的订阅者在它刚刚发布了当前温度之后订阅了这个主题,那么这个订阅端什么时候能才能收到温度消息?

对的,它必须等到三个小时以后,温度传感器再次发布消息的时候才能收到。在这之前,这个新的订阅者对传感器的温度数据一无所知。

怎么来解决这个问题呢?这个时候就轮到保留消息出场解决这个问题了.

保留消息是指在PUBLISH数据包中保留标识设为1(retain=1)的消息,BROKER收到这样的PUBLISH包以后,将保存这个消息,当有一个新的订阅者订阅相应主题的时候,Broker会马上将这个消息发送给订阅者。

保留消息有以下一些特点:

  • 一个主题只能有1条保留消息,发布新的保留消息将覆盖老的保留消息;
  • 如果订阅者使用通配符订阅主题,它会收到所有匹配的主题上的保留消息;
  • 只有新的订阅者才会收到保留消息,如果订阅者重复订阅一个主题,也会被当做新的订阅者,然后收到保留消息发送到订阅者时,消息的保留标识仍然是1,订阅者可以判断这个消息是否是保留消息,以做相应的 处理。

注意:保留消息和持久性会话没有任何关系,保留消息是 Broker 为每一个 Topic 单独存储的,而持久性会话是 Broker 为每一个 Client 单独存储的。

如果你想删除一个 保留消息也很简单,只要向这个主题发布一个 Payload 长度为 0 的 保留消息就 可以了。

那么开头我们提到的那个场景的解决方案就很简单了,温度传感器每 3 个小时发布当前的温度的保留消息,那么无论新的订阅者什么时候进行订阅,它都能收到温度传感器上一次发布的数据

代码实战

发布保留消息

 var mqtt = require('mqtt')
 var client = mqtt.connect('mqtt://iot.eclipse.org', {
     clientId: "mqtt_sample_publisher_1",
     clean: false //保持持久会话。
 })
 ?
 client.on('connect', function (connack) {
     if(connack.returnCode == 0){
             client.publish("home/2ndfloor/201/temperature", JSON.stringify({current: 25}), {qos: 0,retain: 1},          
             function (err) {
                 if(err == undefined) {
                     console.log("Publish finished")
                     client.end()
                 }else{
                     console.log("Publish failed")
              }
         })
     }else{
         console.log(`Connection failed: ${connack.returnCode}`)
     }
 })

订阅保留消息

 var mqtt = require('mqtt')
 var client = mqtt.connect('mqtt://iot.eclipse.org', {
     clientId: "mqtt_sample_subscriber_id_chapter_8",
     clean: false  //通知Broker保持持久会话
 })
 ?
 client.on('connect', function (connack) {
     if(connack.returnCode == 0) {
         if (connack.sessionPresent == false) {
             console.log("subscribing")
             
             //订阅
             client.subscribe("home/2ndfloor/201/temperature", {qos: 0 }, 
                 function (err, granted) {
                     if (err != undefined) {
                         console.log("subscribe failed")
                     } else {
                         console.log(`subscribe succeeded with ${granted[0].topic}, qos:${granted[0].qos}`)
                     }
              })
         }
         
     }else {
         console.log(`Connection failed: ${connack.returnCode}`)
     }
 })
 ?
 //处理订阅的消息
 client.on("message", function (_, message, packet) {
     var jsonPayload = JSON.parse(message.toString())
     console.log(`retained: ${packet.retain}, temperature: ${jsonPayload.current}`)
 })

遗愿(LWT : Last Will and Testament )

LWT 全称为 Last Will and Testament,也就是连接到 Broker 时提到的遗愿,包括遗愿主题、遗愿QoS、遗愿消息等。

当 Broker 检测到 Client 非正常地断开连接的时候,就会向遗愿主题里面发布一条消息。遗愿相关的设置是在建立连接的时候,在 CONNECT 数据包里面指定的。

  • Will Flag:是否使用 LWT
  • Will Topic:遗愿主题名,不可使用通配符
  • Will Qos:发布遗愿消息时使用的 QoS
  • Will Retain:遗愿消息的 Retain 标识
  • Will Message:遗愿消息内容

Broker 在以下情况下认为 Client 是非正常断开连接的:

  1. Broker 检测到底层的 I/O 异常;
  2. Client 未能在 Keep Alive 的间隔内和 Broker 之间有消息交互;
  3. Client 在关闭底层 TCP 连接前没有发送 DISCONNECT 数据包;
  4. Broker 因为协议错误关闭和 Client 的连接,比如 Client 发送了一个格式错误的 MQTT 数据包。

如果 Client 通过发布 DISCONNECT 数据包断开连接,这个属于正常断开连接,不会触发 LWT 的机制,同时,Broker 还会丢弃掉这个 Client 在连接时指定的 LWT 参数。

通常,如果我们关心一个设备,比如传感器的连接状态,可以使用 LWT。在接下来的代码实践里面,我们会使用 LWT 和 Retained 消息来实现对一个 Client 的连接状态监控

代码实战

实现 Client 连接状态监控的原理很简单:

  1. Client 在连接的时候指定 Will Topic 为“client/status”,遗愿消息为“offline”,Will Retain=1;
  2. Client 在连接成功以后向同一个主题“client/status”,发布一个内容为“online”的 Retained 消息

那么订阅者在任何时候订阅“client/status”,都会获取 Client 当前的连接状态。


client.js 代码如下:

 var mqtt = require('mqtt')
 var client = mqtt.connect('mqtt://iot.eclipse.org', {
     clientId: "mqtt_sample_publisher_chapter_8",
     clean: false,
     will:{
         topic : 'client/status',
         qos: 1,
         retain: true,
         payload: JSON.stringify({status: 'offline'})
     }
 })
 client.on('connect', function (connack) {
 if(connack.returnCode == 0){
 client.publish("client/status", JSON.stringify({status: 'online'}), {qos: 1, retain:
 1})
 }else{
 console.log(`Connection failed: ${connack.returnCode}`)
 }
 })

monitor.js 代码如下:

 var mqtt = require('mqtt')
 var client = mqtt.connect('mqtt://iot.eclipse.org', {
     clientId: "mqtt_sample_subscriber_id_chapter_8_2",
     clean: false    
 })
 ?
 client.on('connect', function () {
     client.subscribe("client/status", {qos: 1})
 })
     
 client.on("message", function (_, message) {
     var jsonPayload = JSON.parse(message.toString())
     console.log(`client is ${jsonPayload.status}`)
 })

在monitor.js中,我们每次连接的时候都重新订阅“client / status”,这样的话每次运行都能收到关于Client连接状态的保留消息。

首先运行node client.js,然后运行node monitor.js,会得到以下输出:

 client is online

在运行client.js的终端上,使用Ctrl+C终止client.js,之后在运行monitor.js的终端上会得到以下输出: client is offline

然后重新运行node client.js,在运行monitor.js的终端上会得到以下输出:

 client is online

Ctrl+C终止monitor.js,然后重新运行node monitor.js,会得到以下输出:

 client is online

这样我们就完美地监控了Client的连接状态

保持连接

Broker需要知道客户是否非正常地断开了和它的连接,以发送遗愿消息。

客户也需要能够很快地检测到它失去了和Broker的连接,以便重新连接。MQTT协议是基于TCP的一个应用层协议,理论上TCP协议在丢失连接时会通知上层应用,但是TCP有一个半打开连接的问题(半开连接)。

在半开连接这种状态下,一端的TCP连接已经失效,但是另外一端并不知情,它认为连接依然是打开的,它需要很长的时间才能感知到对端连接已经断开了,这种情况在使用移动或者卫星网络的时候尤为常见。

只是依赖TCP层的连接状态监测是不够的,于是MQTT协议设计了一套Keep Alive机制。在建立连接的时候,我们可以传递一个Keep Alive参数,它的单位为秒,MQTT协议中约定:

在 Keep Alive的时间间隔内,如果Broker没有收到来自客户的任何数据包,那么Broker认为它和客户之间的连接已经断开;同样地, 如果客户没有收到来自Broker的任何数据包,那么Client认为它和Broker之间的连接已经断开。 MQTT 还有一对 PINGREQ/PINGRESP 数据包,当 Broker 和 Client 之间没有任何数据包传输的时候,可以通过 PINGREQ/PINGRESP 来满足 Keep Alive 的约定和侦测连接状态。

PINGREQ

PINGREQ 数据包没有可变头(Variable header)和消息体(Payload),当 Client 在一个 Keep Alive 时间间隔内没有向 Broker 发送任何数据包,比如 PUBLISH 和 SUBSCRIBE 的时候,它应该向 Broker 发送PINGREQ 数据包。

PINGRESP

PINGRESP 数据包没有可变头(Variable header)和消息体(Payload),当 Broker 收到来自 Client 的PINGREQ 数据包,它应该回复 Client 一个 PINGRESP 数据包。

对于 Keep Alive 机制,我们还需要记住以下几点:

  • 如果在一个 Keep Alive 时间间隔内,Client 和 Broker 有过数据包传输,比如 PUBLISH,Client 就没有必要再使用 PINGREQ 了,在网络资源比较紧张的情况下这点很重要;
  • Keep Alive 值是由 Client 指定的,不同的 Client 可以指定不同的值;
  • Keep Alive 的最大值为 18 小时 12 分 15 秒;
  • Keep Alive 值如果设为 0 的话,代表不使用 Keep Alive 机制。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表