MQTT中文站
  • 首页
  • MQTT 学习
    • MQTT 入门
    • MQTT 进阶
    • MQTT 编程
    • MQTT 实例
    • MQTT 要点
    • MQTT5 要点
    • MQTT 工具
    • MQTT 客户端库
    • MQTT 服务器
    • Zigbee2MQTT
    • Sparkplug
    • Home Assistant
    • Node-RED
      • Node-RED 安装部署
      • Node-RED 用户指南
      • Node-RED 创建节点
      • Node-RED 示例教程
      • Node-RED 开发流程
      • Node-RED 接口参考
      • Node-RED 配置模板
      • Node-RED 常见问题
  • MQTT 规范
    • MQTT 5 规范
    • MQTT 3.1.1 规范
    • MQTT 3.1 规范
    • MQTT-SN v1.2规范
    • Sparkplug® v3.0.0规范
  • 产品中心
  • 解决方案
    • 环境监测
    • 工业制造
    • 智慧水利
    • 水利管网
    • 积水监测
    • 综合管廊
    • 档案库房
    • 交通物流
    • 智慧城市
    • 智慧农业
    • 智慧养殖
    • 能源电力
    • 石油石化
    • 智能家居
    • 物联网
    • 汽车与出行
  • 使用文档
  • MQTT 云平台
  • 登录
  • 注册

实现百万级物联网通信:探索Mica-MQTT的潜力

2 年前 • MQTT 教程

一、简介

Mica-MQTT是一个基于Java AIO实现的开源组件,旨在提供简单易用、低延迟、高性能的百万级MQTT客户端和物联网Broker服务。它更容易集成到现有服务中,降低自主开发物联网平台的成本。

二、使用场景

Mica-MQTT适用于多种场景,包括但不限于:

  • 物联网云端MQTT Broker
  • 边缘设备之间的消息通信
  • 群组类即时通讯
  • 消息推送
  • 简单易用的MQTT客户端

三、优势

Mica-MQTT的主要优势包括:

  • 简单易用,功能强大
  • 易于二次开发和扩展
  • 高性能,支持百万级连接

四、功能

Mica-MQTT提供了丰富的功能集,包括但不限于:

  • 支持MQTT v3.1、v3.1.1和v5.0协议
  • 支持WebSocket MQTT子协议(兼容mqtt.js)
  • 支持HTTP REST API,详细文档请参见[1]
  • 提供MQTT客户端库
  • 提供MQTT服务端
  • 支持MQTT客户端和服务端的共享订阅
  • 支持MQTT遗嘱消息
  • 支持MQTT保留消息
  • 支持自定义消息处理和转发
  • 提供阿里云MQTT客户端连接示例
  • 支持GraalVM编译成本机可执行程序
  • 快速集成到Spring Boot项目(使用mica-mqtt-spring-boot-starter)
  • 支持与Prometheus和Grafana对接
  • 基于Redis Pub/Sub实现集群,详细信息请参见mica-mqtt-broker模块[2]

五、依赖

Spring Boot项目

客户端依赖

<dependency>
  <groupId>net.dreamlu</groupId>
  <artifactId>mica-mqtt-client-spring-boot-starter</artifactId>
  <version>${mica-mqtt.version}</version>
</dependency>
客户端配置示例
mqtt:
  client:
    enabled: true               # 是否开启客户端,默认:true
    ip: 127.0.0.1               # 连接的服务端 IP,默认:127.0.0.1
    port: 1883                  # 端口,默认:1883
    name: Mica-Mqtt-Client      # 客户端名称,默认:Mica-Mqtt-Client
    clientId: 000001            # 客户端ID(通常为设备SN,不可重复)
    user-name: mica             # 认证用户名
    password: 123456            # 认证密码
    timeout: 5                  # 超时时间(秒),默认:5秒
    reconnect: true             # 是否自动重连,默认:true
    re-interval: 5000           # 重连时间(毫秒),默认:5000毫秒
    version: mqtt_3_1_1         # MQTT协议版本,可选MQTT_3_1、mqtt_3_1_1、mqtt_5,默认:mqtt_3_1_1
    read-buffer-size: 8KB       # 接收数据的缓冲区大小,默认:8KB
    max-bytes-in-message: 10MB  # 消息解析的最大字节数,默认:10MB
    buffer-allocator: heap      # 内存分配器(堆内存或堆外内存),默认:堆内存
    keep-alive-secs: 60         # Keep-Alive时间(秒)
    clean-session: true         # MQTT Clean Session,默认:true
    ssl:
      enabled: false            # 是否开启SSL认证,2.1.0版本开始支持双向认证
      keystore-path:            # 可选参数:SSL双向认证的密钥库路径,支持classpath:/路径。
      keystore-pass:            # 可选参数:SSL双向认证的密钥库密码
      truststore-path:          # 可选参数:SSL双向认证的信任库路径,支持classpath:/路径。
      truststore-pass:          # 可选参数:SSL双向认证的信任库密码

