程序锅

  • 首页
  • 分类
  • 标签
  • 归档
  • 关于

  • 搜索
基础知识 Etcd LeetCode 计算机体系结构 Kubernetes Containerd Docker 容器 云原生 Serverless 项目开发维护 ELF 深入理解程序 Tmux Vim Linux Kernel Linux numpy matplotlib 机器学习 MQTT 网络基础 Thrift RPC OS 操作系统 Clang 研途 数据结构和算法 Java 编程语言 Golang Python 个人网站搭建 Nginx 计算机通用技术 Git

MQTT系列 | QoS介绍

发表于 2019-04-20 | 分类于 MQTT | 0 | 阅读次数 1840

1. MQTT中的QoS等级

MQTT设计了一套保证消息稳定传输的机制,包括消息应答、存储和重传。在这套机制下,提供了三种不同层次QoS(Quality of Service):

  • QoS0,At most once,至多一次;
  • QoS1,At least once,至少一次;
  • QoS2,Exactly once,确保只有一次。

QoS 是消息的发送方(Sender)和接受方(Receiver)之间达成的一个协议:

  • QoS0 代表,Sender 发送的一条消息,Receiver 最多能收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,也就算了;
  • QoS1 代表,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息;
  • QoS2 代表,Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。

注意:

QoS是Sender和Receiver之间的协议,而不是Publisher和Subscriber之间的协议。换句话说,Publisher发布了一条QoS1的消息,只能保证Broker能至少收到一次这个消息;而对于Subscriber能否至少收到一次这个消息,还要取决于Subscriber在Subscibe的时候和Broker协商的QoS等级。

1.1. QoS0

QoS0等级下,Sender和Receiver之间一次消息的传递流程如下:

Sender向Receiver发送一个包含消息数据的PUBLISH包,然后不管结果如何,丢掉已发送的PUBLISH包,一条消息的发送完成。

1.2. QoS1

QoS1要保证消息至少到达一次,所以有一个应答的机制。Sender和Receiver的一次消息的传递流程如下:

  1. Sender向Receiver发送一个带有数据的PUBLISH包,并在本地保存这个PUBLISH包;
  2. Receiver收到PUBLISH包以后,向Sender发送一个PUBACK数据包,PUBACK数据包没有消息体(Payload),在可变头中有一个包标识(Packet Identifier),和它收到的PUBLISH包中的Packet Identifier一致。
  3. Sender收到PUBACK之后,根据PUBACK包中的Packet Identifier找到本地保存的PUBLISH包,然后丢弃掉,一次消息的发送完成。

但是消息传递流程中可能会出现问题:

  • 如果Sender在一段时间内没有收到PUBLISH包对应的PUBACK,它将该PUBLISH包的DUP标识设为1(代表是重新发送的PUBLISH包),然后重新发送该PUBLISH包。

  • Receiver可能会重复收到消息,需自行去重。

1.3. QoS2

相比QoS0和QoS1,QoS2不仅要确保Receiver能收到Sender发送的消息,还需要确保消息不重复。它的重传和应答机制就要复杂一些,同时开销也是最大的。QoS2下,一次消息的传递流程如下所示:

  1. Sender发送QoS为2的PUBLISH数据包,数据包 Packet Identifier 为 P,并在本地保存该PUBLISH包;
  2. Receiver收到PUBLISH数据包后,在本地保存PUBLISH包的Packet Identifier P,并回复Sender一个PUBREC数据包,PUBREC数据包可变头中的Packet Identifier为P,没有消息体(Payload);
  3. 当Sender收到PUBREC,它就可以安全的丢弃掉初始Packet Identifier为P的PUBLISH数据包。同时保存该PUBREC数据包,并回复Receiver一个PUBREL数据包,PUBREL数据包可变头中的Packet Identifier为P,没有消息体;
  4. 当Receiver收到PUBREL数据包,它可以丢掉保存的PUBLISH包的Packet Identifier P,并回复Sender一个可变头中 Packet Identifier 为 P,没有消息体(Payload)的PUBCOMP数据包;
  5. 当Sender收到PUBCOMP包,那么认为传输已完成,则丢掉对应的PUBREC数据包;

上面是一次完整无误的传输过程,然而传输过程中可能会出现以下情况:

  • 情况1:Sender发送PUBLISH数据包给Receiver的时候,发送失败;

  • 情况2:Sender已经成功发送PUBLISH数据包给Receiver了,但是Receiver发送PUBREC数据包失败;

  • 情况3:Sender已经成功收到了PUBREC数据包,但是PUBREL数据包发送失败;

  • 情况4:Receiver已经收到了PUBREL数据包,但是发送PUBCOMP数据包时发送失败

