程序锅

  • 首页
  • 分类
  • 标签
  • 归档
  • 关于

  • 搜索
基础知识 Etcd LeetCode 计算机体系结构 Kubernetes Containerd Docker 容器 云原生 Serverless 项目开发维护 ELF 深入理解程序 Tmux Vim Linux Kernel Linux numpy matplotlib 机器学习 MQTT 网络基础 Thrift RPC OS 操作系统 Clang 研途 数据结构和算法 Java 编程语言 Golang Python 个人网站搭建 Nginx 计算机通用技术 Git

MQTT系列 | MQTT的连接和断开

发表于 2019-04-20 | 分类于 MQTT | 0 | 阅读次数 4085

1. MQTT的连接过程

Client建立到Broker的连接过程如下图所示:

  1. Client发送CONNECT数据包给Broker
  2. Broker在收到CONNECT数据包之后,给Client返回一个CONNACK数据包

1.1. CONNECT数据包

连接的建立由Client发起,Client端首先向Broker发送一个CONNECT数据包,CONNECT数据包包含以下内容(这里略过固定头的阐述,需要的话可以翻看MQTT基础概念)

1.1.1. 可变头

在CONNECT数据包可变头中,包含以下信息:

  • 协议名称(Protocol Name):值固定为字符 “MQTT”。
  • 协议版本(Protocol Level):对 MQTT 3.1.1 来说,该值为 4。
  • 用户名标识(User Name Flag):消息体中是否有用户名字段,1bit,0 或者 1。
  • 密码标识(Password Flag):消息体中是否有密码字段,1bit,0 或者 1。
  • 遗愿消息Retain标识(Will Retain):标识遗愿消息是否是 Retain 消息,1bit,0 或者 1。
  • 遗愿消息 QOS 标识(Will QOS):标识遗愿消息的 QOS,2bit,0、1 或者 2。
  • 遗愿标识(Will Flag):标识是否使用遗愿消息,1bit,0 或者 1。
  • 会话清除标识(Clean Session):标识Client是否建立一个持久化的会话,1bit,0或者1。当该标识设为0时,代表Client希望建立一个持久会话的连接,Broker将存储该Client订阅的主题和未接受的消息,否则Broker不会存储这些数据,同时在建立连接时清楚这个Client之前存在的持久化会话所保存的数据。
  • 连接保活(Keep Alive):设置一个以秒为单位的时间间隔,Client和Broker之间在这个时间间隔之内需要至少一次消息交互,否则Client和Broker会认为它们之间的连接已经断开。

1.1.2. 消息体

CONNECT数据包的消息体中包含以下数据:

  • 客户端标识符(Client Identifier):Client Identifier 是用来标识 Client 身份的字段,在 MQTT 3.1.1 的版本中,这个字段的长度是 1 到 23 个字节,而且只能包含数字和 26 个字母(包括大小写),Broker 通过这个字段来区分不同的 Client。所以在连接的时候,应该保证 Client Identifier 是唯一的,所以我们可以使用UUID,唯一的设备硬件标识,或者在Android设备中使用的话,可以使用DEVICE_ID等作为Client Identifier的取值来源。

    MQTT协议中要求Client连接时必须带上Client Identifier,但是也允许Broker在Client Identifier为空时,会为Client分配一个内部唯一的Identifier。如果需要持久化会话的话,那必须为Client设定一个唯一的Identifier。

  • 用户名(Username):如果可变头中的用户名标识设为 1,那么消息体中将包含用户名字段,Broker 可以使用用户名和密码来对接入的 Client 进行验证,只允许已授权的 Client 接入。

    注意不同的 Client 需要使用不同的 Client Identifier,但它们可以使用同样的用户名和密码进行连接。

  • 密码(Password):如果可变头中的密码标识设为 1,那么消息体中将包含密码字段。

  • 遗愿主题(Will Topic):如果可变头中的遗愿标识设为 1,那么消息体中将包含遗愿主题,当 Client 非正常地中断连接的时候,Broker 将向指定的遗愿主题中发布遗愿消息。

  • 遗愿消息(Will Message):如果可变头中的遗愿标识设为 1,那么消息体中将包含遗愿消息,当 Client 非正常地中断连接的时候,Broker 将向指定的遗愿主题中发布由该字段指定的内容。