客户端监听示例

@Service
public class MqttClientConnectListener {
    private static final Logger logger = LoggerFactory.getLogger(MqttClientConnectListener.class);

    @Autowired
    private MqttClientCreator mqttClientCreator;

    @EventListener
    public void onConnected(MqttConnectedEvent event) {
        logger.info("MqttConnectedEvent: {}", event);
    }

    @EventListener
    public void onDisconnect(MqttDisconnectEvent event) {
        // 在客户端离线时更新重连时的客户端ID、用户名和密码
        logger.info("MqttDisconnectEvent: {}", event);
        mqttClientCreator.clientId("newClient" + System.currentTimeMillis())
            .username("newUserName")
            .password("newPassword");
    }
}

自定义客户端配置示例(可选)

@Configuration(proxyBeanMethods = false)
public class MqttClientCustomizerConfiguration {

  @Bean
  public MqttClientCustomizer mqttClientCustomizer() {
    return new MqttClientCustomizer() {
      @Override
      public void customize(MqttClientCreator creator) {
        // 在此处可以自定义配置,会覆盖YAML配置
        System.out.println("----------------MqttServerCustomizer-----------------");
      }
    };
  }
}

客户端订阅示例

@Service
public class MqttClientSubscribeListener {
  private static final Logger logger = LoggerFactory.getLogger(MqttClientSubscribeListener.class);

  @MqttClientSubscribe("/test/#")
  public void subQos0(String topic, byte[] payload) {
    logger.info("topic: {} payload: {}", topic, new String(payload, StandardCharsets.UTF_8));


  }

  @MqttClientSubscribe(value = "/qos1/#", qos = MqttQoS.AT_LEAST_ONCE)
  public void subQos1(String topic, byte[] payload) {
    logger.info("topic: {} payload: {}", topic, new String(payload, StandardCharsets.UTF_8));
  }

  @MqttClientSubscribe("/sys/${productKey}/${deviceName}/thing/sub/register")
  public void thingSubRegister(String topic, byte[] payload) {
    // 1.3.8版本开始支持,@MqttClientSubscribe注解支持${}变量替换,默认替换为+
    // 注意:Mica-MQTT会先从Spring Boot配置中替换参数${},如果存在配置将优先被替换。
    logger.info("topic: {} payload: {}", topic, new String(payload, StandardCharsets.UTF_8));
  }
}

共享订阅Topic说明

Mica-MQTT客户端支持两种共享订阅方式:

  • 共享订阅:订阅前缀使用$queue/,多个客户端订阅了$queue/topic,当有消息发布到topic时,只有一个客户端会接收到消息。
  • 分组订阅:订阅前缀使用$share/<group>/,组内的客户端订阅了$share/group1/topic、$share/group2/topic等,当有消息发布到topic时,每个组内只有一个客户端会接收到消息。

MqttClientTemplate使用示例

import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

@Service
public class MainService {
    private static final Logger logger = LoggerFactory.getLogger(MainService.class);
    @Autowired
    private MqttClientTemplate client;

    public boolean publish() {
        client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
        return true;
    }

    public boolean sub() {
        client.subQos0("/test/#", (context, topic, message, payload) -> {
            logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
        });
        return true;
    }
}
2. 服务端依赖
<dependency>
  <groupId>net.dreamlu</groupId>
  <artifactId>mica-mqtt-server</artifactId>
  <version>${mica-mqtt.version}</version>
