MQTT系列 | MQTT的连接和断开

1. MQTT的连接过程

Client建立到Broker的连接过程如下图所示:

  1. Client发送CONNECT数据包给Broker
  2. Broker在收到CONNECT数据包之后,给Client返回一个CONNACK数据包

1.1. CONNECT数据包

连接的建立由Client发起,Client端首先向Broker发送一个CONNECT数据包,CONNECT数据包包含以下内容(这里略过固定头的阐述,需要的话可以翻看MQTT基础概念)

1.1.1. 可变头

在CONNECT数据包可变头中,包含以下信息:

  • 协议名称(Protocol Name):值固定为字符 “MQTT”。
  • 协议版本(Protocol Level):对 MQTT 3.1.1 来说,该值为 4。
  • 用户名标识(User Name Flag):消息体中是否有用户名字段,1bit,0 或者 1。
  • 密码标识(Password Flag):消息体中是否有密码字段,1bit,0 或者 1。
  • 遗愿消息Retain标识(Will Retain):标识遗愿消息是否是 Retain 消息,1bit,0 或者 1。
  • 遗愿消息 QOS 标识(Will QOS):标识遗愿消息的 QOS,2bit,0、1 或者 2。
  • 遗愿标识(Will Flag):标识是否使用遗愿消息,1bit,0 或者 1。
  • 会话清除标识(Clean Session):标识Client是否建立一个持久化的会话,1bit,0或者1。当该标识设为0时,代表Client希望建立一个持久会话的连接,Broker将存储该Client订阅的主题和未接受的消息,否则Broker不会存储这些数据,同时在建立连接时清楚这个Client之前存在的持久化会话所保存的数据。
  • 连接保活(Keep Alive):设置一个以秒为单位的时间间隔,Client和Broker之间在这个时间间隔之内需要至少一次消息交互,否则Client和Broker会认为它们之间的连接已经断开。

1.1.2. 消息体

CONNECT数据包的消息体中包含以下数据:

  • 客户端标识符(Client Identifier):Client Identifier 是用来标识 Client 身份的字段,在 MQTT 3.1.1 的版本中,这个字段的长度是 1 到 23 个字节,而且只能包含数字和 26 个字母(包括大小写),Broker 通过这个字段来区分不同的 Client。所以在连接的时候,应该保证 Client Identifier 是唯一的,所以我们可以使用UUID,唯一的设备硬件标识,或者在Android设备中使用的话,可以使用DEVICE_ID等作为Client Identifier的取值来源。

    MQTT协议中要求Client连接时必须带上Client Identifier,但是也允许Broker在Client Identifier为空时,会为Client分配一个内部唯一的Identifier。如果需要持久化会话的话,那必须为Client设定一个唯一的Identifier。

  • 用户名(Username):如果可变头中的用户名标识设为 1,那么消息体中将包含用户名字段,Broker 可以使用用户名和密码来对接入的 Client 进行验证,只允许已授权的 Client 接入。

    注意不同的 Client 需要使用不同的 Client Identifier,但它们可以使用同样的用户名和密码进行连接。

  • 密码(Password):如果可变头中的密码标识设为 1,那么消息体中将包含密码字段。

  • 遗愿主题(Will Topic):如果可变头中的遗愿标识设为 1,那么消息体中将包含遗愿主题,当 Client 非正常地中断连接的时候,Broker 将向指定的遗愿主题中发布遗愿消息。

  • 遗愿消息(Will Message):如果可变头中的遗愿标识设为 1,那么消息体中将包含遗愿消息,当 Client 非正常地中断连接的时候,Broker 将向指定的遗愿主题中发布由该字段指定的内容。

1.2. CONNACK数据包

当Broker收到Client的CONNECT数据包之后,将检查并检验CONNECT数据包的内容,之后回复Client一个CONNACK数据包。CONNACK数据包包含以下内容(这里略过固定头的阐述,需要的话可以翻看MQTT基础概念):

1.2.1. 可变头

CONNACK数据包的可变头中,包含以下信息:

  • 会话存在标识(Session Present Flag):用于标识在 Broker 上是否已存在该 Client(用 Client Identifier 区分)的持久性会话,1bit,0 或者 1。当Client在连接时设置Clean Session=1(会话清除标识见CONNECT数据包的可变头),则CONNACK中的Session Present Flag始终为0;当 Client 在连接时设置 Clean Session=0,那么存在下面两种情况

    • 如果Broker上面保留了这个Client之前留下的持久性会话,那么CONNACK中的Session Present Flag值为1;
    • 如果Broker上面没有保存这个Client之前留下的会话数据,那么CONNACK中的Session Present Flag值为0;

    Session Present Flag 这个特性是在MQTT3.1.1版本中新加入的,之前的版本中没有这个标识

  • 连接返回码(Connect Return code):用于标识 Client 是 Broker 的连接是否建立成功,连接返回码有以下一些值:

    | Return code | 连接状态 |
    | :————-: | :————————————————-: |
    | 0 | 连接已经建立 |
    | 1 | 连接被拒绝,不允许的协议版本 |
    | 2 | 连接被拒绝,Client Identifier被拒绝 |
    | 3 | 连接被拒绝,服务器不可用 |
    | 4 | 连接被拒绝,错误的用户名或密码 |
    | 5 | 连接被拒绝,未授权 |

    Return code=2 代表的是 Client Identifier 格式不规范,比如长度超过 23 个字符,包含了不允许的字符等(部分 Broker 的实现在协议标准上做了扩展,比如允许超过 23 个字符的 Client Identifer 等)

    Return code=4在MQTT协议中的含义是Username和Password的格式不正确,但是在大部分的Broker实现中,使用错误的用户名和密码时返回的也是4,所以可以认为4表示就是错误的用户名或者密码;

    Return code=5一般表示Broker不使用用户名和密码验证而是采用IP地址或者Client Identifier进行认证的时候使用,来标识Client没有通过验证。

