MQTT(Message Queuing Telemetry Transport)是一种轻量级物联网消息传输协议,它基于发布/订阅模式,适用于硬件受限、网络带宽有限、延迟较高的环境。在本文中,我们将详细介绍如何在Java项目中使用MQTT,实现连接到MQTT服务器、订阅主题和发布消息的功能。
步骤1:引入客户端库
首先,我们需要添加Eclipse Paho Java Client库的依赖项到项目的pom.xml
文件中:
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
步骤2:创建MQTT连接
我们将使用免费公共MQTT服务器提供的MQTT服务器,以下是服务器接入信息:
普通TCP连接
设置MQTT Broker的基本连接参数,用户名和密码为非必选参数:
String broker = "tcp://iot.mqtt.cn:1883";
String username = "emqx";
String password = "public";
String clientid = "publish_client";
MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
client.connect(options);
TLS/SSL连接
如果需要使用自签名证书进行TLS/SSL连接,需要添加bcpkix-jdk15on
依赖项到pom.xml
文件,并创建一个SSLUtils
工具类。
<!-- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on -->
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.70</version>
</dependency>
然后,根据TLS/SSL连接设置选项:
// 设置 SSL/TLS 连接地址
String broker = "ssl://broker.emqx.io:8883";
// 设置 socket factory
String caFilePath = "/cacert.pem";
String clientCrtFilePath = "/client.pem";
String clientKeyFilePath = "/client.key";
SSLSocketFactory socketFactory = SSLUtils.getSocketFactory(caFilePath, clientCrtFilePath, clientKeyFilePath, "");
options.setSocketFactory(socketFactory);
步骤3:发布MQTT消息
创建一个发布客户端类PublishSample
,该类将发布一条"Hello MQTT"消息到主题mqtt/test
:
public class PublishSample {
public static void main(String[] args) {
String broker = "tcp://broker.emqx.io:1883";
String topic = "mqtt/test";
String username = "emqx";
String password = "public";
String clientid = "publish_client";
String content = "Hello MQTT";
int qos = 0;
try {
MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
client.connect(options);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(topic, message);
System.out.println("Message published");
System.out.println("topic: " + topic);
System.out.println("message content: " + content);
client.disconnect();
client.close();
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}
步骤4:订阅MQTT主题
创建一个订阅客户端类SubscribeSample
,该类将订阅主题mqtt/test
:
public class SubscribeSample {
public static void main(String[] args) {
String broker = "tcp://broker.emqx.io:1883";
String topic = "mqtt/test";
String username = "emqx";
String password = "public";
String clientid = "subscribe_client";
int qos = 0;
try {
MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
}
public void messageArrived(String topic, MqttMessage message) {
System.out.println("Received message from topic: " + topic);
System.out.println("QoS: " + message.getQos());
System.out.println("Message content: " + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Delivery complete: " + token.isComplete());
}
});
client.connect(options);
client.subscribe(topic, qos);
} catch (Exception e) {
e.printStackTrace();
}
}
}
步骤5:测试
现在,您可以运行SubscribeSample
订阅mqtt/test
主题。然后运行PublishSample
发布消息到mqtt/test
主题。您将看到发布端成功发布消息,同时订阅端接收到消息。
总结
通过这篇文章,我们详细学习了如何在Java中使用Paho Java Client来连接到公共MQTT服务器,并实现了测试客户端与MQTT服务器的连接、消息发布和订阅。希望这篇文章对您在Java中使用MQTT有所帮助。