1. Retained消息
Retained 消息是指在 PUBLISH 数据包中 Retain 标识设为 1 的消息,Broker 收到这样的 PUBLISH 包以后,将保存这个消息,当有一个新的订阅者订阅相应主题的时候,Broker 会马上将这个消息发送给订阅者。有以下这些特点:
-
一个Topic只能有一条Retained消息,发布新的Retained 消息将覆盖老的 Retained 消息(所以想删除一个 Retained 消息也很简单,只要向这个主题发布一个 Payload 长度为 0 的 Retained 消息就可以了);
-
如果订阅者使用通配符订阅主题,它会收到所有匹配的主题上的 Retained 消息;
-
只有新的订阅者才会收到 Retained 消息,如果订阅者重复订阅一个主题,也会被当做新的订阅者,然后收到 Retained 消息;
-
Broker 收到 Retained 消息后,会单独保存一份,再向当前的订阅者发送一份普通的消息(Retained 标识为 0)。当有新订阅者的时候, Broker 会把保存的这条消息发给新订阅者(Retained 标识为 1)。
Retained消息和持久性会话的区别:
Retained消息是Broker为每一个Topic单独存储的;
持久性会话是Broker为每一个Client单独存储的
1.1. 代码实践
下面是publisher的代码,在发送消息时指定retain为true
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
if rc == 0:
client.publish("test", payload="hello world", qos=0, retain=True)
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_pub")
mqtt_client.on_connect = on_connect
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
下面是subscriber的代码
import paho.mqtt.client as mqtt
'''
当代理响应订阅请求时被调用
'''
def on_subscribe(client, userdata, mid, granted_qos):
print("granted_qos:", granted_qos)
'''
当收到关于客户订阅的主题的消息时调用
'''
def on_message(client, userdata, message):
print("message retain", message.retain)
print("message topic", message.topic)
print("message payload", message.payload)
def on_connect(client, userdata, flags, rc):
if rc == 0 :
print("subscribing")
client.subscribe("test", 0)
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_subscribe = on_subscribe
mqtt_client.on_message = on_message
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
在指定retain
为True
的情况下,先运行publisher的代码,之后再运行subscriber的代码,在subscriber运行的终端界面输出如下信息:
subscribing
granted_qos: (0,)
message retain 1
message topic test
message payload b'hello world'
输出的信息中message retain
的值为1,表示收到的消息为retained消息。
当再次运行publisher的代码,运行subscriber的控制台会输出如下内容:
message retain 0
message topic test
message payload b'hello world'
上述的输出结果同Retained消息特点中的第四点“Broker 收到 Retained 消息后,会单独保存一份,再向当前的订阅者发送一份普通的消息”所述一致,因为当前订阅者已经订阅了相应的话题,当Broker收到Retained消息之后,先保存下来,然而因为这个消息对于当前已经订阅了相应话题的订阅者来说是一个普通的消息所以message retain
的值为0。
2. LWT(Last Will and Testament)
LWT是之前讲过的Client连接到Broker时提到的遗愿,包括遗愿主题、遗愿QoS、遗愿消息等。当Broker检测到Client非正常地断开连接的时候,就会向遗愿主题发布一条消息。遗愿相关的设置是在建立连接的时候,在CONNECT数据包里面指定的。包括以下这些设置:
- Will Flag:是否使用LWT
- Will QoS:发布遗愿消息时使用的QoS
- Will Retain:遗愿消息的Retain标识
- Will Topic:遗愿主题名,不可使用通配符
- Will Message:遗愿消息内容
Broker 在以下情况下认为 Client 是非正常断开连接的:
- Broker 检测到底层的 I/O 异常;
- Client 未能在 Keep Alive 的间隔内和 Broker 之间有消息交互;
- Client 在关闭底层 TCP 连接前没有发送 DISCONNECT 数据包;
- Broker 因为协议错误关闭和 Client 的连接,比如 Client 发送了一个格式错误的 MQTT 数据包。
如果Client通过发布DISCONNECT数据包断开连接,是属于正常断开连接,不会触发LWT的机制,同时Broker会丢掉这个Client在连接时指定的LWT参数。
2.1. 代码实践:监控Client的状态
Client在连接的时候,指定Will Topic为will_test,Will Message为"client is offline",并设置该消息的QoS为1,retain也置为True(设置为True表示会被Broker保留,同Retained消息)。同时在连接成功之后,向主题will_test发布一个内容为"client is online"的Retained消息。这样订阅者,无论在任何时候订阅"will_test",都会获取Client当前的连接状态。client_will.py代码如下:
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
if rc == 0:
client.publish("will_test", payload="client is online", qos=1, retain=True)
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_pub")
mqtt_client.on_connect = on_connect
mqtt_client.will_set("will_test", payload="client is offline", qos=1, retain=True)
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
而负责监控的代码,则订阅will_test,订阅的QoS为1,client_monitor_will.py代码如下:
import paho.mqtt.client as mqtt
def on_message(client, userdata, message):
print("message retain", message.retain)
print("message payload", message.payload)
def on_connect(client, userdata, flags, rc):
if rc == 0 :
client.subscribe("will_test", 1)
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
首先运行client_will.py,之后再运行client_monitor_will.py,终端输出如下信息:
message retain 1
message payload b'client is online'
因为client_will.py运行之后,发布了一个Retained消息,当运行client_monitor_will.py之后,因为订阅了相应的话题,所以会收到该消息。这时候终止掉client_will.py的运行,输出如下信息:
message retain 0
message payload b'client is offline'
因为在终止的时候已经订阅了相应的话题,所以当终止之后,虽然遗愿消息中的retain被设为1了,但是对当前的订阅者来说是普通消息,所以message retain为0。当这个时候终止掉client_monitor_will.py的运行,再次重新运行client_monitor_will.py,输出如下信息:
message retain 1
message payload b'client is offline'
因为终止掉client_will.py的时候,发送的遗愿消息的retain被设为了1,Broker会保证发送的遗愿消息,当新的订阅者出现的时候,会把这个Retained消息发送给订阅者。
3. Keep Alive(连接保活)
Broker需要知道Client是否正常地断开了和它的连接,以发送遗愿消息。实际上Client也需要能够很快地检测它失去了和Broker的连接,以便重新连接,虽然TCP 协议在丢失连接时会通知上层应用,但是 TCP 有一个半打开连接的问题(half-open connection),在这种状态下,一端的 TCP 连接已经失效,但是另外一端并不知情,它认为连接依然是打开的,它需要很长的时间才能感知到对端连接已经断开了,这种情况在使用移动或者卫星网络的时候尤为常见。所以仅仅依赖TCP的连接状态检测是不够的,于是MQTT协议设计了一套Keep Alive机制。
MQTT 协议是基于 TCP 的一个应用层协议
在建立连接的时候,我们可以传递一个Keep Alive参数,它的单位为秒,MQTT协议中规定:**在1.5倍的Keep Alive(1.5*Keep Alive)的时间间隔内,如果Broker没有收到来自Client的任何数据包,那么Broker认为它和Client之间的连接已经断开;同样如果Client没有收到来自Broker的任何数据包,那么Client认为它和Broker之间的连接已经断开。**在Broker和Client之间没有任何数据包传输的时候,MQTT中通过PINGREQ/PINGRESP来满足Keep Alive的约定和侦测连接状态。
- PINGREQ
PINGREQ数据包中没有可变头和消息体,当Client在一个Keep Alive时间间隔内没有向Broker发送任何数据包,比如PUBLISH和SUBSCRIBE的时候,它应该向Broker发送PINGREQ数据包。
- PINGRESP
PINGRESP数据包中没有可变头和消息体,当Broker收到来自Client的PINGREQ数据包之后,它会回复Client一个PINGRESP数据包。
对于Keep ALive 机制,还需要注意以下几点:
- 如果在一个Keep Alive时间间隔内,Client和Broker有过数据包传输,比如PUBLISH数据包,Client就没有必要再使用PINGREQ了;
- Keep Alive值是由Client指定,不同的Client可以指定不同的值;
- Keep Alive的最大值为18小时12分15秒即65535秒;
- Keep Alive的值设为0的话,代表不使用Keep Alive机制
3.1. 实验实践
启动mosquitto Broker之后,我们通过mosquitto_sub
订阅了一个hello
的话题,订阅完之后。subscriber和Broker之间再也没有任何数据包传输,但是通过运行mosquitto的控制台可以看到,他们之间有PINGREQ和PINGRESP数据包的传输。