介绍
正如我们可能已经知道的那样,Node.js是一个异步和事件驱动的JavaScript运行时和引擎,为今天存在的许多基于服务器端的、网络化的应用程序提供动力。在本文中,我们将探讨Node.js与MQTT的互动,MQTT是一种用于物联网(IoT)世界的发布/订阅(pub/sub)协议和标准。
在本文中,我们计划仅涵盖重要的公共MQTT API和函数,并探讨Node.js中的简单发布者和订阅者脚本。
什么是MQTT?
1999年,IBM的Andy Standford-Clark和Arlen Nipper设计了MQTT协议的最初版本。当时的目标是构建一个支持低带宽、轻量级且消耗最少资源的协议,因为通过卫星链路连接的设备非常昂贵。
该规范有两个版本:MQTT 3.1.1和MQTT 5.0.0。大多数商业MQTT代理现在支持MQTT 5,但许多IoT托管云服务仅支持MQTT 3.1.1,这曾经是最受欢迎和广泛支持的版本。
强烈建议新的IoT部署使用5.0.0版本(2018年批准),因为其新功能更侧重于强大的系统和云原生可伸缩性。您可以在MQTT的GitHub页面上阅读有关两个版本之间的亮点和详细差异的更多信息。
今天的用途
MQTT是一种轻量级的客户端-服务器协议,实现了发布/订阅消息传输模型,并已用于连接关键的IoT应用程序、机器对机器(M2M)通信和许多其他需要具有有限带宽但卓越性能的消息传输平滑界面的领域。这是其简单的设计、易用性和开放规范的结果。
自2014年以来,MQTT一直由OASIS技术指导委员会管理。该委员会负责维护、更新和维护该标准,包括组织用例文档和保护其知识产权。
它依赖于TCP/IP,这是一个可靠的、面向连接的低级网络堆栈,传输层独立于数据有效载荷结构,实现了一种有序和正确的双向主机对主机的通信模型。
发布/订阅模式还允许消息从一个应用程序分发到许多不同的应用程序。但在这种情况下,发布者和订户通常是独立的应用程序(解耦),只通过代理或服务器连接。例如,流行的发布/订阅消息平台RabbitMQ在内部使用了MQTT。
MQTT的用例
MQTT已经找到了许多用例,我们将在下面探讨其中一些。
- 启用消息广播:客户端可以发布消息或有效载荷到多个其他客户端应用程序,当然,这些应用程序需要通过代理事先通过主题订阅这些消息.
- 提供轻量级和最小的消息头/资源:这允许在消息传输中使用最小的带宽,因此MQTT可以有效地扩展以服务数百万个IoT设备
- 在其核心,它提供了可靠的消息传输或交付:大多数IoT设备都依赖于这一点,但从高层次上看,MQTT定义了保证消息传输得以处理的服务层
- 构建高度安全的消息传输应用程序:MQTT在这方面非常出色,因为消息有效载荷可以使用TLS和其他现代身份验证机制(如OAUTH)进行加密和安全保护
- 确保客户端应用程序具有持久连接:这对于网络连接可能不稳定的区域中的临时消息存储非常方便
- 相关的是,MQTT有助于减少在信号差的蜂窝网络上设备重新连接所需的时间
- 在不同行业和领域找到应用,包括汽车、石油和天然气、制造、物流、交通和智能家居设备行业
- 您可以在MQTT文档的用例页面上找到有关这些信息的更多详细信息。最受欢迎的开源家庭自动化项目之一,Home Assistant,基于MQTT协议。
MQTT的发布/订阅架构
MQTT协议包括代理(broker),它充当中央服务器,将发布者客户端的消息引导到订阅者客户端,以及一个或多个客户端应用程序,可以是消息或数据的订阅者或发布者。
代理可以看作是邮递员,确保消息被传递给其各自的接收者。它们应该被设计成高度可扩展和安全的,因为它们是MQTT客户端的中心关注点。
在高层次上,代理充当一个网关,将发布的消息从发布者应用程序路由到适当的订阅者。通常情况下,代理可以部署在多个集群或实例上,并置于负载均衡器后,以提高容错性。MQTT客户端将消息发布到中央代理(通常是到主题),其他客户端可以订阅代理上的相同主题以接收这些消息。
因此,一个客户端将消息发布到主题,而其他客户端订阅该主题,表示它们有兴趣接收相关消息。代理具有一种过滤机制,用于控制应该将消息发送给哪些订阅者。这意味着代理检查或筛选特定主题或主题组上的订阅者(或订阅者列表),然后将消息发送给它们。
总之,代理读取、确认和处理来自发布者客户端或应用程序的消息(包括确定主题的订阅者以及将所有适当的消息发送给它们)。
注意:MQTT依赖一种过滤消息的方式,代理将消息发送给对获取消息内容感兴趣的订阅者。消息通常包含一个主题,代理使用该主题来确定如何将特定消息路由到适当的订阅客户端。
基于主题的过滤涉及客户端(发布者和订阅者)通过主题与代理进行交互,主题是每个消息有效载荷的一部分。
到目前为止,我们已经使用了一些对一些读者可能听起来很新的技术术语。在下一节中,我们将探讨一些这些术语及其含义。
一些MQTT技术概念解释
- 桥接(Bridge) - 两个MQTT代理之间的连接
- MQTT客户端(MQTT client) - 连接到经过安全网络连接的MQTT代理的设备或使用MQTT客户端库编写的应用程序;MQTT客户端可以是发布者或订阅者应用/客户端
- 消息(Message) - 简单地说,要发布的消息,可以是缓冲区(Buffer)、字符串(String)或JSON对象
- 主题(Topics) - 代理用于过滤并将适当的消息发送到连接的客户端的字符串标识符。主题名称通常以层次结构的方式进行构建,带有分隔符,称为主题级别 (注意,消息必须包含代理可以使用的主题,以适当地路由有效载荷到感兴趣的客户端) 示例主题名称:mytopic/homeautomation/closedoor
- 发布者(Publisher) - 发布者客户端将数据或消息分发到服务器/代理上的主题,供其他可能有兴趣获取这些消息的订阅者客户端使用
- 物联网(IoT) - 由嵌入式系统、自动化设备、无线网络和控制机制组成的连接设备的世界
- 解耦(Decoupling) - 在这种情况下,解耦意味着发布者/订阅者只需要知道代理的主机名/IP和端口 - 不像传统的客户端-服务器架构,其中客户端和服务器通过端点/API直接通信,通常以URI格式进行通信。
在应用级别使用MQTT
现在,让我们看看MQTT在应用级别是如何工作的。我们将使用MQTT的Node.js客户端库mqtt.js。
MQTT.js是MQTT协议的开源JavaScript库,适用于Node.js和浏览器。通常,该库可用于发布消息和订阅MQTT代理上的主题。
关于MQTT.js库的一些要点
- 支持ES模块和Common.js样式的文件导入
- 它具有基于Promise的API接口,因为MQTT本身是异步工作的
- MQTT.js默认使用旧的MQTT v3.1.1,以支持旧代理,但当前的最新版本是5.0
- MQTT客户端带有内置的错误处理程序,在程序员未能处理其代码中的错误时非常有用
请注意,还有可用于各种编程语言和主要操作系统(Linux、Windows和macOS)的客户端库。
连接/断开MQTT代理/服务器
客户端连接始终由代理/服务器处理,因为MQTT订阅者和发布者是独立的、解耦的应用程序。正如我们之前提到的,发布者和订阅者都是MQTT客户端,因此需要连接到同一个代理/服务器。
客户端永远不会直接连接到彼此;连接通常是在一个客户端和代理之间通过TCP/IP进行的。其他MQTT实现或变种也可以通过UDP连接(MQTT<-SN)。
代理负责:
- 接收所有消息
- 通过确定哪个订阅客户端订阅每条消息来筛选适当的消息
- 保持客户端之间的连接/会话
- 对客户端进行身份验证和授权
- 将消息发送给正确的客户端
代理应该容易扩展并集成到不同的后端系统中。它们可以具有相当高的容错性,因为它们是发布者/订阅者通信的最关键点。
要首次连接到代理,客户端发送CONNECT消息。一旦启动,代理将返回一个CONNACK消息和一个状态代码。还需要注意的一点是,代理始终保持连接处于活动状态,除非客户端发送断开事件或其互联网连接中断。
目前有一些流行的免费托管的MQTT代理。Eclipse的Mosquitto就是其中之一,它可以运行在所有主要操作系统上。
还有其他商业云端或托管的代理,比如HIVEMQ。如果我们不打算安装和管理自己的代理,它们通常会非常方便。您可以在MQTT网站上找到用于快速测试的优秀MQTT代理。
安装我们的客户端库
要安装MQTT.js,请运行以下命令:
npm install mqtt --save
请注意,MQTT.js捆绑了与代理进行交互的命令。要使MQTT协议接口可用于系统路径,我们可以全局安装它:
npm install mqtt -g
安装完成后,我们的package.json文件应如下所示:
// package.json
{
"name": "mqtt-demo",
"version": "1.0.0",
"description": "一个Node.js和MQTT演示",
"main": "index.js",
"scripts": {
"start-publisher": "nodemon publisher.js",
"start-consumer": "nodemon subscriber.js"
},
"keywords": [
"Node.js",
"MQTT",
"Pub/Sub",
"IoT",
"message",
"transport"
],
"author": "Alexander Nnakwue",
"license": "MIT",
"dependencies": {
"dotenv": "^10.0.0",
"mqtt": "^4.3.2"
},
"devDependencies": {
"nodemon": "^2.0.15"
}
}
要测试程序,可以在一个终端窗口上运行发布者,然后在另一个终端窗口上运行订阅者。
创建一个MQTT发布客户端
现在,让我们创建一个发布消息的MQTT客户端。为此,我们可以导入MQTT.js库并使用connect方法。
const mqtt = require('mqtt');
require('dotenv').config();
const clientId = 'mqttjs_' + Math.random().toString(8).substr(2, 4);
const client = mqtt.connect(process.env.BROKER_URL, { clientId: clientId });
请注意,我们已经将BROKER_URL添加到我们的环境文件中。如我们所见,connect方法接受给定的URL(代理服务器URL)和可选的服务器选项对象。接受的协议可以是MQTT、ws、wss、tcp、tls等等。connect方法返回一个已连接的客户端。
为了在MQTT客户端连接中断时尝试重新连接,我们可以将reconnectPeriod选项(两次重新连接之间的时间间隔)设置为大于零。默认值为1秒,这意味着在断开连接后,它会几乎立即尝试重新打开连接。
const client = mqtt.connect(process.env.BROKER_URL, { clientId: clientId, clean: false, reconnectPeriod: 1 });
如果将reconnectPeriod客户端选项的值设置为0,则将禁用重新连接,并在连接断开时终止。
当将resubscribe选项设置为其默认值(true)时,客户端可以在连接中断时自动重新连接和重新订阅先前订阅的主题。尤其是对于自托管的代理,我们可能需要使用用户名和密码进行身份验证。有关服务器选项对象的更多详细信息可以在MQTT GitHub上找到。
发布数据和消息
一旦连接到代理,MQTT客户端几乎可以立即发送消息。发布事件需要消息负载和主题名称,代理可以使用它来识别订阅方。此外,还有一个回调选项,用于检查错误或消息数据包是否已传输。
发送的消息类型或数据包具有以下属性:
- topicName
- dupFlag
- qos
- payload
- packetId或messageId
- retainFlag
{
cmd: 'publish',
topic: 'test/connection',
payload: '{"1":"Hello world","2":"Welcome to the test connection"}',
qos: 1,
retain: true,
messageId: 12041,
dup: false
}
命令,如下所示。
当客户端向代理发送消息时,代理会根据开发人员设置的某些标准来处理消息。这应该包括QoS级别,它确定消息达到预期接收方的保证类型,并确保消息传递保证。
处理阶段通常涉及读取消息、确认消息和识别订阅主题的客户端。最后一步是将消息发送给订阅的客户端。
以下是publisher.js文件的完整代码,包括一些公共方法/API:
//publisher.js
const mqtt = require('mqtt')
require('dotenv').config()
//the client id is used by the MQTT broker to keep track of clients and and their // state
const clientId = 'mqttjs_' + Math.random().toString(8).substr(2, 4)
const client = mqtt.connect(process.env.BROKER_URL, {clientId: clientId, clean: false});
// console.log(process.env.BROKER_URL, 'client', clientId)
const topicName = 'test/connection'
client.on("connect",function(connack){
console.log("client connected", connack);
// on client connection publish messages to the topic on the server/broker
const payload = {1: "Hello world", 2: "Welcome to the test connection"}
client.publish(topicName, JSON.stringify(payload), {qos: 1, retain: true}, (PacketCallback, err) => {
if(err) {
console.log(err, 'MQTT publish packet')
}
})
//assuming messages comes in every 3 seconds to our server and we need to publish or process these messages
setInterval(() => console.log("Message published"), 3000);
})
client.on("error", function(err) {
console.log("Error: " + err)
if(err.code == "ENOTFOUND") {
console.log("Network error, make sure you have an active internet connection")
}
})
client.on("close", function() {
console.log("Connection closed by client")
})
client.on("reconnect", function() {
console.log("Client trying a reconnection")
})
client.on("offline", function() {
console.log("Client is currently offline")
})
接下来,我们可以继续实现订阅客户端,该客户端会接收主题上的消息。
订阅消息
为了接收我们感兴趣的主题的消息,客户端通过subscribe事件向代理发送订阅请求。所订阅的消息通常包含消息数据包负载,如下所示。
//stdout
Packet {
cmd: 'publish',
retain: true,
qos: 0,
dup: false,
length: 73,
topic: 'test/connection',
payload: <Buffer 7b 22 31 22 3a 22 48 65 6c 6c 6f 20 77 6f 72 6c 64 22 2c 22 32 22 3a 22 57 65 6c 63 6f 6d 65 20 74 6f 20 74 68 65 20 74 65 73 74 20 63 6f 6e 6e 65 63 ... 6 more bytes>
} {"1":"Hello world","2":"Welcome to the test connection"}```
//
[ { topic: 'test/connection', qos: 0 } ] granted
与其将主题名称作为常规的分隔字符串,我们还可以将主题存储为通配符,以便订户可以轻松订阅主题模式,而不是一次订阅一个主题。发布者和订阅者客户端需要提前了解主题模式的主题名称。
需要注意的是,订阅客户端需要提前了解他们将要接收的数据的结构,以便能够正确地处理数据。发布者以特定格式将消息发送到代理的特定主题,并接收该消息的预定订户需要知道数据的结构,以便能够在不破坏应用程序的情况下正确处理它。
订户客户端的完整代码如下所示。
// subscriber.js
const mqtt = require('mqtt')
const client = mqtt.connect("mqtt://test.mosquitto.org")
const topicName = 'test/connection'
// connect to same client and subscribe to same topic name
client.on('connect', () => {
// can also accept objects in the form {'topic': qos}
client.subscribe(topicName, (err, granted) => {
if(err) {
console.log(err, 'err');
}
console.log(granted, 'granted')
})
})
// on receive message event, log the message to the console
client.on('message', (topic, message, packet) => {
console.log(packet, packet.payload.toString());
if(topic === topicName) {
console.log(JSON.parse(message));
}
})
client.on("packetsend", (packet) => {
console.log(packet, 'packet2');
})
MQTT的其他功能
以下是MQTT的一些特点。
保留的消息
保留的消息是一个设置为true的MQTT消息,其保留标志/选项设置为true。默认情况下,当代理/服务器接收到没有订户的主题的消息时,消息会被丢弃。但是,MQTT有一种通过设置标志来保留这些消息的机制,该标志告诉发布者保留消息。请注意,代理每个主题只存储一个保留的消息。
广泛的身份验证和数据安全支持
MQTT支持各种身份验证方法和数据安全机制,包括TLS和OAuth,通常在MQTT代理上配置。因此,
这意味着实施这些服务器的客户端需要遵守所定义的机制。
默认情况下,MQTT支持重新连接机制,可以在网络连接质量低的区域建立持久连接。这对于存储消息非常重要,但与传统队列系统不同,代理不仅仅存储消息。MQTT通过确保客户端会话持久存在并且QoS级别大于0来存储消息。
服务质量(QoS)
为了处理发布/订阅系统中的典型挑战,MQTT实现了三个服务质量(QoS)级别。这三个级别包括0、1和2,它们确定消息达到预期接收方(客户端或代理)的保证类型。
为支持可靠的消息传递,该协议支持三种不同类型的服务质量消息:
- 0 – 至多一次
- 1 – 至少一次
- 2 – 正好一次
为存储消息,客户端必须订阅QoS大于0的主题。
数据不可知
MQTT是数据不可知的,客户端的用例确定了数据的结构方式。因此,可以发送任何类型的消息,包括图像、编码文本、加密数据等。
注意:默认的未加密MQTT端口是1883。还支持加密的TCP/IP端口8883,用于使用SSL的MQTT。
结论
MQTT提供了适用于网络带宽有限的领域的双向发布/订阅消息模型。服务独立于我们的主要应用程序,因为它们是解耦的,可以单独扩展代理或服务器。
回复