MQTT入门指南
MQTT 简介
MQTT 是一种为连接物联网设备而设计的发布/订阅协议。与 HTTP 的请求/响应模式不同,MQTT 以事件驱动的方式运作,允许将消息推送给客户端。这种架构方法通过解耦数据生产者和数据消费者,消除了它们之间的依赖关系,从而实现了高度可扩展的解决方案。建立 MQTT 连接以发布和订阅消息的两个关键组件是 MQTT 客户端和 MQTT 代理,如下图所示。
MQTT 的优势
MQTT 提供了几个关键优势:
- 轻量级和高效:MQTT 最小化了客户端所需的资源和网络带宽。
- 双向通信:MQTT 促进了设备和服务器之间的通信,支持发布和订阅。它还允许向设备组广播消息。
- 可扩展性:MQTT 可以扩展以支持物联网或工业物联网生态系统中的数百万设备或“事物”。
- 服务质量(QoS)级别:MQTT 指定了不同的 QoS 级别以确保可靠的消息传递。
- 持久会话:MQTT 支持设备和服务器之间的持久会话,减少了在不可靠网络上的重新连接时间。
- 安全特性:MQTT 支持 TLS 加密以确保消息的保密性,并支持认证协议以验证客户端。
MQTT 应用和案例
MQTT 在各个行业和领域都有应用。它已被 BMW、Liberty Global、Fortum、Hytera、Awair 和 Matternet 等公司采用。这些公司在汽车、电信、能源、公共安全和连接产品领域成功利用了 MQTT。
MQTT 基础:掌握这一物联网协议的要点
MQTT 的核心是 MQTT 代理和 MQTT 客户端。MQTT 代理是发送者和接收者之间的中介,将消息分发给适当的接收者。MQTT 客户端将消息发布到代理,其他客户端订阅特定主题以接收消息。每个 MQTT 消息都包含一个主题,客户端订阅他们感兴趣的主题。MQTT 代理维护一个订阅者列表,并使用它将消息传递给相关客户端。
MQTT 代理还可以为离线客户端缓冲消息,即使在不可靠的网络条件下也确保可靠的消息传递。为了实现这一点,MQTT 支持三种不同的服务质量(QoS)级别:0(最多一次)、1(至少一次)和 2(恰好一次)。
MQTT 规范有两个版本:MQTT 3.1.1 和 MQTT 5。虽然大多数商业 MQTT 代理现在支持 MQTT 5,但一些物联网管理的云服务仍然主要支持 MQTT 3.1.1。我们强烈建议在新的物联网部署中使用 MQTT 5,因为它增强了专注于健壮性和云原生可扩展性的功能。
MQTT 客户端
有许多开源客户端可用于多种编程语言。HiveMQ 通过 HiveMQ MQTT 客户端库提供自己的 MQTT 客户端,旨在简化 MQTT 客户端的部署和实施,并为用户提供一流的功能、性能、安全性和可靠性。支持的一些编程语言包括 C#、C++、Java、Websockets、Python 等。Eclipse Paho 也提供了适用于 C/C++ 和 Python 等语言的 MQTT 客户端库。
MQTT 代理
MQTT 代理有多种实现方式,以满足不同的需求,如开源、商业和托管云服务。HiveMQ 提供商业版:HiveMQ 自托管和 HiveMQ 云,一个托管的云 MQTT 服务,以及 HiveMQ 社区版,一个开源版本。有关 MQTT 代理的详细列表,请访问 mqtt.cn。
MQTT 使用案例示例:
JAVA通过MQTT接入示例
说明:
本文中使用的代码为样例代码,仅用于体验平台通信功能,如需进行商用,可以参考资源获取获取对应语言的IoT Device SDK进行集成。
前提条件
- 已在管理控制台获取设备接入地址。获取地址的操作步骤,请参考平台对接信息。
- 已在管理控制台创建产品和设备。创建产品和设备的具体操作细节,请参考创建产品、注册单个设备或批量注册设备。
准备工作
安装IntelliJ IDEA
- 访问IntelliJ IDEA官网,选择合适系统的版本下载。(本文以windows 64-bit系统IntelliJ IDEA 2019.2.3 Ultimate为例)。
- 下载完成后,运行安装文件,根据界面提示安装。
导入代码样例
- 下载JAVA样例。
链接:https://pan.baidu.com/s/1JtQsONuIX8y7r0VpIepA5w?pwd=zne7
提取码:zne7
- 打开IDEA开发者工具,单击“ Import Project”。
- 选择步骤1中下载的样例,然后根据界面提示,单击“next”。
- 完成代码导入。
建立连接
设备或网关在接入物联网平台时首先需要和平台建立连接,从而将设备或网关与平台进行关联。开发者通过传入设备信息,将设备或网关连接到物联网平台。
MQTT基本信息
Topic | 权限 | 说明 |
Broker Address | iot.modbus.cn | MQTT地址 |
Broker Port | 1883 | MQTT端口号 |
Client ID | 4QR8TZ9ThuL4G | 设备号SN(请替换为自己的) |
MQTT账号密码
Topic | 权限 | 说明 |
User Name | ceshi | 用户账户 |
Password | 123456 | 用户密码 |
- 在建立连接之前,先修改以下参数:
// MQTT 服务器地址 private static final String MQTT_BROKER = "tcp://iot.modbus.cn:1883"; MQTT地址/端口号 // MQTT 客户端 ID private static final String MQTT_CLIENT_ID = "4QR8TZ9ThuL4G"; //设备号SN码 // MQTT 用户名 private static final String MQTT_USERNAME = "ceshi"; // MQTT 密码 private static final String MQTT_PASSWORD = "Abc123456"; // 订阅的主题 private static final String MQTT_TOPIC_SUBSCRIBE = "/server/coo/4QR8TZ9ThuL4G"; // 发布的主题 private static final String MQTT_TOPIC_PUBLISH = "/dev/coo/4QR8TZ9ThuL4G";
- MQTT_BROKER为物联网平台的设备对接地址,可参考平台对接信息获取(获取的是域名信息,可通过在cmd命令框中执行“ping 域名”,获取IP地址)。
- MQTT_CLIENT_ID为设备号SN,在成功添加设备后获取。
- 修改完1中的参数后就可使用MqttClient建立连接了。
- 连接成功后,设备显示在线。
备注:从设备地址是sensor_device_id ,寄存器是port_id,数据数值是Sdata ,具体稍后有具体说明。
使用MQTT通讯协议,下面的属性名称,Modbus功能码,数据格式,数据顺序不会影响最终数据,随便选择就可以!
MQTT 主题一览
Modbus物联网平台 作为物联网 PaaS 云平台,对设备 MQTT 接入提供了内置的访问协议规范,让设备和云平台的消息通信更加有章可循,大大简化了物联网项目的开发难度,缩短了产品的开发周期。
不同于普通的 MQTT 使用方式,我们提供了标准的内置主题,这足以实现绝大多数的物联网应用场景。
Topic | 权限 | 说明 |
/dev/coo/4QR8TZ9ThuL4G | 发布 | 上行通信:设备通过该Topic向物联网平台发送消息。 |
/server/coo/4QR8TZ9ThuL4G | 订阅 | 下行通信:设备通过订阅该Topic,获取从物联网平台下发的消息。 |
4QR8TZ9ThuL4G为示例设备号SN,请替换为自己的!
上行通信
Topic:/dev/coo/4QR8TZ9ThuL4G 设备号SN(请替换为自己的)
// 连接到 MQTT 服务器 mqttClient.connect(options); // 订阅主题 mqttClient.subscribe(MQTT_TOPIC_SUBSCRIBE); // 创建消息 MqttMessage message = new MqttMessage(); message.setPayload("[{"sensor_device_id": 1, "port_id": 1, "sdata": 66}]".getBytes()); // 发布消息 mqttClient.publish(MQTT_TOPIC_PUBLISH, message);
备注:从设备地址是sensor_device_id ,寄存器是port_id,数据数值是Sdata
下行通信
Topic:/server/coo/4QR8TZ9ThuL4G 设备号SN(请替换为自己的)
System.out.println("设备ID: " + sensor_device_id + ", 端口ID: " + port_id + ", 数据: " + sdata);
备注:从设备地址是sensor_device_id ,寄存器是port_id,数据数值是Sdata
完整代码:
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.json.JSONArray; import org.json.JSONObject; public class MqttDemo { // MQTT 服务器地址 private static final String MQTT_BROKER = "tcp://iot.modbus.cn:1883"; // MQTT 客户端 ID private static final String MQTT_CLIENT_ID = "4QR8TZ9ThuL4G"; // MQTT 用户名 private static final String MQTT_USERNAME = "ceshi"; // MQTT 密码 private static final String MQTT_PASSWORD = "Abc123456"; // 订阅的主题 private static final String MQTT_TOPIC_SUBSCRIBE = "/server/coo/4QR8TZ9ThuL4G"; // 发布的主题 private static final String MQTT_TOPIC_PUBLISH = "/dev/coo/4QR8TZ9ThuL4G"; // 心跳间隔,单位为秒 private static final int MQTT_KEEP_ALIVE_INTERVAL = 60; public static void main(String[] args) throws MqttException { // 创建 MQTT 客户端 MqttClient mqttClient = new MqttClient(MQTT_BROKER, MQTT_CLIENT_ID, new MemoryPersistence()); // 设置 MQTT 连接选项 MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(MQTT_USERNAME); options.setPassword(MQTT_PASSWORD.toCharArray()); options.setCleanSession(true); options.setKeepAliveInterval(MQTT_KEEP_ALIVE_INTERVAL); // 设置回调 mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { System.out.println("连接断开"); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("收到消息。主题: " + topic); System.out.println("消息: " + new String(message.getPayload())); // 将消息转换为字符串 String msg = new String(message.getPayload()); // 使用 JSON 工具解析字符串 JSONArray jsonArray = new JSONArray(msg); for (int i = 0; i < jsonArray.length(); i++) { JSONObject jsonObject = jsonArray.getJSONObject(i); int sensor_device_id = jsonObject.getInt("sensor_device_id"); int port_id = jsonObject.getInt("port_id"); int sdata = jsonObject.getInt("sdata"); // 在这里你可以处理接收到的数据 System.out.println("设备ID: " + sensor_device_id + ", 端口ID: " + port_id + ", 数据: " + sdata); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("消息发布完成"); } }); // 连接到 MQTT 服务器 mqttClient.connect(options); // 订阅主题 mqttClient.subscribe(MQTT_TOPIC_SUBSCRIBE); // 创建消息 MqttMessage message = new MqttMessage(); message.setPayload("[{"sensor_device_id": 1, "port_id": 1, "sdata": 66}]".getBytes()); // 发布消息 mqttClient.publish(MQTT_TOPIC_PUBLISH, message); } }
回复