MQTT中文站
  • 首页
  • MQTT 学习
    • MQTT 入门
    • MQTT 进阶
    • MQTT 编程
    • MQTT 实例
    • MQTT 要点
    • MQTT5 要点
    • MQTT 工具
    • MQTT 客户端库
    • MQTT 服务器
    • Zigbee2MQTT
    • Sparkplug
    • Home Assistant
    • Node-RED
      • Node-RED 安装部署
      • Node-RED 用户指南
      • Node-RED 创建节点
      • Node-RED 示例教程
      • Node-RED 开发流程
      • Node-RED 接口参考
      • Node-RED 配置模板
      • Node-RED 常见问题
  • MQTT 规范
    • MQTT 5 规范
    • MQTT 3.1.1 规范
    • MQTT 3.1 规范
    • MQTT-SN v1.2规范
    • Sparkplug® v3.0.0规范
  • 产品中心
  • 解决方案
    • 环境监测
    • 工业制造
    • 智慧水利
    • 水利管网
    • 积水监测
    • 综合管廊
    • 档案库房
    • 交通物流
    • 智慧城市
    • 智慧农业
    • 智慧养殖
    • 能源电力
    • 石油石化
    • 智能家居
    • 物联网
    • 汽车与出行
  • 使用文档
  • MQTT 云平台
  • 登录
  • 注册

如何在 Rust 中使用 MQTT

2 年前 • MQTT 编程
如何在 Rust 中使用 MQTT-MQTT中文站

1. Rust编程语言的简介
Rust是Mozilla研发的编译型通用编程语言,它的核心原则是安全性、并发性和实用性。结合函数式、并发式、过程式以及面向对象的编程风格,Rust提供了出色的性能和高效的内存利用。不同于其他具有垃圾收集机制的语言,Rust没有运行时,这使其在高性能要求的服务和嵌入式设备上运行得更加高效。其强大的类型系统和所有权模型进一步确保了内存和线程的安全性,从而在编译时消除了众多错误。

2. MQTT的物联网传输协议
MQTT是一个轻量级的物联网通信协议,基于发布/订阅模式,它针对网络带宽低和代码资源有限的环境进行了优化。被广泛应用于物联网、移动应用、智能硬件、车联网和能源行业。

3. Rust中的paho-mqtt客户端库
为了使Rust与MQTT进行高效通信,我们选择使用paho-mqtt,一个在Rust社区中广受欢迎的MQTT客户端库。它支持最新的MQTT版本,并提供多种传输协议选项。

项目初始化

本项目使用 Rust 1.44.0 进行开发测试,并使用 Cargo 1.44.0 包管理工具进行项目管理,读者可用如下命令查看当前的 Rust 版本。

~ rustc --version
rustc 1.44.0 (49cae5576 2020-06-01)

选择 MQTT 客户端库

paho-mqtt 是目前 Rust 中,功能完善且使用较多的 MQTT 客户端,最新的 0.7.1 版本支持 MQTT v5、3.1.1、3.1,支持通过标准 TCP、SSL / TLS、WebSockets 传输数据,QoS 支持 0、1、2 等。

初始化项目

执行以下命令创建名为 mqtt-example 的 Rust 新项目。

~ cargo new mqtt-example
    Created binary (application) `mqtt-example` package

编辑项目中的 Cargo.toml 文件,在 dependencies 中添加 paho-mqtt 库的地址,以及指定订阅、发布代码文件对应的二进制文件。

[dependencies]
paho-mqtt = { git = "https://github.com/eclipse/paho.mqtt.rust.git", branch = "master" }

[[bin]]
name = "sub"
path = "src/sub/main.rs"

[[bin]]
name = "pub"
path = "src/pub/main.rs"

Rust MQTT 的使用

创建客户端连接

本文将使用 EMQX 提供的 免费公共 MQTT 服务器 作为测试连接的 MQTT 服务器,该服务基于 EMQX 的 MQTT 物联网云平台 创建。服务器接入信息如下:

  • Broker: broker.emqx.io
  • TCP Port: 1883
  • Websocket Port: 8083

配置 MQTT Broker 连接参数

配置 MQTT Broker 连接地址(包括端口)、topic (这里我们配置了两个 topic ),以及客户端 id。

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];

编写 MQTT 连接代码

编写 MQTT 连接代码,为了提升使用体验,可在执行二进制文件时通过命令行参数的形式传入连接地址。通常我们需要先创建一个客户端,然后将该客户端连接到 broker.emqx.io。

let host = env::args().nth(1).unwrap_or_else(||
    DFLT_BROKER.to_string()
);

// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
    .server_uri(host)
    .client_id(DFLT_CLIENT.to_string())
    .finalize();

// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
    println!("Error creating the client: {:?}", err);
    process::exit(1);
});

// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
    .keep_alive_interval(Duration::from_secs(20))
    .clean_session(true)
    .finalize();

// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
    println!("Unable to connect:\n\t{:?}", e);
    process::exit(1);
}

发布消息

这里我们总共发布五条消息,根据循环的奇偶性,分别向 rust/mqtt、 rust/test 这两个主题发布。

for num in 0..5 {
    let content =  "Hello world! ".to_string() + &num.to_string();
    let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
    if num % 2 == 0 {
        println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
        msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
    } else {
        println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
    }
    let tok = cli.publish(msg);

            if let Err(e) = tok {
                    println!("Error sending message: {:?}", e);
                    break;
            }
}

订阅消息

在客户端连接之前,需要先初始化消费者。这里我们会循环处理消费者中的消息队列,并打印出订阅的 topic 名称及接收到的消息内容。

fn subscribe_topics(cli: &mqtt::Client) {
    if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
        println!("Error subscribes topics: {:?}", e);
        process::exit(1);
    }
}

fn main() {
    ...
    // Initialize the consumer before connecting.
    let rx = cli.start_consuming();
    ...
    // Subscribe topics.
    subscribe_topics(&cli);

    println!("Processing requests...");
    for msg in rx.iter() {
        if let Some(msg) = msg {
            println!("{}", msg);
        }
        else if !cli.is_connected() {
            if try_reconnect(&cli) {
                println!("Resubscribe topics...");
                subscribe_topics(&cli);
            } else {
                break;
            }
        }
    }
    ...
}

完整代码

消息发布代码

use std::{
    env,
    process,
    time::Duration
};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// Define the qos.
const QOS:i32 = 1;

fn main() {
    let host = env::args().nth(1).unwrap_or_else(||
        DFLT_BROKER.to_string()
    );

    // Define the set of options for the create.
    // Use an ID for a persistent session.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(DFLT_CLIENT.to_string())
        .finalize();

    // Create a client.
    let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
        println!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    // Define the set of options for the connection.
    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(true)
        .finalize();

    // Connect and wait for it to complete or fail.
    if let Err(e) = cli.connect(conn_opts) {
        println!("Unable to connect:\n\t{:?}", e);
        process::exit(1);
    }

    // Create a message and publish it.
    // Publish message to 'test' and 'hello' topics.
    for num in 0..5 {
        let content =  "Hello world! ".to_string() + &num.to_string();
        let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
        if num % 2 == 0 {
            println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
            msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
        } else {
            println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
        }
        let tok = cli.publish(msg);

                if let Err(e) = tok {
                        println!("Error sending message: {:?}", e);
                        break;
                }
    }


    // Disconnect from the broker.
    let tok = cli.disconnect(None);
    println!("Disconnect from the broker");
    tok.unwrap();
}

消息订阅代码

为了提升使用体验,消息订阅做了断开重连的处理,并在重新建立连接后对主题进行重新订阅。

use std::{
    env,
    process,
    thread,
    time::Duration
};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_subscribe";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS:&[i32] = &[0, 1];

// Reconnect to the broker when connection is lost.
fn try_reconnect(cli: &mqtt::Client) -> bool
{
    println!("Connection lost. Waiting to retry connection");
    for _ in 0..12 {
        thread::sleep(Duration::from_millis(5000));
        if cli.reconnect().is_ok() {
            println!("Successfully reconnected");
            return true;
        }
    }
    println!("Unable to reconnect after several attempts.");
    false
}

// Subscribes to multiple topics.
fn subscribe_topics(cli: &mqtt::Client) {
    if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
        println!("Error subscribes topics: {:?}", e);
        process::exit(1);
    }
}

fn main() {
    let host = env::args().nth(1).unwrap_or_else(||
        DFLT_BROKER.to_string()
    );

    // Define the set of options for the create.
    // Use an ID for a persistent session.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(DFLT_CLIENT.to_string())
        .finalize();

    // Create a client.
    let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
        println!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    // Initialize the consumer before connecting.
    let rx = cli.start_consuming();

    // Define the set of options for the connection.
    let lwt = mqtt::MessageBuilder::new()
        .topic("test")
        .payload("Consumer lost connection")
        .finalize();
    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(false)
        .will_message(lwt)
        .finalize();

    // Connect and wait for it to complete or fail.
    if let Err(e) = cli.connect(conn_opts) {
        println!("Unable to connect:\n\t{:?}", e);
        process::exit(1);
    }

    // Subscribe topics.
    subscribe_topics(&cli);

    println!("Processing requests...");
    for msg in rx.iter() {
        if let Some(msg) = msg {
            println!("{}", msg);
        }
        else if !cli.is_connected() {
            if try_reconnect(&cli) {
                println!("Resubscribe topics...");
                subscribe_topics(&cli);
            } else {
                break;
            }
        }
    }

    // If still connected, then disconnect now.
    if cli.is_connected() {
        println!("Disconnecting");
        cli.unsubscribe_many(DFLT_TOPICS).unwrap();
        cli.disconnect(None).unwrap();
    }
    println!("Exiting");
}