1.2. CONNACK数据包

当Broker收到Client的CONNECT数据包之后,将检查并检验CONNECT数据包的内容,之后回复Client一个CONNACK数据包。CONNACK数据包包含以下内容(这里略过固定头的阐述,需要的话可以翻看MQTT基础概念):

1.2.1. 可变头

CONNACK数据包的可变头中,包含以下信息:

  • 会话存在标识(Session Present Flag):用于标识在 Broker 上是否已存在该 Client(用 Client Identifier 区分)的持久性会话,1bit,0 或者 1。当Client在连接时设置Clean Session=1(会话清除标识见CONNECT数据包的可变头),则CONNACK中的Session Present Flag始终为0;当 Client 在连接时设置 Clean Session=0,那么存在下面两种情况

    • 如果Broker上面保留了这个Client之前留下的持久性会话,那么CONNACK中的Session Present Flag值为1;
    • 如果Broker上面没有保存这个Client之前留下的会话数据,那么CONNACK中的Session Present Flag值为0;

    Session Present Flag 这个特性是在MQTT3.1.1版本中新加入的,之前的版本中没有这个标识

  • 连接返回码(Connect Return code):用于标识 Client 是 Broker 的连接是否建立成功,连接返回码有以下一些值:

    Return code连接状态
    0连接已经建立
    1连接被拒绝,不允许的协议版本
    2连接被拒绝,Client Identifier被拒绝
    3连接被拒绝,服务器不可用
    4连接被拒绝,错误的用户名或密码
    5连接被拒绝,未授权

    Return code=2 代表的是 Client Identifier 格式不规范,比如长度超过 23 个字符,包含了不允许的字符等(部分 Broker 的实现在协议标准上做了扩展,比如允许超过 23 个字符的 Client Identifer 等)

    Return code=4在MQTT协议中的含义是Username和Password的格式不正确,但是在大部分的Broker实现中,使用错误的用户名和密码时返回的也是4,所以可以认为4表示就是错误的用户名或者密码;

    Return code=5一般表示Broker不使用用户名和密码验证而是采用IP地址或者Client Identifier进行认证的时候使用,来标识Client没有通过验证。

1.2.2. 消息体

CONNACK没有消息体。

综上所述当Client向Broker发送了CONNECT数据包并获得了Return code为0的CONNACK数据包后,则代表连接建立成功了。之后则可以进行消息的发布和订阅了。

2. MQTT断开过程

MQTT的断开过程分为以下两种:

  • 一种是Client主动关闭连接
  • 一种是Broker主动关闭连接

2.1. Client主动关闭连接

Client 主动关闭连接的流程非常简单,**只需要Client向 Broker 发送一个 DISCONNECT 数据包就可以了。**DISCONNECT 数据包没有可变头(Variable header)和消息体(Payload)。在 Client 发送完 DISCONNECT 之后,就可以关闭底层的 TCP 连接了,不需要等待 Broker 的回复(Broker 也不会对 DISCONNECT 数据包回复)。

为什么Client关闭TCP连接之前,要发送一个和Broker没有交互的数据包,而不是关闭底层的TCP连接的?

因为这涉及到MQTT协议的一个特性,在MQTT协议中,Broker需要判断Client是否是正常的断开连接。当Broker收到Client的DISCONNECT数据包的时候,Broker则认为Client是正常断开连接的,那么会丢弃当前连接指定的遗愿消息。如果Broker检测到Client连接丢失,但是又没有收到DISCONNECT数据包,则认为Client是非正常断开的,就会向在连接的时候指定的遗愿主题发布遗愿消息。

