一、简介
Mica-MQTT是一个基于Java AIO实现的开源组件,旨在提供简单易用、低延迟、高性能的百万级MQTT客户端和物联网Broker服务。它更容易集成到现有服务中,降低自主开发物联网平台的成本。
二、使用场景
Mica-MQTT适用于多种场景,包括但不限于:
- 物联网云端MQTT Broker
- 边缘设备之间的消息通信
- 群组类即时通讯
- 消息推送
- 简单易用的MQTT客户端
三、优势
Mica-MQTT的主要优势包括:
- 简单易用,功能强大
- 易于二次开发和扩展
- 高性能,支持百万级连接
四、功能
Mica-MQTT提供了丰富的功能集,包括但不限于:
- 支持MQTT v3.1、v3.1.1和v5.0协议
- 支持WebSocket MQTT子协议(兼容mqtt.js)
- 支持HTTP REST API,详细文档请参见[1]
- 提供MQTT客户端库
- 提供MQTT服务端
- 支持MQTT客户端和服务端的共享订阅
- 支持MQTT遗嘱消息
- 支持MQTT保留消息
- 支持自定义消息处理和转发
- 提供阿里云MQTT客户端连接示例
- 支持GraalVM编译成本机可执行程序
- 快速集成到Spring Boot项目(使用mica-mqtt-spring-boot-starter)
- 支持与Prometheus和Grafana对接
- 基于Redis Pub/Sub实现集群,详细信息请参见mica-mqtt-broker模块[2]
五、依赖
Spring Boot项目
客户端依赖
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-client-spring-boot-starter</artifactId>
<version>${mica-mqtt.version}</version>
</dependency>
客户端配置示例
mqtt:
client:
enabled: true # 是否开启客户端,默认:true
ip: 127.0.0.1 # 连接的服务端 IP,默认:127.0.0.1
port: 1883 # 端口,默认:1883
name: Mica-Mqtt-Client # 客户端名称,默认:Mica-Mqtt-Client
clientId: 000001 # 客户端ID(通常为设备SN,不可重复)
user-name: mica # 认证用户名
password: 123456 # 认证密码
timeout: 5 # 超时时间(秒),默认:5秒
reconnect: true # 是否自动重连,默认:true
re-interval: 5000 # 重连时间(毫秒),默认:5000毫秒
version: mqtt_3_1_1 # MQTT协议版本,可选MQTT_3_1、mqtt_3_1_1、mqtt_5,默认:mqtt_3_1_1
read-buffer-size: 8KB # 接收数据的缓冲区大小,默认:8KB
max-bytes-in-message: 10MB # 消息解析的最大字节数,默认:10MB
buffer-allocator: heap # 内存分配器(堆内存或堆外内存),默认:堆内存
keep-alive-secs: 60 # Keep-Alive时间(秒)
clean-session: true # MQTT Clean Session,默认:true
ssl:
enabled: false # 是否开启SSL认证,2.1.0版本开始支持双向认证
keystore-path: # 可选参数:SSL双向认证的密钥库路径,支持classpath:/路径。
keystore-pass: # 可选参数:SSL双向认证的密钥库密码
truststore-path: # 可选参数:SSL双向认证的信任库路径,支持classpath:/路径。
truststore-pass: # 可选参数:SSL双向认证的信任库密码
客户端监听示例
@Service
public class MqttClientConnectListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientConnectListener.class);
@Autowired
private MqttClientCreator mqttClientCreator;
@EventListener
public void onConnected(MqttConnectedEvent event) {
logger.info("MqttConnectedEvent: {}", event);
}
@EventListener
public void onDisconnect(MqttDisconnectEvent event) {
// 在客户端离线时更新重连时的客户端ID、用户名和密码
logger.info("MqttDisconnectEvent: {}", event);
mqttClientCreator.clientId("newClient" + System.currentTimeMillis())
.username("newUserName")
.password("newPassword");
}
}
自定义客户端配置示例(可选)
@Configuration(proxyBeanMethods = false)
public class MqttClientCustomizerConfiguration {
@Bean
public MqttClientCustomizer mqttClientCustomizer() {
return new MqttClientCustomizer() {
@Override
public void customize(MqttClientCreator creator) {
// 在此处可以自定义配置,会覆盖YAML配置
System.out.println("----------------MqttServerCustomizer-----------------");
}
};
}
}
客户端订阅示例
@Service
public class MqttClientSubscribeListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientSubscribeListener.class);
@MqttClientSubscribe("/test/#")
public void subQos0(String topic, byte[] payload) {
logger.info("topic: {} payload: {}", topic, new String(payload, StandardCharsets.UTF_8));
}
@MqttClientSubscribe(value = "/qos1/#", qos = MqttQoS.AT_LEAST_ONCE)
public void subQos1(String topic, byte[] payload) {
logger.info("topic: {} payload: {}", topic, new String(payload, StandardCharsets.UTF_8));
}
@MqttClientSubscribe("/sys/${productKey}/${deviceName}/thing/sub/register")
public void thingSubRegister(String topic, byte[] payload) {
// 1.3.8版本开始支持,@MqttClientSubscribe注解支持${}变量替换,默认替换为+
// 注意:Mica-MQTT会先从Spring Boot配置中替换参数${},如果存在配置将优先被替换。
logger.info("topic: {} payload: {}", topic, new String(payload, StandardCharsets.UTF_8));
}
}
共享订阅Topic说明
Mica-MQTT客户端支持两种共享订阅方式:
- 共享订阅:订阅前缀使用
$queue/
,多个客户端订阅了$queue/topic
,当有消息发布到topic
时,只有一个客户端会接收到消息。 - 分组订阅:订阅前缀使用
$share/<group>/
,组内的客户端订阅了$share/group1/topic
、$share/group2/topic
等,当有消息发布到topic
时,每个组内只有一个客户端会接收到消息。
MqttClientTemplate使用示例
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@Service
public class MainService {
private static final Logger logger = LoggerFactory.getLogger(MainService.class);
@Autowired
private MqttClientTemplate client;
public boolean publish() {
client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
return true;
}
public boolean sub() {
client.subQos0("/test/#", (context, topic, message, payload) -> {
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});
return true;
}
}
2. 服务端依赖
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-server</artifactId>
<version>${mica-mqtt.version}</version>
</dependency>
2.1 服务端使用
// 注意:为了能够接受更多连接(降低内存占用),请添加JVM参数 -Xss129k
MqttServer mqttServer = MqttServer.create()
.ip("0.0.0.0") // 服务端IP,默认为空(0.0.0.0),建议不要设置
.port(1883) // 端口,默认:1883
.readBufferSize(512) // 读缓冲区大小,默认:512字节
.maxBytesInMessage(1024 * 100) // 最大包体长度,默认:8092
.authHandler((clientId, userName, password) -> true) // 自定义认证
.messageListener((context, clientId, message) -> {
logger.info("clientId: {} message: {} payload: {}", clientId, message, new String(message.getPayload(), StandardCharsets.UTF_8));
}) // 消息监听
.bufferAllocator(ByteBufferAllocator.HEAP) // 内存分配器,默认:堆内存
.heartbeatTimeout(120_1000L) // 心跳超时时间,默认:120秒
.useSsl("", "", "") // SSL配置
.connectStatusListener(new IMqttConnectStatusListener() {
@Override
public void online(String clientId) {
// 客户端上线时的处理逻辑
}
@Override
public void offline(String clientId) {
// 客户端离线时的处理逻辑
}
}) // 自定义客户端上下线监听
.messageDispatcher(new IMqttMessageDispatcher() {
@Override
public void config(MqttServer mqttServer) {
// 配置消息分发
}
@Override
public boolean send(Message message) {
// 发送消息
return false;
}
@Override
public boolean send(String clientId, Message message) {
// 发送消息给指定客户端
return false;
}
}) // 自定义消息转发,可使用MQ广播实现集群处理
.debug() // 开启调试信息日志
.start();
// 发送消息给指定客户端
mqttServer.publish("clientId", "/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
// 发送消息给所有在线监听这个Topic的客户端
mqttServer.publishAll("/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
// 停止服务
mqttServer.stop();
六、默认端口
以下是Mica-MQTT的默认端口:
- MQTT TCP端口:1883
- HTTP和WebSocket端口:8083
这些端口用于不同的通信协议和服务。
请注意,上述示例代码可能需要根据你的具体需求进行调整和扩展。如果需要更多信息或帮助,请随时提问。