1.2.2. 消息体

CONNACK没有消息体。

综上所述当Client向Broker发送了CONNECT数据包并获得了Return code为0的CONNACK数据包后,则代表连接建立成功了。之后则可以进行消息的发布和订阅了。

2. MQTT断开过程

MQTT的断开过程分为以下两种:

  • 一种是Client主动关闭连接
  • 一种是Broker主动关闭连接

2.1. Client主动关闭连接

Client 主动关闭连接的流程非常简单,只需要Client向 Broker 发送一个 DISCONNECT 数据包就可以了。DISCONNECT 数据包没有可变头(Variable header)和消息体(Payload)。在 Client 发送完 DISCONNECT 之后,就可以关闭底层的 TCP 连接了,不需要等待 Broker 的回复(Broker 也不会对 DISCONNECT 数据包回复)。

为什么Client关闭TCP连接之前,要发送一个和Broker没有交互的数据包,而不是关闭底层的TCP连接的?

因为这涉及到MQTT协议的一个特性,在MQTT协议中,Broker需要判断Client是否是正常的断开连接。当Broker收到Client的DISCONNECT数据包的时候,Broker则认为Client是正常断开连接的,那么会丢弃当前连接指定的遗愿消息。如果Broker检测到Client连接丢失,但是又没有收到DISCONNECT数据包,则认为Client是非正常断开的,就会向在连接的时候指定的遗愿主题发布遗愿消息。

2.2. Broker主动关闭连接

MQTT协议规定Broker在没有收到Client的DISCONNECT数据包之前都应该和Client保持连接。只有当Broker 在Keep Alive的时间间隔内,没有收到Client的任何MQTT数据包的时候会主动关闭连接。一些Broker的实现在MQTT协议上做了一些拓展,支持Client的连接管理,可以主动和某个Client断开连接。

Broker主动关闭连接之前不会向Client发送任何MQTT数据包,而是直接关闭底层的TCP连接。

3. 代码实践

下面使用Python的paho mqtt库来实现MQTT的连接,Broker的话使用自己搭建的mosquitto。

3.1. 建立持久会话的连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import paho.mqtt.client as mqtt

'''
收到Broker发来的CONNACK消息,就会执行on_connect()回调函数
打印出CONNACK数据包中的Connect Return code、Session Present Flag
'''
def on_connect(client, userdata, flags, rc):
print("return code:", rc)
print("session present:", flags['session present'])


# 通过client_id指定Client Identifier
# 设置clean_session为False表示要建立一个持久性会话
mqtt_client = mqtt.Client(client_id="demo_mqtt", clean_session=False)
# 将回调函数指派给客户端实例
mqtt_client.on_connect = on_connect

mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

运行Python代码之后,输出结果如下所示:

1
2
return code: 0
session present: 0

return code为0表示连接已成功建立,因为demo_mqtt的Client第一次建立,所以SessionPresent为0。再次运行,输出结果变成如下:

1
2
return code: 0
session present: 1

3.2. 建立非持久会话的连接

相比建立持久会话的连接的代码,只需要将clean_session指定为了True即可建立一个非持久会话的连接了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import paho.mqtt.client as mqtt

'''
收到Broker发来的CONNACK消息,就会执行on_connect()回调函数
打印出CONNACK数据包中的Connect Return code、Session Present Flag
'''
def on_connect(client, userdata, flags, rc):
print("return code:", rc)
print("session present:", flags['session present'])


# 通过client_id指定Client Identifier
# 设置clean_session为True表示要建立一个非持久性的会话
mqtt_client = mqtt.Client(client_id="demo_mqtt", clean_session=True)
# 将回调函数指派给客户端实例
mqtt_client.on_connect = on_connect

mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

运行该代码,输出结果为:

1
2
return code: 0
session present: 0

并且无论运行多少次,SessionPresent都是为0

paho mqtt的Python版本,默认clean_session为True

3.3. 使用相同的 Client Identifier 进行连接

1
2
3
4
5
6
7
8
9
10
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
print("return code:", rc)
print("session present:", flags['session present'])

mqtt_client = mqtt.Client(client_id="demo_mqtt")
mqtt_client.on_connect = on_connect
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

分别在两个终端中运行上述同样的代码,那么两个终端中会不停打印如下内容:

1
2
3
4
5
return code: 0
session present: 0
return code: 0
session present: 0
......

因为当两个Client中使用同样的Client Identifie进行连接时,那么第二个Client连接成功后,Broker会关闭和第一个已经连接上的 Client 连接。然而因为我们使用了loop_forever()函数,这个函数会一直阻塞,直到Client调用disconnect(),并且这个函数会在断开后自动重连。所以当连接被 Broker 关闭时,它又会尝试重新连接,结果就是这两个 Client 交替地把对方顶下线,因此在使用中我们需要保证每一个设备使用的 Client Identifier 是唯一的。

程序锅 wechat
欢迎关注微信公众号【一口程序锅】,不定期的技术分享、资源分享。
让我多买本书学习学习
0%