运行与测试

编译二进制文件

执行以下命令,会在 mqtt-example/target/debug 目录下生成消息订阅、发布对应的 sub、pub 二进制文件。

cargo build
如何在 Rust 中使用 MQTT-MQTT中文站

消息订阅

执行 sub 二进制文件,等待消费发布。

如何在 Rust 中使用 MQTT-MQTT中文站

消息发布

执行 pub 二进制文件,可以看到分别往 rust/test 、rust/mqtt 这两个主题发布了消息。

如何在 Rust 中使用 MQTT-MQTT中文站

同时在消息订阅中可看到发布的消息

如何在 Rust 中使用 MQTT-MQTT中文站

至此,我们完成了使用 paho-mqtt 客户端连接到 公共 MQTT 服务器,并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅。

8. 结论
结合Rust的高性能特性和MQTT的轻量级通信,物联网应用可以实现更快、更可靠的消息传输和处理。这不仅提高了物联网设备的性能,还为开发者带来了更为便捷的开发体验。

总之,Rust与MQTT结合为物联网通信带来了一场革命。利用这些技术,开发者可以设计出响应迅速、安全并且低成本的物联网应用。

打赏赞(1)微海报分享
mqtt qos rust string

阿里云接入MQTT 5.0服务器搭建与测试

Ubuntu上的Eclipse Paho MQTT C库详细安装与使用指南

猜你喜欢

改善基础设施:HiveMQ如何推动智能城市发展

改善基础设施:HiveMQ如何推动智能城市发展

08/07
2024
为什么企业选择全托管HiveMQ云进行MQTT部署

为什么企业选择全托管HiveMQ云进行MQTT部署

07/01
2024
MQTT 赋能工业 PLC 数据采集与应用

MQTT 赋能工业 PLC 数据采集与应用

06/30
2024
  • 解决方案
    • 智能家居
    • 汽车与出行
    • 工业制造
    • 能源电力
    • 石油石化
    • 交通物流
    • 零售
  • 学习
    • MQTT 规范
    • MQTT 教程
    • MQTT 软件
    • MQTT 客户端库
    • MQTT 服务器
    • 工具和应用程序
  • 关于我们
    • 了解创科慧仁
    • 加入创科慧仁
    • 投资者关系
    • 新闻动态
    • 合作伙伴
    • 联系我们
  • 友情链接
    • Modbus中文网
    • 跳动符号官网
    • 物联网世界
    • RFID世界网
    • 深圳物联网协会
    • isoftstone软通动力
    • 中国发展战略学研究会
    • B.P商业伙伴
  • 在线客服
  • 全国客户服务热线
    4006909885
  • 官方公众号
  • 联系邮箱
    contact@mqtt.cn
Copyright © 2025 MQTT中文站. All rights reserved.Designed by nicetheme. 京ICP备20029519号
在线客服

微信咨询

微信咨询

4006909885

服务热线 7*24小时

电话咨询
  • 首页
  • MQTT 学习
    • MQTT 入门
    • MQTT 进阶
    • MQTT 编程
    • MQTT 实例
    • MQTT 要点
    • MQTT5 要点
    • MQTT 工具
    • MQTT 客户端库
    • MQTT 服务器
    • Zigbee2MQTT
    • Sparkplug
    • Home Assistant
    • Node-RED
  • MQTT 规范
    • MQTT 5 规范
    • MQTT 3.1.1 规范
    • MQTT 3.1 规范
    • MQTT-SN v1.2规范
    • Sparkplug® v3.0.0规范
  • 产品中心
  • 解决方案
    • 环境监测
    • 工业制造
    • 智慧水利
    • 水利管网
    • 积水监测
    • 综合管廊
    • 档案库房
    • 交通物流
    • 智慧城市
    • 智慧农业
    • 智慧养殖
    • 能源电力
    • 石油石化
    • 智能家居
    • 物联网
    • 汽车与出行
  • 使用文档
  • MQTT 云平台
  • 登录
  • 注册
 

正在加载评论...
 

您必须登录才能发表评论。

    string(5) "2.0.0"