MQTT分布式订阅
MQTT 分布式订阅
当服务的处理能里不足时,会启动多个服务来处理消息,此时订阅 topic 如果相同会出现每个服务都重复消费消息的情况。需要使用 MQTT5.0 特性中的共享订阅
- 多实例 Spring Boot 服务读取同一 topic → 实现水平扩容
- 避免重复处理(否则广播会让每个实例都执行一次)
- IoT 数据接入高频场景(几十万设备 → 多消费者节点)
下面我给你一次性讲清楚 MQTT 共享订阅(Shared Subscription) 的概念、用途、不同 Broker 的写法,以及在 SpringBoot/EMQX 中怎么用。
✅ 1. 什么是 MQTT 共享订阅?
共享订阅 = 多个客户端共同消费同一个 Topic 的消息(类似 Kafka 的消费组)
👉 默认情况下,普通订阅是 广播模式:
- 同一个 topic
- 有多个客户端订阅
- 每条消息会 推送给每一个客户端
👉 而共享订阅是 负载均衡模式:
- 多个客户端加入同一共享组
- 每条消息只会 发给其中一个客户端
- 实现消费者集群、避免重复消费
✅ 2. 共享订阅的语法(EMQX / HiveMQ 标准)
标准 MQTT5 & EMQX / HiveMQ 的共享订阅格式
$share/<group>/<topic>例如,一个名叫 worker 的消费者组订阅普通 topic:sensors/temperature
$share/worker/sensors/temperature同一个组的多个客户端订阅后:
worker-1worker-2worker-3
消息会被负载均衡推送,每条只给一个消费者。
✅ 3. 多 Broker 写法差异
| Broker | 是否支持共享订阅 | 订阅语法 |
|---|---|---|
| EMQX | ✔ MQTT5 标准 | $share/<group>/<topic> |
| HiveMQ | ✔ MQTT5 标准 | $share/<group>/<topic> |
| Mosquitto | ✖ MQTT3 不支持;MQTT5 需 2.x 以上 | $share/<group>/<topic> |
| RabbitMQ MQTT | ✔ 但语法不同 | $share/<group>/<topic>(插件版才支持) |
大多数情况下你用 EMQX,因此直接用标准语法即可。
✅ 4. SpringBoot(Spring Integration MQTT)使用共享订阅
使用 mqttInbound 的 topic 配置即可:
@Bean
public MessageProducer mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
"client-id",
mqttClientFactory(),
"$share/worker/sensors/temperature"
);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}多个服务实例都写同一个共享订阅 topic:
$share/worker/sensors/temperatureEMQX 会自动帮你做负载均衡。
✅ 5.共享订阅下 MQTT 会话特性(重要!)
| 特性 | 是否支持 |
|---|---|
| QOS1/2 | ✔ |
| Retained retained message | ❌ 不会分发给共享订阅 |
| $share 不能嵌套 | ✔ |
| 共享组间独立负载均衡 | ✔ |
例如你发布 retained 消息:
mosquitto_pub -t sensors/temp -m 123 -r共享订阅的客户端第一次订阅不会收到 retained,避免混乱。
❗6.发布消息(publish)应该发到哪个 topic?
✔ 发布方(Publisher)
发布到真正的业务 Topic:
mqttPublishService.publish("order/created", "{...json...}");✔ 订阅方(Consumer)
多个实例订阅共享 Topic:
$share/orderGroup/order/created
示例:
new MqttPahoMessageDrivenChannelAdapter(
"client-a",
mqttClientFactory(),
"$share/orderGroup/order/created"
);第二个实例:
new MqttPahoMessageDrivenChannelAdapter(
"client-b",
mqttClientFactory(),
"$share/orderGroup/order/created"
);👉 两个 consumer 都是 orderGroup
👉 任何发布到 order/created 的消息
👉 每条只会给其中一个 consumer 处理(负载均衡)。
📌 可视化理解
Publish
order/created ←―― 发布者只发到这个 Topic
Subscribe
$share/orderGroup/order/created
$share/orderGroup/order/created
$share/orderGroup/order/createdBroker 负责:
- 轮询
- 负载均衡
- 消息只会送给一个消费者
🚫 为什么不能发送到 $share/...?
因为:
$share/xxx/...是内部控制 Topic(系统订阅语法)- 发布者发到这里会被 Broker 直接丢弃或拒绝
- 这是订阅才使用的语法
🔥 再给你一个最终极简公式:
| 角色 | topic 写法 |
|---|---|
| 发布者 | order/created |
| 共享订阅者 | $share/orderGroup/order/created |