2.2. Broker主动关闭连接

MQTT协议规定Broker在没有收到Client的DISCONNECT数据包之前都应该和Client保持连接。只有当Broker 在Keep Alive的时间间隔内,没有收到Client的任何MQTT数据包的时候会主动关闭连接。一些Broker的实现在MQTT协议上做了一些拓展,支持Client的连接管理,可以主动和某个Client断开连接。

Broker主动关闭连接之前不会向Client发送任何MQTT数据包,而是直接关闭底层的TCP连接。

3. 代码实践

下面使用Python的paho mqtt库来实现MQTT的连接,Broker的话使用自己搭建的mosquitto。

3.1. 建立持久会话的连接

import paho.mqtt.client as mqtt

'''
收到Broker发来的CONNACK消息,就会执行on_connect()回调函数
打印出CONNACK数据包中的Connect Return code、Session Present Flag
'''
def on_connect(client, userdata, flags, rc):
    print("return code:", rc)
    print("session present:", flags['session present'])


# 通过client_id指定Client Identifier
# 设置clean_session为False表示要建立一个持久性会话
mqtt_client = mqtt.Client(client_id="demo_mqtt", clean_session=False)
# 将回调函数指派给客户端实例
mqtt_client.on_connect = on_connect

mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

运行Python代码之后,输出结果如下所示:

return code: 0
session present: 0

return code为0表示连接已成功建立,因为demo_mqtt的Client第一次建立,所以SessionPresent为0。再次运行,输出结果变成如下:

return code: 0
session present: 1

3.2. 建立非持久会话的连接

相比建立持久会话的连接的代码,只需要将clean_session指定为了True即可建立一个非持久会话的连接了。

import paho.mqtt.client as mqtt

'''
收到Broker发来的CONNACK消息,就会执行on_connect()回调函数
打印出CONNACK数据包中的Connect Return code、Session Present Flag
'''
def on_connect(client, userdata, flags, rc):
    print("return code:", rc)
    print("session present:", flags['session present'])


# 通过client_id指定Client Identifier
# 设置clean_session为True表示要建立一个非持久性的会话
mqtt_client = mqtt.Client(client_id="demo_mqtt", clean_session=True)
# 将回调函数指派给客户端实例
mqtt_client.on_connect = on_connect

mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

运行该代码,输出结果为:

return code: 0
session present: 0

并且无论运行多少次,SessionPresent都是为0

paho mqtt的Python版本,默认clean_session为True

3.3. 使用相同的 Client Identifier 进行连接

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("return code:", rc)
    print("session present:", flags['session present'])

mqtt_client = mqtt.Client(client_id="demo_mqtt")
mqtt_client.on_connect = on_connect
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

分别在两个终端中运行上述同样的代码,那么两个终端中会不停打印如下内容:

return code: 0
session present: 0
return code: 0
session present: 0
......

因为当两个Client中使用同样的Client Identifie进行连接时,那么第二个Client连接成功后,Broker会关闭和第一个已经连接上的 Client 连接。然而因为我们使用了loop_forever()函数,这个函数会一直阻塞,直到Client调用disconnect(),并且这个函数会在断开后自动重连。所以当连接被 Broker 关闭时,它又会尝试重新连接,结果就是这两个 Client 交替地把对方顶下线,因此在使用中我们需要保证每一个设备使用的 Client Identifier 是唯一的。

卷死我
dawnguo 微信支付

微信支付

dawnguo 支付宝

支付宝

  • 本文作者: dawnguo
  • 本文链接: /archives/87
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# MQTT
Python第三方库matplotlib的入门使用
机器学习局 | 梯度下降算法详解
  • 文章目录
  • 站点概览
dawnguo

dawnguo

215 日志
24 分类
37 标签
RSS
Creative Commons
© 2018 — 2025 程序锅
0%