针对上述的问题,较为详细的处理方法如下:

  • 不管是情况1还是情况2,因为Sender在一定时间内没有收到PUBREC,那么它会把PUBLISH包的DUP标识设为1,重新发送该PUBLISH数据包;

  • 不管是情况3还是情况4,因为Sender在一定时间内没有收到PUBCOMP包,那么它会重新发送PUBREL数据包;

  • 针对情况2,Receiver可能会收到多个重复的PUBLISH包,更加完善的处理如下:

    Receiver在收到PUBLISH数据包之后,马上回复一个PUBREC数据包。并会在本地保存PUBLISH包的Packet Identifier P,不管之后因为重传多少次这个Packet Identifier 为P的数据包,Receiver都认为是重复的,丢弃。同时Receiver接收到QoS为2的PUBLISH数据包后,**并不马上投递给上层,**而是在本地做持久化,将消息保存起来(这里需要是持久化而不是保存在内存)。

  • 针对情况4,更加完善的处理如下:

    Receiver收到PUBREL数据包后,正式将消息递交给上层应用层,投递之后销毁Packet Identifier P,并发送PUBCOMP数据包,销毁之前的持久化消息。之后不管接收到多少个PUBREL数据包,因为没有Packet Identifier P,直接回复PUBCOMP数据包即可。

2. QoS降级

在 MQTT 协议中,从 Broker 到 Subscriber 这段消息传递的实际 QoS 等于:Publisher 发布消息时指定的 QoS 等级和 Subscriber 在订阅时与 Broker 协商的 QoS 等级,这两个 QoS 等级中的最小那一个。

Actual Subscribe QoS = MIN(Publish QoS, Subscribe QoS)

如下面代码所示:

该subscriber订阅消息时指定的QoS为1

import paho.mqtt.client as mqtt

'''
当代理响应订阅请求时被调用
'''
def on_subscribe(client, userdata, mid, granted_qos):
    print("granted_qos:", granted_qos)

'''
当收到关于客户订阅的主题的消息时调用
'''
def on_message(client, userdata, message):
    print("message qos", message.qos)
    print("message topic", message.topic)
    print("message payload", message.payload)

def on_connect(client, userdata, flags, rc):
    if rc == 0 :
        print("subscribing")
        client.subscribe("test", 1)
    else:
        print("connection failed ", rc)

mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_subscribe = on_subscribe
mqtt_client.on_message = on_message

mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

运行上述代码输出的结果为:

subscribing
granted_qos: (1,)

之后运行下面的publisher代码,指定发送的PUBLISH数据包的QoS等级为0

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.publish("test", payload="hello world", qos=0)
    else:
        print("connection failed ", rc)

mqtt_client = mqtt.Client(client_id="demo_mqtt_pub", clean_session=False)
mqtt_client.on_connect = on_connect

mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

结果上面运行subscriber代码的终端输出如下内容:

message qos 0
message topic test
message payload b'hello world'

上述的结果表示,订阅者收到的消息的qos等级为0。同样如果修改subscriber中的订阅主题的QoS等级为0,publisher中发布的PUBLISH包的QoS为1,那么输出结果同上。

3. QoS和会话

如果 Client 想接收离线消息,必须使用持久化的会话(Clean Session = 0)连接到 Broker,这样 Broker 才会存储 Client 在离线期间没有确认接收的 QoS 大于 等于1 的消息。

在发送QoS为1或2的情况,Broker(此时为Sender)会将发送的PUBLISH数据包保存到本地,直到收到一系列回复的数据包,然而Client(此时为Receiver)在离线期间无法回复相应的数据包,所以会一直存储。

4. QoS等级使用建议

在以下情况下你可以选择 QoS0:

  • Client 和 Broker 之间的网络连接非常稳定,例如一个通过有线网络连接到 Broker 的测试用 Client;
  • 可以接受丢失部分消息,比如你有一个传感器以非常短的间隔发布状态数据,所以丢一些也可以接受;
  • 你不需要离线消息。

在以下情况下你应该选择 QoS1:

  • 你需要接收所有的消息,而且你的应用可以接受并处理重复的消息;
  • 你无法接受 QoS2 带来的额外开销,QoS1 发送消息的速度比 QoS2 快很多。

在以下情况下你应该选择 QoS2:

  • 你的应用必须接收到所有的消息,而且你的应用在重复的消息下无法正常工作,同时你也能接受 QoS2 带来的额外开销。

本文参考

  1. MQTT协议QoS2 准确一次送达的实现
卷死我
dawnguo 微信支付

微信支付

dawnguo 支付宝

支付宝

  • 本文作者: dawnguo
  • 本文链接: /archives/83
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# MQTT
Python第三方库matplotlib的入门使用
机器学习局 | 梯度下降算法详解
  • 文章目录
  • 站点概览
dawnguo

dawnguo

215 日志
24 分类
37 标签
RSS
Creative Commons
© 2018 — 2025 程序锅
0%