网站首页 > 技术文章 正文
简介
MQTT(消息队列遥测传输)英文全称: Message Queue Telemetry Transport
MQTT是一种轻量级的发布-订阅消息传递协议,它可能最适合各种物联网设备。你可以在此处找到有关MQTT的更多信息。
MQTT 协议可以为大量的低功率、工作网络环境不可靠的物联网设备提供通讯保障。而它的应用范围也不仅如此,在移动互联网领域也大有作为:很多 Android App 的推送功能,都是基于 MQTT 实现的,也有一些 IM的实现,是基于 MQTT 的。
特点
简单的来说 MQTT 协议有以下特性:
- 基于 TCP 协议的应用层协议;
- 采用 C/S 架构;
- 使用订阅/发布模式,将消息的发送方和接受方解耦;
- 提供 3 种消息的 QoS(Quality of Service): 至多一次,最少一次,只有一次;
- 收发消息都是异步的,发送方不需要等待接收方应答。
Client和Broker建立连接
CONNECT
连接的建立由 Client 端发起,Client 端首先向 Broker 发送一个 CONNECT 数据包,CONNECT 数据包包含以下内容(这里我们略过 Fixed header)。
可变头(Variable header)
在 CONNECT 数据包可变头中,含有以下信息。
- 协议名称(Protocol Name):值固定为字符 “MQTT”。
- 协议版本(Protocol Level):对 MQTT 3.1.1 来说,值为 4。
- 用户名标识(User Name Flag):消息体中是否有用户名字段,1bit,0 或者 1。
- 密码标识(Password Flag):消息体中是否有密码字段,1bit,0 或者 1。
- 遗愿消息 Retain 标识(Will Retain):标识遗愿消息是否是 Retain 消息,1bit,0 或者 1。
- 遗愿消息 QOS 标识(Will QOS):标识遗愿消息的 QOS,2bit,0、1 或者 2。
- 遗愿标识(Will Flag):标识是否使用遗愿消息,1bit,0 或者 1。
- 会话清除标识(Clean Session):标识 Client 是否建立一个持久化的会话,1bit,0 或者 1,当 CleanSession 的标识设为 0 时,代表 Client 希望建立一个持久会话的连接,Broker 将存储该 Client 订阅的主题和未接受的消息,否则 Broker 不会存储这些数据,同时在建立连接时清除这个 Client 之前存在的持久化会话所保存的数据。
- 连接保护(Keep Alive): 设置一个单位为秒的时间间隔,Client 和 Broker 之间在这个时间间隔之内需要至少有一次消息交互,否则 Client 和 Broker 会认为它们之间的连接已经断开。
消息体(Payload)
CONNECT 数据包的消息体中包含以下数据。
- 客户端标识符(Client Identifier):Client Identifier 是用来标识 Client 身份的字段,在 MQTT 3.1.1 的版本中,这个字段的长度是 1 到 23 个字节,而且只能包含数字和 26 个字母(包括大小写),Broker 通过这个字段来区分不同的 Client。所以在连接的时候,Client 应该保证它的 Identifier 是唯一的,通常我们可以使用比如 UUID,唯一的设备硬件标识,或者 Android 设备的 DEVICE_ID 等作为 Client Identifier 的取值来源。 MQTT 协议中要求 Client 连接时必须带上 Client Identifier,但是也允许 Broker 在实现时 Client Identifier为空,这时 Broker 会为 Client 分配一个内部唯一的 Identifier。如果你需要使用持久化会话,那就必须自己为Client 设定一个唯一的 Identifier。
- 用户名(Username):如果可变头中的用户名标识设为 1,那么消息体中将包含用户名字段,Broker 可以使用用户名和密码来对接入的 Client 进行验证,只允许已授权的 Client 接入。注意不同的 Client 需要使用不同的 Client Identifier,但它们可以使用同样的用户名和密码进行连接。
- 密码(Password):如果可变头中的密码标识设为 1,那么消息体中将包含密码字段。
- 遗愿主题(Will Topic):如果可变头中的遗愿标识设为 1,那么消息体中将包含遗愿主题,当 Client 非正常地中断连接的时候,Broker 将向指定的遗愿主题中发布遗愿消息。
- 遗愿消息(Will Message):如果可变头中的遗愿标识设为 1,那么消息体中将包含遗愿消息,当 Client 非正常的中断连接的时候,Broker 将向指定的遗愿主题中发布由该字段指定的内容。
CONNACK
当 Broker 收到 Client 的 CONNECT 数据包之后,将检查并校验 CONNECT 数据包的内容,之后回复 Client一个 CONNACK 数据包。
CONNACK 数据包包含以下内容(这里我们略过 Fixed header)。
可变头(Variable header)
CONNACK 数据包的可变头中,含有以下信息。
- 会话存在标识(Session Present Flag):用于标识在 Broker 上,是否已存在该 Client(用 Client Identifier区分)的持久性会话,1bit,0 或者 1。当 Client 在连接时设置 Clean Session=1,则 CONNACK 中的Session Present Flag 始终为 0;当 Client 在连接时设置 Clean Session=0,那么就有两种情况——如果Broker 上面保存了这个 Client 之前留下的持久性会话,那么 CONNACK 中的 Session Present Flag 值为 1;如果 Broker 没有保存该 Client 的任何会话数据,那么 CONNACK 中的 Session Present Flag 值为 0。Session Present Flag 这个特性是在 MQTT 3.1.1 版本中新加入的,之前的版本中并没有这个标识。
- 连接返回码(Connect Return code):用于标识 Client 是 Broker 的连接是否建立成功,连接返回码有以下一些值:
Return Code连接状态0连接已建立1连接被拒绝,不允许的协议版本2连接被拒绝,Client Identifier 被拒绝3连接被拒绝,服务器不可用4连接被拒绝,错误的用户名或密码5连接被拒绝,未授权
这里重点讲一下 Return Code 4 和 5。
Return Code 4 在 MQTT 协议中的含义是 Username 和 Password 的格式不正确,但是在大部分的 Broker 实现中,在使用错误的用户名密码时,得到的返回码也是 4。所以这里我们认为 4 就是代表错误的用户名或密码。
Return Code 5 一般在 Broker 不使用用户名和密码而使用 IP 地址或者 Client Identifier 进行验证的时候使用,来标识 Client 没有通过验证。
注意: Return Code 2 代表的是 Client Identifier 格式不规范,比如长度超过 23 个字符,包含了不允许的字符等(部分 Broker 的实现在协议标准上做了扩展,比如允许超过 23 个字符的 Client Identifer 等)。
消息体(Payload)
CONNACK 没有消息体。
当 Client 向 Broker 发送 CONNECT 数据包并获得 Return Code 为 0 的 CONNACK 包后,就代表连接建立 成功,可以发布和接受消息了 .
关闭连接
Client主动关闭
向 Broker 发送一个 DISCONNECT 数据包就可以了。 DISCONNECT 数据包没有可变头(Variable header)和消息体(Payload)。在 Client 发送完DISCONNECT 之后,就可以关闭底层的 TCP 连接了,不需要等待 Broker 的回复(Broker 也不会对DISCONNECT 数据包回复)。
为什么需要在关闭 TCP 连接之前,发送一个和 Broker 没有交互的 DISCONNECT数据包,而不是直接关闭底层的 TCP 连接?这涉及到 MQTT 协议的一个特性,Broker 需要判断 Client 是否正常地断开连接:
当 Broker 收到 Client 的 DISCONNECT 数据包的时候,它认为 Client 是正常地断开连接,那么它会丢弃当前连接指定的遗愿消息(Will Message)。如果 Broker 检测到 Client 连接丢失,但又没有收到DISCONNECT 消息包,它会认为 Client 是非正常断开连接,就会向在连接的时候指定的遗愿主题(WillTopic)发布遗愿消息(Will Message)
Broker主动关闭
MQTT 协议规定 Broker 在没有收到 Client 的 DISCONNECT 数据包之前都应该保持和 Client 连接,只有Broker 在 Keep Alive 的时间间隔里,没有收到 Client 的任何 MQTT 数据包的时候会主动关闭连接。一些Broker 的实现在 MQTT 协议上做了一些拓展,支持 Client 的连接管理,可以主动地断开和某个 Client 的连接。
Broker 主动关闭连接之前不会向 Client 发送任何 MQTT 数据包,直接关闭底层的 TCP 连接就完事了。
代码实战
建立持久化会话
var mqtt = require('mqtt')
var client = mqtt.connect('mqtt://iot.eclipse.org', {
clientId: "mqtt_sample_id_1",
clean: false //持久会话
})
client.on('connect', function (connack) {
console.log(`return code: ${connack.returnCode}, sessionPresent:${connack.sessionPresent}`)
client.end()
})
建立非持久化会话
var mqtt = require('mqtt')
var client = mqtt.connect('mqtt://iot.eclipse.org', {
clientId: "mqtt_sample_id_1",
clean: true //非持久会话
})
?
client.on('connect', function (connack) {
console.log(`return code: ${connack.returnCode}, sessionPresent:${connack.sessionPresent}`)
client.end()
})
持久会话和非持久会话的就在clean的值是false还是true。
订阅发布模型
一个典型的 MQTT 消息发送与接收的流程如下:
- ClientA 连接到 Broker;
- ClientB 连接到 Broker,并订阅主题 Topic1;
- ClientA 发送给 Broker 一条消息,主题为 Topic1;
- Broker 收到 ClientA 的消息,发现 ClientB 订阅了 Topic1,然后将消息转发到 ClientB;
- ClientB 从 Broker 接收到该消息
和传统的队列有点不同,如果 ClientB 在 ClientA 发布消息之后再订阅 Topic1,ClientB 不会收到该条消息。
MQTT 通过订阅与发布模型对消息的发布者和订阅者进行解耦,发布者在发布消息时并不需要订阅方也连接到 Broker,只要订阅方之前订阅过相应主题,那么它在连接到 Broker 之后就可以收到发布方在它离线期间发布的 消息。我们称这种消息为离线消息。
接收离线的消息需要 Client 使用持久化会话,且发布时消息的 QoS 不小于1。
Publisher 和 Subscriber
Publisher 和 Subscriber 是相对于 Topic 来说的身份,如果一个 Client 向某个 Topic 发布消息,那么它就是Publisher;如果一个 Client 订阅了某个 Topic,那么它就是 Subscriber。在上面的例子中,ClientA 是Publisher, ClientB 是 Subscriber。
Sender 和 Receiver
Sender 和 Receiver 是相对于消息传输方向的身份,仍然是上面的例子: 当 ClientA 发布消息时,它发送给 Broker 一条消息,那么 ClientA 是 Sender,Broker 是 Receiver; 当 Broker 转发消息给 ClientB 时,Broker 是 Sender,ClientB 是 Receiver。
Publisher/Subscriber、Sender/Receiver 这两组概念最大的区别就是,Publisher 和 Subscriber 只可能是Client。而 Sender/Receiver 有可能是 Client 和 Broker。解释清楚这两个不同的概念之后,我们接下来看一下 PUBLISH 消息包
发布
PUBLISH 数据包是用于在 Sender 和 Receiver 之间传输消息数据的,也就是说,当 Publisher 要向某个Topic 发布一条消息的时候,Publisher 会向 Broker 发送一个 PUBLISH 数据包;当 Broker 要将一条消息转发给订阅了某条主题的 Subscriber 时,Broker 也会向 Subscriber 发送一条 PUBLISH 数据包。
- QoS:2bit,0、1 或者 2,代表 PUBLISH 消息的 QoS level
- 主题名称(Topic Name):主题名称是一个 UTF-8 编码的字符串,用来命名该消息发布到哪一个主题,Topic Name 可以是长度大于等于 1 任何一个字符串(可包含空格),但是在实际项目中,我们最好还是遵循以下一些最优方法 :
- 主题名称应该包含层级,不同的层级用 / 划分,比如,2 楼 201 房间的温度感应器可以用这个主
- 题:“home/2ndfloor/201/temperature”。
- 主题名称开头不要使用 /,例如:“/home/2ndfloor/201/temperature”。
- 不要在主题中使用空格。
- 只使用 ASCII 字符。
- 主题名称在可读的前提下尽量短。
- 主题是大小写敏感的,“Home” 和 “home” 是两个不同的主题。
- 可以将设备的唯一标识加到主题中,比如:“warehouse/shelf/shelf1_ID/status”。
- 主题尽量精确,不要使用泛用的主题,例如在 201 房间有三个传感器,温度、亮度和湿度,那么你应该使用
- 三个主题名
- 称:“home/2ndfloor/201/temperature”、“home/2ndfloor/201/brightness”和“home/2ndfloor/
- 201/humidity”,而不是让三个传感器都使用“home/2ndfloor/201”。
- 以 $ 开头的主题属于 Broker 预留的系统主题,通常用于发布 Broker 的内部统计信息,比如
- ,应用程序不要使用? 开头的主题收发数据。
订阅
订阅主题的流程如下:
- Client 向 Broker 发送一个 SUBSCRIBE 数据包,其中包含了 Client 想要订阅的主题以及其他一些参数;
- Broker 收到 SUBSCRIBE 数据包后,向 Client 发送一个 SUBACK 数据包作为应答
Subscribe
- 订阅列表(List of Subscriptions):SUBSCRIBE 的消息体中包含 Client 想要订阅的主题列表,列表中的每一项由订阅主题名和对应的 QoS 组成。主题名中可以包含通配符,单层通配符“+”和多层通配符“#”。使用包含通配符的主题名可以订阅满足匹配条件的所有主题。为了和 PUBLISH 中的主题区分,我们叫SUBSCRIBE 中的主题名为主题过滤器(Topic Filter)。
单层通配符“+”:就如之前我们讲的,MQTT 的主题是具有层级概念的,不同的层级之间用“/”分割,**“+”可以用来指代任意一个层级。
多层通配符“#”:“#”和“+”的区别在于,“#”可以用来指定任意多个层级,但是“#”必须是 TopicFilter 的最后一个字符,同时它必须跟在“/”后面,除非 Topic Filter 只包含“#”这一个字符
- SUBSCRIBE 数据包中 QoS 代表针对某一个或者一组主题,Client 希望 Broker 在发送来自这些主题的消息给它时,消息使用的 QoS 级别
SUBACK
返回码(return codes):SUBBACK 数据包包含了一组返回码,返回码的数量和顺序和 SUBSCRIBE 数据包 的订阅列表对应,用于标识订阅类别中的每一个订阅项的订阅结果。
返回码含义0订阅成功, 最大可用QoS为01订阅成功,最大可用QoS为12订阅成功, 最大可用QoS为2128订阅失败
返回码 0~2 代表订阅成功,同时 Broker 授予 Subscriber 不同的 QoS 等级,这个等级可能会和 Subscriber在 SUBSCRIBE 数据包中要求的不一样。
返回码 128 代表订阅失败,比如 Client 没有权限订阅某个主题,或者要求订阅的主题格式不正确等。
订阅:代码实战
如果你建立的是持久会话的连接,那么有可能 Broker 已经保存你在之前的连接时订阅的主题,你就没有必要再发起 SUBSCRIBE 请求 了,这个小优化在网络带宽或者设备处理能力较差的情况尤为重要 .
var mqtt = require('mqtt')
var client = mqtt.connect('mqtt://iot.eclipse.org', {
clientId: "mqtt_sample_subscriber_id_1",
clean: false
})
client.on('connect', function (connack) {
if(connack.returnCode == 0) {
if (connack.sessionPresent == false) {
console.log("subscribing")
client.subscribe("home/2ndfloor/201/temperature", {qos: 1 },
function (err, granted) {
if (err != undefined) {
console.log("subscribe failed")
} else {
console.log(`subscribe succeeded with ${granted[0].topic}, qos: ${granted[0].qos}`)
}
})
}
}else {
console.log(`Connection failed: ${connack.returnCode}`)
}
})
client.on("message", function (_, message, _) {
var jsonPayload = JSON.parse(message.toString())
console.log(`current temperature is ${jsonPayload.current}`)
}
取消订阅
Subcriber 也可以取消对某些主题的订阅,取消订阅的流程如下:
- Client 向 Broker 发送一个 UNSUBSCRIBE 数据包,其中包含了 Client 想要取消订阅的主题;
- Broker 收到 UNSUBSCRIBE 数据包后,向 Client 发送一个 UNSUBACK 数据包作为应答。
取消订阅代码实践
代码很简单,在建立连接之后取消对之前订阅的主题
var mqtt = require('mqtt')
var client = mqtt.connect('mqtt://iot.eclipse.org', {
clientId: "mqtt_sample_subscriber_id_1",
clean: false
})
client.on('connect', function (connack) {
if (connack.returnCode == 0) {
console.log("unsubscribing")
client.unsubscribe("home/2ndfloor/201/temperature", function (err) {
if (err != undefined) {
console.log("unsubscribe failed")
} else {
console.log("unsubscribe succeeded")
}
client.end()
})
} else {
console.log(`Connection failed: ${connack.returnCode}`)
}
})
服务质量QoS(Quality of Service)
MQTT设计了一套 保证消息稳定传输的机制,包括消息应答,存储和重传。在这套机制下,提供了三种不同层次QoS:
QoS0,最多一次,至多一次; QoS1,最少一次,至少一次; QoS2,恰好一次,确保只有一次。
QoS是消息的发送方(Sender)和接受方(Receiver)之间达成的一个协议: QoS0代表,发送者发送的一条消息,接收者最多能收到一次,也就是说发送者尽力向接收者发送消息,如果发送失败,也就算了; QoS1代表,发送者发送的一条消息,接收者至少能收到一次,也就是说发送者向接收者发送消息,如果发送失败,会继续重试,直到接收者收到消息为止,但是因为重传的原因,Receiver有可能会收到重复的消息; QoS2代表,发送者发送的一条消息,接收者确保能收到而且只收到一次,也就是说发送者尽力向接收者发送 消息,如果发送失败,会继续重试,直到接收者收到消息为止,同时保证会因为消息重传而收到重复的消 息
要注意的是,QoS 是 Sender 和 Receiver 之间达成的协议,不是 Publisher 和 Subscriber 之间达成的协议。也就是说 Publisher 发布一条 QoS1 的消息,只能保证 Broker 能至少收到一次这个消息;至于对应的Subscriber 能否至少收到一次这个消息,还要取决于 Subscriber 在 Subscribe 的时候和 Broker 协商的 QoS等级。
在 MQTT 协议中,从 Broker 到 Subscriber 这段消息传递的实际 QoS 等于:Publisher 发布消息时指定的 QoS 等级和 Subscriber 在订阅时与 Broker 协商的 QoS 等级,这两个QoS 等级中的最小那一个。
公式:
Actual Subscribe QoS = MIN(Publish QoS, Subscribe QoS)
保留消息
保留消息让我们来看一下这个场景:
你有一个温度传感器,它每三个小时向一个主题发布当前的温度。那么问题来了,有一个新的订阅者在它刚刚发布了当前温度之后订阅了这个主题,那么这个订阅端什么时候能才能收到温度消息?
对的,它必须等到三个小时以后,温度传感器再次发布消息的时候才能收到。在这之前,这个新的订阅者对传感器的温度数据一无所知。
怎么来解决这个问题呢?这个时候就轮到保留消息出场解决这个问题了.
保留消息是指在PUBLISH数据包中保留标识设为1(retain=1)的消息,BROKER收到这样的PUBLISH包以后,将保存这个消息,当有一个新的订阅者订阅相应主题的时候,Broker会马上将这个消息发送给订阅者。
保留消息有以下一些特点:
- 一个主题只能有1条保留消息,发布新的保留消息将覆盖老的保留消息;
- 如果订阅者使用通配符订阅主题,它会收到所有匹配的主题上的保留消息;
- 只有新的订阅者才会收到保留消息,如果订阅者重复订阅一个主题,也会被当做新的订阅者,然后收到保留消息发送到订阅者时,消息的保留标识仍然是1,订阅者可以判断这个消息是否是保留消息,以做相应的 处理。
注意:保留消息和持久性会话没有任何关系,保留消息是 Broker 为每一个 Topic 单独存储的,而持久性会话是 Broker 为每一个 Client 单独存储的。
如果你想删除一个 保留消息也很简单,只要向这个主题发布一个 Payload 长度为 0 的 保留消息就 可以了。
那么开头我们提到的那个场景的解决方案就很简单了,温度传感器每 3 个小时发布当前的温度的保留消息,那么无论新的订阅者什么时候进行订阅,它都能收到温度传感器上一次发布的数据
代码实战
发布保留消息
var mqtt = require('mqtt')
var client = mqtt.connect('mqtt://iot.eclipse.org', {
clientId: "mqtt_sample_publisher_1",
clean: false //保持持久会话。
})
?
client.on('connect', function (connack) {
if(connack.returnCode == 0){
client.publish("home/2ndfloor/201/temperature", JSON.stringify({current: 25}), {qos: 0,retain: 1},
function (err) {
if(err == undefined) {
console.log("Publish finished")
client.end()
}else{
console.log("Publish failed")
}
})
}else{
console.log(`Connection failed: ${connack.returnCode}`)
}
})
订阅保留消息
var mqtt = require('mqtt')
var client = mqtt.connect('mqtt://iot.eclipse.org', {
clientId: "mqtt_sample_subscriber_id_chapter_8",
clean: false //通知Broker保持持久会话
})
?
client.on('connect', function (connack) {
if(connack.returnCode == 0) {
if (connack.sessionPresent == false) {
console.log("subscribing")
//订阅
client.subscribe("home/2ndfloor/201/temperature", {qos: 0 },
function (err, granted) {
if (err != undefined) {
console.log("subscribe failed")
} else {
console.log(`subscribe succeeded with ${granted[0].topic}, qos:${granted[0].qos}`)
}
})
}
}else {
console.log(`Connection failed: ${connack.returnCode}`)
}
})
?
//处理订阅的消息
client.on("message", function (_, message, packet) {
var jsonPayload = JSON.parse(message.toString())
console.log(`retained: ${packet.retain}, temperature: ${jsonPayload.current}`)
})
遗愿(LWT : Last Will and Testament )
LWT 全称为 Last Will and Testament,也就是连接到 Broker 时提到的遗愿,包括遗愿主题、遗愿QoS、遗愿消息等。
当 Broker 检测到 Client 非正常地断开连接的时候,就会向遗愿主题里面发布一条消息。遗愿相关的设置是在建立连接的时候,在 CONNECT 数据包里面指定的。
- Will Flag:是否使用 LWT
- Will Topic:遗愿主题名,不可使用通配符
- Will Qos:发布遗愿消息时使用的 QoS
- Will Retain:遗愿消息的 Retain 标识
- Will Message:遗愿消息内容
Broker 在以下情况下认为 Client 是非正常断开连接的:
- Broker 检测到底层的 I/O 异常;
- Client 未能在 Keep Alive 的间隔内和 Broker 之间有消息交互;
- Client 在关闭底层 TCP 连接前没有发送 DISCONNECT 数据包;
- Broker 因为协议错误关闭和 Client 的连接,比如 Client 发送了一个格式错误的 MQTT 数据包。
如果 Client 通过发布 DISCONNECT 数据包断开连接,这个属于正常断开连接,不会触发 LWT 的机制,同时,Broker 还会丢弃掉这个 Client 在连接时指定的 LWT 参数。
通常,如果我们关心一个设备,比如传感器的连接状态,可以使用 LWT。在接下来的代码实践里面,我们会使用 LWT 和 Retained 消息来实现对一个 Client 的连接状态监控
代码实战
实现 Client 连接状态监控的原理很简单:
- Client 在连接的时候指定 Will Topic 为“client/status”,遗愿消息为“offline”,Will Retain=1;
- Client 在连接成功以后向同一个主题“client/status”,发布一个内容为“online”的 Retained 消息
那么订阅者在任何时候订阅“client/status”,都会获取 Client 当前的连接状态。
client.js 代码如下:
var mqtt = require('mqtt')
var client = mqtt.connect('mqtt://iot.eclipse.org', {
clientId: "mqtt_sample_publisher_chapter_8",
clean: false,
will:{
topic : 'client/status',
qos: 1,
retain: true,
payload: JSON.stringify({status: 'offline'})
}
})
client.on('connect', function (connack) {
if(connack.returnCode == 0){
client.publish("client/status", JSON.stringify({status: 'online'}), {qos: 1, retain:
1})
}else{
console.log(`Connection failed: ${connack.returnCode}`)
}
})
monitor.js 代码如下:
var mqtt = require('mqtt')
var client = mqtt.connect('mqtt://iot.eclipse.org', {
clientId: "mqtt_sample_subscriber_id_chapter_8_2",
clean: false
})
?
client.on('connect', function () {
client.subscribe("client/status", {qos: 1})
})
client.on("message", function (_, message) {
var jsonPayload = JSON.parse(message.toString())
console.log(`client is ${jsonPayload.status}`)
})
在monitor.js中,我们每次连接的时候都重新订阅“client / status”,这样的话每次运行都能收到关于Client连接状态的保留消息。
首先运行node client.js,然后运行node monitor.js,会得到以下输出:
client is online
在运行client.js的终端上,使用Ctrl+C终止client.js,之后在运行monitor.js的终端上会得到以下输出: client is offline
然后重新运行node client.js,在运行monitor.js的终端上会得到以下输出:
client is online
Ctrl+C终止monitor.js,然后重新运行node monitor.js,会得到以下输出:
client is online
这样我们就完美地监控了Client的连接状态
保持连接
Broker需要知道客户是否非正常地断开了和它的连接,以发送遗愿消息。
客户也需要能够很快地检测到它失去了和Broker的连接,以便重新连接。MQTT协议是基于TCP的一个应用层协议,理论上TCP协议在丢失连接时会通知上层应用,但是TCP有一个半打开连接的问题(半开连接)。
在半开连接这种状态下,一端的TCP连接已经失效,但是另外一端并不知情,它认为连接依然是打开的,它需要很长的时间才能感知到对端连接已经断开了,这种情况在使用移动或者卫星网络的时候尤为常见。
只是依赖TCP层的连接状态监测是不够的,于是MQTT协议设计了一套Keep Alive机制。在建立连接的时候,我们可以传递一个Keep Alive参数,它的单位为秒,MQTT协议中约定:
在 Keep Alive的时间间隔内,如果Broker没有收到来自客户的任何数据包,那么Broker认为它和客户之间的连接已经断开;同样地, 如果客户没有收到来自Broker的任何数据包,那么Client认为它和Broker之间的连接已经断开。 MQTT 还有一对 PINGREQ/PINGRESP 数据包,当 Broker 和 Client 之间没有任何数据包传输的时候,可以通过 PINGREQ/PINGRESP 来满足 Keep Alive 的约定和侦测连接状态。
PINGREQ
PINGREQ 数据包没有可变头(Variable header)和消息体(Payload),当 Client 在一个 Keep Alive 时间间隔内没有向 Broker 发送任何数据包,比如 PUBLISH 和 SUBSCRIBE 的时候,它应该向 Broker 发送PINGREQ 数据包。
PINGRESP
PINGRESP 数据包没有可变头(Variable header)和消息体(Payload),当 Broker 收到来自 Client 的PINGREQ 数据包,它应该回复 Client 一个 PINGRESP 数据包。
对于 Keep Alive 机制,我们还需要记住以下几点:
- 如果在一个 Keep Alive 时间间隔内,Client 和 Broker 有过数据包传输,比如 PUBLISH,Client 就没有必要再使用 PINGREQ 了,在网络资源比较紧张的情况下这点很重要;
- Keep Alive 值是由 Client 指定的,不同的 Client 可以指定不同的值;
- Keep Alive 的最大值为 18 小时 12 分 15 秒;
- Keep Alive 值如果设为 0 的话,代表不使用 Keep Alive 机制。
猜你喜欢
- 2024-10-11 Qt/C++编写物联网组件/支持modbus/rtu/tcp/udp/mqtt/多线程采集
- 2024-10-11 搞定客户端证书错误,看这篇就够了
- 2024-10-11 如何使用C LinkSDK(4.x)快速接入阿里云物联网平台?
- 2024-10-11 XMPP协议、MQTT协议、HTTP协议、CoAP协议的基本比较
- 2024-10-11 如何用工业树莓派和 MQTT 平台打通 OT 和 IT
- 2024-10-11 上手体验物联网MQTT 物联网业务mstp
你 发表评论:
欢迎- 最近发表
-
- 在 Spring Boot 项目中使用 activiti
- 开箱即用-activiti流程引擎(active 流程引擎)
- 在springBoot项目中整合使用activiti
- activiti中的网关是干什么的?(activiti包含网关)
- SpringBoot集成工作流Activiti(完整源码和配套文档)
- Activiti工作流介绍及使用(activiti工作流会签)
- SpringBoot集成工作流Activiti(实际项目演示)
- activiti工作流引擎(activiti工作流引擎怎么用)
- 工作流Activiti初体验及在数据库中生成的表
- Activiti工作流浅析(activiti6.0工作流引擎深度解析)
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)