springboot mqtt(手把手讲解)
💡一则或许对你有用的小广告
欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论
- 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于
Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...
,点击查看项目介绍 ;演示链接: http://116.62.199.48:7070 ;- 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;
截止目前, 星球 内专栏累计输出 90w+ 字,讲解图 3441+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 3100+ 小伙伴加入学习 ,欢迎点击围观
在物联网(IoT)和实时通信领域,消息队列技术正扮演着越来越重要的角色。Spring Boot 作为 Java 生态系统中广受欢迎的框架,因其高效、灵活的特性,成为构建企业级应用的首选。而 MQTT(Message Queuing Telemetry Transport) 协议凭借轻量级、低带宽、高可靠的特点,成为物联网设备通信的黄金标准。本文将结合两者,从零开始讲解如何使用 Spring Boot MQTT 构建消息通信系统,并通过实际案例帮助读者理解其核心原理与应用场景。
一、MQTT 协议:物联网通信的“邮局系统”
1.1 MQTT 协议的核心概念
MQTT 是一种基于发布-订阅模式的轻量级消息传输协议,专为低带宽、高延迟或不可靠的网络环境设计。其核心概念包括:
- Broker(消息代理):类似“邮局”,负责接收、存储、转发消息。
- Publisher(发布者):向 Broker 发送消息的客户端。
- Subscriber(订阅者):从 Broker 接收消息的客户端。
- Topic(主题):消息的路由标识符,例如
"sensor/temperature"
。
比喻:想象一个邮局系统,Broker 是邮局,Publisher 是寄信人,Subscriber 是收信人,而 Topic 是信件上的地址。寄信人将信件(消息)投递到邮局,邮局根据地址将信件分发给对应的收信人。
1.2 MQTT 的优势与适用场景
- 低开销:消息头仅 2 字节,适合资源受限的设备(如传感器、嵌入式设备)。
- 可靠性:支持 QoS(Quality of Service)级别(0、1、2),确保消息不丢失或重复。
- 实时性:Broker 可立即转发消息,适用于监控、报警等场景。
适用场景:
- 智能家居设备的远程控制
- 工业物联网(IIoT)数据采集
- 移动端实时推送(如订单状态更新)
二、Spring Boot 集成 MQTT 的核心步骤
2.1 项目依赖配置
在 Spring Boot 3.x 中,需通过以下依赖集成 MQTT 客户端:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2.2 配置 MQTT Broker
在 application.yml
中配置 Broker 地址、端口、客户端 ID 等参数:
spring:
mqtt:
host: tcp://localhost:1883
username: admin
password: admin
client-id: spring-mqtt-client
2.3 创建配置类
通过 @Configuration
注解定义 MQTT 连接工厂和消息通道:
@Configuration
public class MqttConfig {
@Value("${spring.mqtt.host}")
private String host;
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.client-id}")
private String clientId;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{host});
options.setUserName(username);
options.setPassword(password.toCharArray());
factory.setConnectionOptions(options);
return factory;
}
}
三、实现消息发布与订阅
3.1 发布消息(Publisher)
通过 MqttTemplate
发送消息到指定 Topic:
@Service
public class MqttService {
@Autowired
private MqttTemplate mqttTemplate;
public void publishMessage(String topic, String payload) {
mqttTemplate.convertAndSend(topic, payload);
System.out.println("Published message to topic: " + topic);
}
}
3.2 订阅消息(Subscriber)
使用 @MqttMessageHandler
注解监听指定 Topic 的消息:
@Component
public class MqttSubscriber {
@MqttMessageHandler
@Payload
public void handleMessage(String payload, @Header(MqttHeaders.RECEIVED_TOPIC) String topic) {
System.out.println("Received message from topic " + topic + ": " + payload);
}
}
3.3 完整案例:温度传感器监控系统
假设我们需要构建一个实时监控传感器数据的系统,流程如下:
- 传感器设备(Publisher)每秒发送温度数据到 Topic
"sensor/temperature"
。 - Spring Boot 应用(Subscriber)订阅该 Topic,并将数据存储到数据库。
代码实现:
// 发布端(模拟传感器)
public class SensorSimulator {
@Autowired
private MqttService mqttService;
public void simulateTemperature() {
while (true) {
double temperature = 25 + Math.random() * 10;
mqttService.publishMessage("sensor/temperature", String.valueOf(temperature));
Thread.sleep(1000);
}
}
}
// 订阅端(数据存储)
@Component
public class DataRecorder {
@MqttMessageHandler
public void recordTemperature(@Payload String payload, @Header("topic") String topic) {
if (topic.equals("sensor/temperature")) {
double temp = Double.parseDouble(payload);
// 将温度保存到数据库
saveToDatabase(temp);
}
}
}
四、高级功能与优化
4.1 QoS 级别配置
通过 MqttConnectOptions
设置消息服务质量:
options.setQos(2); // 可靠交付,确保消息至少被接收一次
4.2 消息持久化与保留
- 持久化订阅:Broker 保存订阅关系,即使客户端断开也能继续接收消息。
- 保留消息:Broker 保存 Topic 的最后一条消息,新订阅者可立即获取。
mqttTemplate.convertAndSend("sensor/temperature", payload,
msg -> {
msg.setRetained(true); // 启用保留消息
return msg;
});
4.3 异常处理与重连机制
通过 MqttCallback
监听连接状态,实现自动重连逻辑:
@Bean
public MqttPahoMessageDrivenChannelAdapter adapter() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
clientId, mqttClientFactory(), "sensor/temperature");
adapter.setCompletionTimeout(5000); // 连接超时时间
adapter.setQos(1);
adapter.addCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost. Trying to reconnect...");
}
});
return adapter;
}
五、常见问题与解决方案
5.1 消息丢失问题
原因:Broker 或客户端未正确处理 QoS 2 级别的确认机制。
解决:确保 Broker 支持持久化存储(如使用 Mosquitto 的 -p
参数指定数据目录)。
5.2 性能优化建议
- 批量发送:合并多条消息为单次发送,减少网络开销。
- Topic 分级设计:使用层级 Topic(如
"device/+/temperature"
)实现灵活订阅。
结论
通过本文的讲解,读者已掌握了 Spring Boot MQTT 的基础配置、消息通信实现以及高级功能扩展。MQTT 协议与 Spring Boot 的结合,为构建高效、可靠的物联网系统提供了强大支持。无论是智能家居、工业监控,还是实时数据分析场景,开发者均可借助这一技术栈快速实现需求。
下一步建议:尝试将本教程的案例部署到云环境(如 AWS IoT Core),或结合 Spring Data JPA 实现消息持久化存储,进一步探索其实际应用潜力。
(全文约 1800 字)