程序锅

  • 首页
  • 分类
  • 标签
  • 归档
  • 关于

  • 搜索
基础知识 Etcd LeetCode 计算机体系结构 Kubernetes Containerd Docker 容器 云原生 Serverless 项目开发维护 ELF 深入理解程序 Tmux Vim Linux Kernel Linux numpy matplotlib 机器学习 MQTT 网络基础 Thrift RPC OS 操作系统 Clang 研途 数据结构和算法 Java 编程语言 Golang Python 个人网站搭建 Nginx 计算机通用技术 Git

MQTT系列 | MQTT消息的发布和订阅

发表于 2019-04-20 | 分类于 MQTT | 0 | 阅读次数 2787

1. MQTT的发布

MQTT发布中最重要的是PUBLISH数据包,PUBLISH数据包是用于sender和receiver之间传输消息数据的。当Publisher要向某个Topic发布一条消息的时候,Publisher会向Broker发送一个PUBLISH数据包;当Broker要将一条消息转发给订阅了某条主题的Subscriber时,Broker也会向该Subscriber发送一个PUBLISH数据包。因为PUBLISH传输过程中涉及到了QoS,Recevier收到sender的PUBLISH数据包之后会根据QoS的不同,还有后续不同的应答流程(只有当QoS为0时,Receiver不做任何应答),所以关于这个具体的流程,在QoS那一章节进行讲述。下面对PUBLISH数据包进行讲解:

1.1. PUBLISH数据包

1.1.1. 固定头

PUBLISH的固定头包含了一下内容:

  • 消息重复标识(DUP flag):1bit,0 或者 1,当 DUP flag = 1 的时候,代表该消息是一条重发消息,因 Receiver 没有确认收到之前的消息而重新发送的。这个标识只在 QoS 大于 0 的消息中使用。
  • QoS:2bit,0、1 或者 2,代表 PUBLISH 消息的 QoS level。
  • Retain 标识(Retain flag):1bit,0 或者 1。在从 Client 发送到 Broker 的 PUBLISH 消息中被设为 1 的时候,Broker 应该保存该条消息,当之后有任何新的 Subscriber 订阅 PUBLISH 消息中指定的主题时,都会先收到该条消息,这种消息也叫 Retained 消息。在从 Broker 发送到 Client 的 PUBLISH 消息中被设为 1 的时候,代表该条消息是一条 Retained 消息。

1.1.2. 可变头

  • 数据包标识( Packet Identifier):2字节,用来标识一个唯一数据包。数据包标识只需要保证在从 Sender 到 Receiver 的一次消息交互(比如发送、应答为一次交互)中保持唯一就好,只在QoS大于1的消息中使用,因为QoS大于1的消息有应答流程。
  • 主题名称(Topic Name):主题名称是一个 UTF-8 编码的字符串,用来命名该消息发布到哪一个主题,Topic Name 可以是长度大于等于 1 任何一个字符串(可包含空格)。但是在实际项目中,我们最好还是遵循以下一些最优方法。
    • 主题名称应该包含层级,不同的层级用 / 划分。
    • 主题名称开头不要使用/
    • 不要在主题中使用空格
    • 只使用ASCII字符
    • 主题名称在可读的前提下尽量短
    • 主题是大小写敏感的,“data”和“Data”是两个不同的主题
    • 以$为开头的主题属于Broker预留的系统主题,通常用于发布Broker的内部统计信息,所以在自己定义时不要使用$开头的主题手法数据。

1.1.3. 消息体

PUBLISH数据包的消息体中包含的是该消息要发送的具体数据,数据可以是任何格式的:二进制数据、文本、JSON等都可以。

2. MQTT的订阅

订阅主题的流程如下图所示:

  1. Client向Broker发送一个SUBSCRIBE数据包,该数据包中含有Client想要订阅的主题和其他一些参数;
  2. Broker收到SUBSCRIBE数据包后,向Client发送一个SUBACK数据包作为应答。

2.1. SUBSCRIBE数据包

2.1.1. 可变头

  • 数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从 Sender 到 Receiver 的一次消息交互中保持唯一。

2.1.2. 消息体

  • 订阅列表(List of Subscriptions):SUBSCRIBE 的消息体中包含 Client 想要订阅的主题列表,列表中的每一项由订阅主题名和对应的 QoS 组成。

    主题名说明

    主题名中可以包含通配符,单层通配符“+”和多层通配符“#”。使用包含通配符的主题名可以订阅满足匹配条件的所有主题。为了和 PUBLISH 中的主题区分,我们叫 SUBSCRIBE 中的主题名为主题过滤器(Topic Filter)。

    • 单层通配符“+”:“+”可以用来指代任意一个层级。

      举例:

      如“sensor/+/tem”,可以匹配:

      • sensor/data/tem
      • sensor/cmd/tem

      不可以匹配:

      • sensor/data/01/tem
    • 多层通配符“#”:“#”和“+”的区别在于,“#”可以用来指代任意多个层。**但是"#"必须是Topic Filter的最后一个字符,同时必须跟在“/“后面,除非Topic Filter只包含一个”#“这一个字符。**如“#”是一个合法的Topic Filter,而“sensor#”不是一个合法的Topic Filter。

      举例:

      如“sensor/data/#”,可匹配:

      • sensor/data
      • sensor/data/tem
      • sensor/data/tem/01
      • sensor/data/tem/01/02

      不可以匹配:

      • sensor/cmd/tem

    QoS说明

    SUBSCRIBE数据包中QoS代表针对某一个或着某一组的主题,Client希望Broker在转发来自这些主题的消息给它时,消息使用的QoS级别。

