Mica-MQTT 是一款强大的 MQTT(Message Queuing Telemetry Transport)物联网组件,旨在提供出色的性能和灵活性。它适用于各种使用场景,包括物联网、消息通信、即时通讯(IM)和消息推送。本文将向您介绍 Mica-MQTT 的主要功能、优势以及使用场景。
使用场景
Mica-MQTT 可以用于多种用途,包括但不限于:
- 物联网(云端 MQTT Broker): 用于支持大规模物联网设备的通信和数据传输。
- 物联网(边缘端消息通信): 适用于连接边缘设备的消息传递,支持低延迟通信。
- 群组类 IM: 用于构建即时通讯应用,支持群组聊天和私聊。
- 消息推送: 用于实现消息推送服务,将消息快速可靠地传递给接收者。
- 简单易用的 MQTT 客户端: Mica-MQTT 提供了 MQTT 客户端,使开发者可以轻松与 MQTT 代理进行通信。
优势
Mica-MQTT 具有以下显著优势:
- 灵活而强大: Mica-MQTT 提供了丰富的功能集,同时保持了灵活性,可以根据需要进行二次开发或扩展。
- 支持 MQTT 协议: 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议,满足不同 MQTT 版本的需求。
- WebSocket 支持: 支持 MQTT 子协议的 WebSocket 连接,允许浏览器和其他应用使用 MQTT 协议进行通信。
- HTTP REST API: 提供 HTTP REST API,使您可以使用 HTTP 请求进行通信。具体的 API 文档详见官方文档。
- 集群支持: Mica-MQTT 支持 MQTT 客户端和服务器的共享订阅,采用高效的 topic 树存储方式,能够处理百万级别的 topic,保持高性能。
- 遗嘱消息和保留消息: 支持 MQTT 遗嘱消息和保留消息,确保消息的可靠性和持久性。
- Spring Boot 集成: 提供 Spring Boot 项目的快速接入,使集成更加简单。
- 监控支持: 支持与 Prometheus 和 Grafana 集成,实现监控和性能优化。
- GraalVM 支持: 您可以使用 GraalVM 将 Mica-MQTT 编译成本机可执行程序,以获得更好的性能。
默认端口
Mica-MQTT 使用以下默认端口:
您可以在 演示地址 上查看 Mica-MQTT 的演示,使用账号 mica
和密码 mica
登录以了解更多。
总结
Mica-MQTT 是一款功能丰富、性能出色的 MQTT 物联网组件,适用于各种 IoT 和通信需求。如果您正在寻找可靠的 MQTT 解决方案,Mica-MQTT 可能是您的理想之选。
Spring boot 项目
客户端:
一、添加依赖
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-client-spring-boot-starter</artifactId>
<version>${最新版本}</version>
</dependency>
二、mqtt 客户端
2.1 配置项示例
mqtt:
client:
enabled: true # 是否开启客户端,默认:true
ip: 127.0.0.1 # 连接的服务端 ip ,默认:127.0.0.1
port: 1883 # 端口:默认:1883
name: Mica-Mqtt-Client # 名称,默认:Mica-Mqtt-Client
clientId: 000001 # 客户端Id(非常重要,一般为设备 sn,不可重复)
user-name: mica # 认证的用户名
password: 123456 # 认证的密码
timeout: 5 # 超时时间,单位:秒,默认:5秒
reconnect: true # 是否重连,默认:true
re-interval: 5000 # 重连时间,默认 5000 毫秒
version: mqtt_3_1_1 # mqtt 协议版本,可选 MQTT_3_1、mqtt_3_1_1、mqtt_5,默认:mqtt_3_1_1
read-buffer-size: 8KB # 接收数据的 buffer size,默认:8k
max-bytes-in-message: 10MB # 消息解析最大 bytes 长度,默认:10M
buffer-allocator: heap # 堆内存和堆外内存,默认:堆内存
keep-alive-secs: 60 # keep-alive 时间,单位:秒
clean-session: true # mqtt clean session,默认:true
ssl:
enabled: false # 是否开启 ssl 认证,2.1.0 开始支持双向认证
keystore-path: # 可选参数:ssl 双向认证 keystore 目录,支持 classpath:/ 路径。
keystore-pass: # 可选参数:ssl 双向认证 keystore 密码
truststore-path: # 可选参数:ssl 双向认证 truststore 目录,支持 classpath:/ 路径。
truststore-pass: # 可选参数:ssl 双向认证 truststore 密码
注意:ssl 存在三种情况
服务端开启ssl | 客户端 |
---|---|
ClientAuth 为 NONE(不需要客户端验证) | 仅仅需要开启 ssl 即可不用配置证书 |
ClientAuth 为 OPTIONAL(与客户端协商) | 需开启 ssl 并且配置 truststore 证书 |
ClientAuth 为 REQUIRE (必须的客户端验证) | 需开启 ssl 并且配置 truststore、 keystore证书 |
2.2 可实现接口(注册成 Spring Bean 即可)
接口 | 是否必须 | 说明 |
---|---|---|
IMqttClientConnectListener | 否 | 客户端连接成功监听 |
2.3 客户端上下线监听
使用 Spring event 解耦客户端上下线监听,注意: 1.3.4
开始支持。会跟自定义的 IMqttClientConnectListener
实现冲突,取一即可。
/**
* 示例:客户端连接状态监听
*
* @author L.cm
*/
@Service
public class MqttClientConnectListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientConnectListener.class);
@Autowired
private MqttClientCreator mqttClientCreator;
@EventListener
public void onConnected(MqttConnectedEvent event) {
logger.info("MqttConnectedEvent:{}", event);
}
@EventListener
public void onDisconnect(MqttDisconnectEvent event) {
// 离线时更新重连时的密码,适用于类似阿里云 mqtt clientId 连接带时间戳的方式
logger.info("MqttDisconnectEvent:{}", event);
// 在断线时更新 clientId、username、password
mqttClientCreator.clientId("newClient" + System.currentTimeMillis())
.username("newUserName")
.password("newPassword");
}
}
2.4 自定义 java 配置(可选)
@Configuration(proxyBeanMethods = false)
public class MqttClientCustomizerConfiguration {
@Bean
public MqttClientCustomizer mqttClientCustomizer() {
return new MqttClientCustomizer() {
@Override
public void customize(MqttClientCreator creator) {
// 此处可自定义配置 creator,会覆盖 yml 中的配置
System.out.println("----------------MqttServerCustomizer-----------------");
}
};
}
}
2.5 订阅示例
@Service
public class MqttClientSubscribeListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientSubscribeListener.class);
@MqttClientSubscribe("/test/#")
public void subQos0(String topic, byte[] payload) {
logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
@MqttClientSubscribe(value = "/qos1/#", qos = MqttQoS.AT_LEAST_ONCE)
public void subQos1(String topic, byte[] payload) {
logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
@MqttClientSubscribe("/sys/${productKey}/${deviceName}/thing/sub/register")
public void thingSubRegister(String topic, byte[] payload) {
// 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 +
// 注意:mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。
logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
}
2.6 共享订阅 topic 说明
- 共享订阅:订阅前缀
$queue/
,多个客户端订阅了$queue/topic
,发布者发布到topic
,则只有一个客户端会接收到消息。 - 分组订阅:订阅前缀
$share/<group>/
,组客户端订阅了$share/group1/topic
、$share/group2/topic
..,发布者发布到topic
,则消息会发布到每个 group 中,但是每个 group 中只有一个客户端会接收到消息。
注意: 如果发布的 topic
以 /
开头,例如:/topic/test
,需要订阅 $share/group1//topic/test
,另外 mica-mqtt 默认随机消息路由,共享订阅的多个客户端会随机收到消息。
2.7 MqttClientTemplate 使用示例
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
/**
* @author wsq
*/
@Service
public class MainService {
private static final Logger logger = LoggerFactory.getLogger(MainService.class);
@Autowired
private MqttClientTemplate client;
public boolean publish() {
client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
return true;
}
public boolean sub() {
client.subQos0("/test/#", (context, topic, message, payload) -> {
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});
return true;
}
}
服务端
一、添加依赖
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-server-spring-boot-starter</artifactId>
<version>${最新版本}</version>
</dependency>
二、mqtt 服务
2.1 配置项
mqtt:
server:
enabled: true # 是否开启服务端,默认:true
# ip: 0.0.0.0 # 服务端 ip 默认为空,0.0.0.0,建议不要设置
port: 1883 # 端口,默认:1883
name: Mica-Mqtt-Server # 名称,默认:Mica-Mqtt-Server
buffer-allocator: HEAP # 堆内存和堆外内存,默认:堆内存
heartbeat-timeout: 120000 # 心跳超时,单位毫秒,默认: 1000 * 120
read-buffer-size: 8KB # 接收数据的 buffer size,默认:8k
max-bytes-in-message: 10MB # 消息解析最大 bytes 长度,默认:10M
auth:
enable: false # 是否开启 mqtt 认证
username: mica # mqtt 认证用户名
password: mica # mqtt 认证密码
debug: true # 如果开启 prometheus 指标收集建议关闭
stat-enable: true # 开启指标收集,debug 和 prometheus 开启时需要打开,默认开启,关闭节省内存
web-port: 8083 # http、websocket 端口,默认:8083
websocket-enable: true # 是否开启 websocket,默认: true
http-enable: false # 是否开启 http api,默认: false
http-basic-auth:
enable: false # 是否开启 http basic auth,默认: false
username: mica # http basic auth 用户名
password: mica # http basic auth 密码
ssl: # mqtt tcp ssl 认证
enabled: false # 是否开启 ssl 认证,2.1.0 开始支持双向认证
keystore-path: # 必须参数:ssl keystore 目录,支持 classpath:/ 路径。
keystore-pass: # 必选参数:ssl keystore 密码
truststore-path: # 可选参数:ssl 双向认证 truststore 目录,支持 classpath:/ 路径。
truststore-pass: # 可选参数:ssl 双向认证 truststore 密码
client-auth: none # 是否需要客户端认证(双向认证),默认:NONE(不需要)
注意:ssl 存在三种情况
服务端开启ssl | 客户端 |
---|---|
ClientAuth 为 NONE(不需要客户端验证) | 仅仅需要开启 ssl 即可不用配置证书 |
ClientAuth 为 OPTIONAL(与客户端协商) | 需开启 ssl 并且配置 truststore 证书 |
ClientAuth 为 REQUIRE (必须的客户端验证) | 需开启 ssl 并且配置 truststore、 keystore证书 |
2.2 可实现接口(注册成 Spring Bean 即可)
接口 | 是否必须 | 说明 |
---|---|---|
IMqttServerUniqueIdService | 否 | 用于 clientId 不唯一时,自定义实现唯一标识,后续接口使用它替代 clientId |
IMqttServerAuthHandler | 是 | 用于服务端认证 |
IMqttServerSubscribeValidator | 否(建议实现) | 1.1.3 新增,用于对客户端订阅校验 |
IMqttServerPublishPermission | 否(建议实现) | 1.2.2 新增,用于对客户端发布权限校验 |
IMqttMessageListener | 否(1.3.x为否) | 消息监听 |
IMqttConnectStatusListener | 是 | 连接状态监听 |
IMqttSessionManager | 否 | session 管理 |
IMqttSessionListener | 否 | session 监听 |
IMqttMessageStore | 集群是,单机否 | 遗嘱和保留消息存储 |
AbstractMqttMessageDispatcher | 集群是,单机否 | 消息转发,(遗嘱、保留消息转发) |
IpStatListener | 否 | t-io ip 状态监听 |
IMqttMessageInterceptor | 否 | 消息拦截器,1.3.9 新增 |
2.3 IMqttMessageListener (用于监听客户端上传的消息) 使用示例
@Service
public class MqttServerMessageListener implements IMqttMessageListener {
private static final Logger logger = LoggerFactory.getLogger(MqttServerMessageListener.class);
@Override
public void onMessage(ChannelContext context, String clientId, Message message) {
logger.info("clientId:{} message:{} payload:{}", clientId, message, new String(message.getPayload(), StandardCharsets.UTF_8));
}
}
2.4 自定义配置(可选)
@Configuration(proxyBeanMethods = false)
public class MqttServerCustomizerConfiguration {
@Bean
public MqttServerCustomizer mqttServerCustomizer() {
return new MqttServerCustomizer() {
@Override
public void customize(MqttServerCreator creator) {
// 此处可自定义配置 creator,会覆盖 yml 中的配置
System.out.println("----------------MqttServerCustomizer-----------------");
}
};
}
}
2.5 MqttServerTemplate 使用示例
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
/**
* @author wsq
*/
@Service
public class ServerService {
@Autowired
private MqttServerTemplate server;
public boolean publish(String body) {
server.publishAll("/test/123", body.getBytes(StandardCharsets.UTF_8));
return true;
}
}
2.6 客户端上下线监听
使用 Spring event 解耦客户端上下线监听,注意: 1.3.4
开始支持。会跟自定义的 IMqttConnectStatusListener
实现冲突,取一即可。
@Service
public class MqttConnectStatusListener {
private static final Logger logger = LoggerFactory.getLogger(MqttConnectStatusListener.class);
@EventListener
public void online(MqttClientOnlineEvent event) {
logger.info("MqttClientOnlineEvent:{}", event);
}
@EventListener
public void offline(MqttClientOfflineEvent event) {
logger.info("MqttClientOfflineEvent:{}", event);
}
}
2.7 基于 mq 消息广播集群处理
详见: mica-mqtt-broker
2.8 Prometheus + Grafana 监控对接
<!-- 开启 prometheus 指标收集 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
支持得指标 | 说明 |
---|---|
mqtt_connections_accepted | 共接受过连接数 |
mqtt_connections_closed | 关闭过的连接数 |
mqtt_connections_size | 当前连接数 |
mqtt_messages_handled_packets | 已处理消息数 |
mqtt_messages_handled_bytes | 已处理消息字节数 |
mqtt_messages_received_packets | 已接收消息数 |
mqtt_messages_received_bytes | 已处理消息字节数 |
mqtt_messages_send_packets | 已发送消息数 |
mqtt_messages_send_bytes | 已发送消息字节数 |
非 Spring boot 项目
客户端
topic 通配符含义
/
:用来表示层次,比如 a/b,a/b/c。#
:表示匹配>=0
个层次,比如 a/# 就匹配 a/,a/b,a/b/c。单独的一个 # 表示匹配所有。不允许 a# 和 a/#/c。+
:表示匹配一个层次,例如 a/+ 匹配 a/b,a/c,不匹配 a/b/c。单独的一个 + 是允许的,a+ 不允许,也可以和多层通配符一起使用,+/tennis/# 、sport/+/player1 都有有效的。
使用说明
MQTT 遗嘱消息场景
- 当客户端断开连接时,发送给相关的订阅者的遗嘱消息。在设备 A 进行连接时候,遗嘱消息设定为
offline
,手机App B 订阅这个遗嘱主题。 - 当 A 异常断开时,手机App B 会收到这个
offline
的遗嘱消息,从而知道设备 A 离线了。
MQTT 保留消息场景
- 例如,某设备定期发布自身 GPS 坐标,但对于订阅者而言,从它发起订阅到第一次收到数据可能需要几秒钟,也可能需要十几分钟甚至更多,这样并不友好。因此 MQTT 引入了保留消息。
- 而每当有订阅者建立订阅时,服务端就会查找是否存在匹配该订阅的保留消息,如果保留消息存在,就会立即转发给订阅者。
- 借助保留消息,新的订阅者能够立即获取最近的状态。
共享订阅
mica-mqtt 支持两种共享订阅方式:
- 共享订阅:订阅前缀
$queue/
,多个客户端订阅了$queue/topic
,发布者发布到topic
,则只有一个客户端会接收到消息。 - 分组订阅:订阅前缀
$share/<group>/
,组客户端订阅了$share/group1/topic
、$share/group2/topic
..,发布者发布到topic
,则消息会发布到每个 group 中,但是每个 group 中只有一个客户端会接收到消息。
注意: 如果发布的 topic
以 /
开头,例如:/topic/test
,需要订阅 $share/group1//topic/test
,另外 mica-mqtt 默认随机消息路由,共享订阅的多个客户端会随机收到消息。
客户端使用
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1") // mqtt 服务端 ip 地址
.port(1883) // 默认:1883
.username("admin") // 账号
.password("123456") // 密码
.version(MqttVersion.MQTT_5) // 默认:3_1_1
.clientId("xxxxxx") // 非常重要务必手动设置,一般设备 sn 号,默认:MICA-MQTT- 前缀和 36进制的纳秒数
.bufferAllocator(ByteBufferAllocator.DIRECT) // 堆内存和堆外内存,默认:堆内存
.readBufferSize(512) // 消息一起解析的长度,默认:为 8092 (mqtt 消息最大长度)
.maxBytesInMessage(1024 * 10) // 最大包体长度,如果包体过大需要设置此参数,默认为: 10M (10*1024*1024)
.keepAliveSecs(120) // 默认:60s
.timeout(10) // 超时时间,t-io 配置,可为 null,为 null 时,t-io 默认为 5
.reconnect(true) // 是否重连,默认:true
.reInterval(5000) // 重连重试时间,reconnect 为 true 时有效,t-io 默认为:5000
.willMessage(builder -> {
builder.topic("/test/offline").messageText("down"); // 遗嘱消息
})
.connectListener(new IMqttClientConnectListener() {
@Override
public void onConnected(ChannelContext context, boolean isReconnect) {
logger.info("链接服务器成功...");
}
@Override
public void onDisconnect(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {
logger.info("与链接服务器断开连接...");
}
})
.properties() // mqtt5 properties
.connectSync(); // 同步连接,也可以使用 connect(),可以避免 broker 没启动照成启动卡住。
// 消息订阅,同类方法 subxxx
client.subQos0("/test/#", (context, topic, message, payload) -> {
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});
// 取消订阅
client.unSubscribe("/test/#");
// 发送消息
client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
// 断开连接
client.disconnect();
// 重连
client.reconnect();
// 停止
client.stop();
服务端
// 注意:为了能接受更多链接(降低内存),请添加 jvm 参数 -Xss129k
MqttServer mqttServer = MqttServer.create()
// 服务端 ip 默认为空,0.0.0.0,建议不要设置
.ip("0.0.0.0")
// 默认:1883
.port(1883)
// 默认为: 8092(mqtt 默认最大消息大小),为了降低内存可以减小小此参数,如果消息过大 t-io 会尝试解析多次(建议根据实际业务情况而定)
.readBufferSize(512)
// 最大包体长度,如果包体过大需要设置此参数,默认为: 8092
.maxBytesInMessage(1024 * 100)
// 自定义认证
.authHandler((clientId, userName, password) -> true)
// 消息监听
.messageListener((context, clientId, message) -> {
logger.info("clientId:{} message:{} payload:{}", clientId, message, new String(message.getPayload(), StandardCharsets.UTF_8));
})
// 堆内存和堆外内存选择,默认:堆内存
.bufferAllocator(ByteBufferAllocator.HEAP)
// 心跳超时时间,默认:120s
.heartbeatTimeout(120_1000L)
// ssl 配置
.useSsl("", "", "")
// 自定义客户端上下线监听
.connectStatusListener(new IMqttConnectStatusListener() {
@Override
public void online(String clientId) {
}
@Override
public void offline(String clientId) {
}
})
// 自定义消息转发,可用 mq 广播实现集群化处理
.messageDispatcher(new IMqttMessageDispatcher() {
@Override
public void config(MqttServer mqttServer) {
}
@Override
public boolean send(Message message) {
return false;
}
@Override
public boolean send(String clientId, Message message) {
return false;
}
})
.debug() // 开启 debug 信息日志
.start();
// 发送给某个客户端
mqttServer.publish("clientId","/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
// 发送给所有在线监听这个 topic 的客户端
mqttServer.publishAll("/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
// 停止服务
mqttServer.stop();
回复