</dependency>
2.1 服务端使用
// 注意:为了能够接受更多连接(降低内存占用),请添加JVM参数 -Xss129k
MqttServer mqttServer = MqttServer.create()
    .ip("0.0.0.0")              // 服务端IP,默认为空(0.0.0.0),建议不要设置
    .port(1883)                 // 端口,默认:1883
    .readBufferSize(512)        // 读缓冲区大小,默认:512字节
    .maxBytesInMessage(1024 * 100) // 最大包体长度,默认:8092
    .authHandler((clientId, userName, password) -> true) // 自定义认证
    .messageListener((context, clientId, message) -> {
        logger.info("clientId: {} message: {} payload: {}", clientId, message, new String(message.getPayload(), StandardCharsets.UTF_8));
    }) // 消息监听
    .bufferAllocator(ByteBufferAllocator.HEAP) // 内存分配器,默认:堆内存
    .heartbeatTimeout(120_1000L) // 心跳超时时间,默认:120秒
    .useSsl("", "", "") // SSL配置
    .connectStatusListener(new IMqttConnectStatusListener() {
        @Override
        public void online(String clientId) {
            // 客户端上线时的处理逻辑
        }

        @Override
        public void offline(String clientId) {
            // 客户端离线时的处理逻辑
        }
    }) // 自定义客户端上下线监听
    .messageDispatcher(new IMqttMessageDispatcher() {
        @Override
        public void config(MqttServer mqttServer) {
            // 配置消息分发
        }

        @Override
        public boolean send(Message message) {
            // 发送消息
            return false;
        }

        @Override
        public boolean send(String clientId, Message message) {
            // 发送消息给指定客户端
            return false;
        }
    }) // 自定义消息转发,可使用MQ广播实现集群处理
    .debug() // 开启调试信息日志
    .start();

// 发送消息给指定客户端
mqttServer.publish("clientId", "/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8));

// 发送消息给所有在线监听这个Topic的客户端
mqttServer.publishAll("/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8));

// 停止服务
mqttServer.stop();

六、默认端口

以下是Mica-MQTT的默认端口:

  • MQTT TCP端口:1883
  • HTTP和WebSocket端口:8083

这些端口用于不同的通信协议和服务。


请注意,上述示例代码可能需要根据你的具体需求进行调整和扩展。如果需要更多信息或帮助,请随时提问。

打赏赞微海报分享
Mica-MQTT mqtt 客户端库

Eclipse Paho Java MQTT 客户端:实现 IoT 设备通信

Ascoltatori:支持 MQTT 及多种代理/协议的消息发布/订阅库

猜你喜欢

改善基础设施:HiveMQ如何推动智能城市发展

改善基础设施:HiveMQ如何推动智能城市发展

08/07
2024
为什么企业选择全托管HiveMQ云进行MQTT部署

为什么企业选择全托管HiveMQ云进行MQTT部署

07/01
2024
MQTT 赋能工业 PLC 数据采集与应用

MQTT 赋能工业 PLC 数据采集与应用

06/30
2024
  • 解决方案
    • 智能家居
    • 汽车与出行
    • 工业制造
    • 能源电力
    • 石油石化
    • 交通物流
    • 零售
  • 学习
    • MQTT 规范
    • MQTT 教程
    • MQTT 软件
    • MQTT 客户端库
    • MQTT 服务器
    • 工具和应用程序
  • 关于我们
    • 了解创科慧仁
    • 加入创科慧仁
    • 投资者关系
    • 新闻动态
    • 合作伙伴
    • 联系我们
  • 友情链接
    • Modbus中文网
    • 跳动符号官网
    • 物联网世界
    • RFID世界网
    • 深圳物联网协会
    • isoftstone软通动力
    • 中国发展战略学研究会
    • B.P商业伙伴
  • 在线客服
  • 全国客户服务热线
    4006909885
  • 官方公众号
  • 联系邮箱
    contact@mqtt.cn
Copyright © 2025 MQTT中文站. All rights reserved.Designed by nicetheme. 京ICP备20029519号
在线客服

微信咨询

微信咨询

4006909885

服务热线 7*24小时

电话咨询
  • 首页
  • MQTT 学习
    • MQTT 入门
    • MQTT 进阶
    • MQTT 编程
    • MQTT 实例
    • MQTT 要点
    • MQTT5 要点
    • MQTT 工具
    • MQTT 客户端库
    • MQTT 服务器
    • Zigbee2MQTT
    • Sparkplug
    • Home Assistant
    • Node-RED
  • MQTT 规范
    • MQTT 5 规范
    • MQTT 3.1.1 规范
    • MQTT 3.1 规范
    • MQTT-SN v1.2规范
    • Sparkplug® v3.0.0规范
  • 产品中心
  • 解决方案
    • 环境监测
    • 工业制造
    • 智慧水利
    • 水利管网
    • 积水监测
    • 综合管廊
    • 档案库房
    • 交通物流
    • 智慧城市
    • 智慧农业
    • 智慧养殖
    • 能源电力
    • 石油石化
    • 智能家居
    • 物联网
    • 汽车与出行
  • 使用文档
  • MQTT 云平台
  • 登录
  • 注册
string(5) "2.0.0"