2.2. SUBACK数据包

为确认每一次的订阅,Broker收到SUBSCRIBE之后会回复一个SUBACK数据包作为应答。SUBACK数据包包含以下内容:

2.2.1. 可变头

  • 数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从 Sender 到 Receiver 的一次消息交互中保持唯一。

2.2.2. 消息体

返回码(return codes):SUBBACK 数据包包含了一组返回码,返回码的数量和顺序和 SUBSCRIBE 数据包的订阅列表对应,用于标识订阅类别中的每一个订阅项的订阅结果。

返回码含义
0订阅成功, 最大可用QoS为0
1订阅成功,最大可用QoS为1
2订阅成功, 最大可用QoS为2
128订阅失败

返回码0~2表示订阅成功,并且Broker授予Subscriber不同等级的QoS,这个等级可能会和SubScriber在SUBSCRIBE数据包中要求的不一样。128表示订阅失败,可能是没有权限订阅这个主题,或者订阅主题的格式不对。

QoS返回和订阅时不一样的原因可以参见QoS中的QoS降级的相关知识。

3. MQTT的取消订阅

subscriber也可以取消对某些主题的订阅,取消订阅的流程如下图所示:

  1. subscriber向Broker发送一个UNSUBSCRIBE数据包,该数据包包含想要取消订阅的主题;
  2. Broker收到UNSUBSCRIBE数据包之后,向subscriber发送一个UNSUBACK数据包作为应答。

3.1. UNSUBSCRIBE数据包

3.1.1. 可变头

  • 数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从 Sender 到 Receiver 的一次消息交互中保持唯一。

3.1.2. 消息体

  • 主题列表(List of Topics):UNSUBSCRIBE 的消息体中包含 Client 想要取消订阅的主题过滤器列表,这些主题过滤器和 SUBSCRIBE 数据包中一样,可以包含通配符。UNSUBSCRIBE 消息体里面不再包含主题过滤器对应的 QoS 了。

3.2. UNSUBACK数据包

Broker收到UNSUBSCRIBE数据包之后会回复一个UNSUBACK数据包作为应答。UNSUBACK数据包内容如下:

3.2.1. 可变头

  • 数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从 Sender 到 Receiver 的一次消息交互中保持唯一。

3.2.2. 消息体

UNSUBACK 数据包没有消息体。

4. 代码实践

4.1. 发布消息

向一个主题发布一条QoS为1的数据包,发送成功之后断开连接:

import paho.mqtt.client as mqtt

def on_publish(client, userdata, mid):
    print("message ID ", mid)
    client.disconnect()

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.publish("test", payload="hello world", qos=1)
    else:
        print("connection failed ", rc)

mqtt_client = mqtt.Client(client_id="demo_mqtt_pub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_publish = on_publish
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

运行上述代码,输出如下:

message ID  1

相应订阅了test主题的订阅方输出如下:

4.2. 订阅消息

通常我们在建立和 Broker 的连接之后就可以开始订阅了,如果你建立的是持久会话的连接,那么有可能 Broker 已经保存你在之前的连接时订阅的主题,你就没有必要再发起 SUBSCRIBE 请求了,这个小优化在网络带宽或者设备处理能力较差的情况尤为重要。相应的代码如下:

import paho.mqtt.client as mqtt

'''
当代理响应订阅请求时被调用
'''
def on_subscribe(client, userdata, mid, granted_qos):
    print("granted_qos:", granted_qos)

'''
当收到关于客户订阅的主题的消息时调用
'''
def on_message(client, userdata, message):
    print(message.topic, message.payload)

def on_connect(client, userdata, flags, rc):
    if rc == 0 :
        if flags["session present"] == 0:
            print("subscribing")
            client.subscribe("test", 1)
    else:
        print("connection failed ", rc)

mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_subscribe = on_subscribe
mqtt_client.on_message = on_message

mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

运行上述代码得到如下输出结果:

subscribing
granted_qos: (1,)

当运行的发布消息中的代码之后,将输出:

test b'hello world'

当终止掉订阅消息中的代码运行之后,再次运行该代码,会发现什么都不输出。因为第一次运行的时候,Broker 上面没有保存这个 Client 的会话,所以需要进行订阅,当重新运行之后,因为 Broker 上面已经保存了这个 Client 的会话,所以就不需要再订阅了,你就不会看到订阅相关的输出了。

4.3. 取消订阅

在上述订阅消息中建立连接并订阅了相应主题的基础上,我们取消对之前订阅的主题

import paho.mqtt.client as mqtt

'''
当代理响应取消订阅请求时调用
'''
def on_unsubscribe(client, userdata, mid):
    print("message id:", mid)
    client.disconnect()

def on_connect(client, userdata, flags, rc):
    if rc == 0 :
        print("unsubscribing")
        client.unsubscribe("test")
    else:
        print("connection failed ", rc)

mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_unsubscribe = on_unsubscribe

mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

相应的输出如下:

unsubscribing
message id: 1

之后再运行订阅消息中的代码和发布消息中的代码,此时运行订阅消息的终端不再有输出。

卷死我
dawnguo 微信支付

微信支付

dawnguo 支付宝

支付宝

  • 本文作者: dawnguo
  • 本文链接: /archives/85
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# MQTT
Python第三方库matplotlib的入门使用
机器学习局 | 梯度下降算法详解
  • 文章目录
  • 站点概览
dawnguo

dawnguo

215 日志
24 分类
37 标签
RSS
Creative Commons
© 2018 — 2025 程序锅
0%