Skip to content

Commit 7fc4279

Browse files
committed
refactor(Break Change): 重构共享主题实现,使之符合 mqtt5 协议规范.
1. 重构共享主题实现,使之符合 mqtt5 协议规范 2. springboot 2.6.9 -> 2.7.10 3. readme.md 更新 4. ClientSub 增加了 shareName 字段,导致 kryo 反序列化之前的数据失败.
1 parent d82315f commit 7fc4279

File tree

12 files changed

+169
-94
lines changed

12 files changed

+169
-94
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
<parent>
66
<groupId>org.springframework.boot</groupId>
77
<artifactId>spring-boot-starter-parent</artifactId>
8-
<version>2.6.9</version>
8+
<version>2.7.10</version>
99
<relativePath/> <!-- lookup parent from repository -->
1010
</parent>
1111
<groupId>com.jun</groupId>
1212
<artifactId>mqttx</artifactId>
13-
<version>1.2.1</version>
13+
<version>1.2.2</version>
1414
<name>mqttx</name>
1515
<description>mqtt broker</description>
1616
<url>https://github.com/Amazingwujun/mqttx</url>

readme.md

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -254,15 +254,14 @@ services:
254254

255255
#### 4.6 共享主题支持
256256

257-
共享订阅是 `mqtt5` 协议规定的内容,很多 mq(例如 `kafka`) 都有实现
257+
共享订阅是协议 `mqtt5` 规定的内容,**`MQTTX`** 参考协议标准实现
258258

259-
1. `mqttx.share-topic.enable`: 功能开关,默认 `true`
259+
1. 格式: `$share/{ShareName}/{filter}`, `$share` 为前缀, `ShareName` 为共享订阅名, `filter` 就是非共享订阅主题过滤器。
260260

261-
2. 格式: `$share/{ShareName}/{filter}`, `$share` 为前缀, `ShareName` 为共享订阅名, `filter` 就是非共享订阅主题过滤器。
262-
263-
3. 目前支持 `hash`, `random`, `round` 三种规则
264-
265-
> `hash` 选出的 **client** 会随着**订阅客户端数量**及**发送消息客户端 `clientId`** 变化而变化
261+
2. 支持如下两种消息分发规则
262+
1. `round`: 轮询
263+
2. `random`: 随机
264+
3. 详细内容请参考协议 [MQTT Version 5.0 (oasis-open.org)](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250)
266265

267266
下图展示了共享主题与常规主题之间的差异:
268267

@@ -569,8 +568,7 @@ Content-Length: 91
569568
| `mqttx.websocket.enable` | `false` | websocket 开关 |
570569
| `mqttx.websocket.port` | `8083` | websocket 监听端口 |
571570
| `mqttx.websocket.path` | `/mqtt` | websocket path |
572-
| `mqttx.share-topic.enable` | `true` | 共享主题功能开关 |
573-
| `mqttx.share-topic.share-sub-strategy` | `round` | 负载均衡策略, 目前支持随机、轮询、哈希 |
571+
| `mqttx.share-topic.share-sub-strategy` | `round` | 负载均衡策略, 目前支持随机、轮询 |
574572
| `mqttx.sys-topic.enable` | `false` | 系统主题功能开关 |
575573
| `mqttx.sys-topic.interval` | `60s` | 定时发布间隔 |
576574
| `mqttx.message-bridge.enable` | `false` | 消息桥接功能开关 |

src/main/java/com/jun/mqttx/broker/handler/PublishHandler.java

Lines changed: 54 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import com.jun.mqttx.utils.JsonSerializer;
3232
import com.jun.mqttx.utils.RateLimiter;
3333
import com.jun.mqttx.utils.Serializer;
34-
import com.jun.mqttx.utils.TopicUtils;
3534
import io.netty.buffer.Unpooled;
3635
import io.netty.channel.ChannelHandlerContext;
3736
import io.netty.handler.codec.mqtt.*;
@@ -42,7 +41,9 @@
4241
import org.springframework.util.ObjectUtils;
4342
import org.springframework.util.StringUtils;
4443
import reactor.core.publisher.Flux;
44+
import reactor.core.publisher.GroupedFlux;
4545
import reactor.core.publisher.Mono;
46+
import reactor.core.scheduler.Schedulers;
4647

4748
import java.time.Instant;
4849
import java.util.*;
@@ -69,7 +70,7 @@ public class PublishHandler extends AbstractMqttTopicSecureHandler implements Wa
6970
private final IPublishMessageService publishMessageService;
7071
private final IPubRelMessageService pubRelMessageService;
7172
private final String brokerId;
72-
private final boolean enableTopicSubPubSecure, enableShareTopic, enableRateLimiter, ignoreClientSelfPub;
73+
private final boolean enableTopicSubPubSecure, enableRateLimiter, ignoreClientSelfPub;
7374
/** 共享主题轮询策略 */
7475
private final ShareStrategy shareStrategy;
7576
/** 消息桥接开关 */
@@ -86,20 +87,25 @@ public class PublishHandler extends AbstractMqttTopicSecureHandler implements Wa
8687

8788
//@formatter:on
8889

89-
public PublishHandler(IPublishMessageService publishMessageService, IRetainMessageService retainMessageService,
90-
ISubscriptionService subscriptionService, IPubRelMessageService pubRelMessageService, ISessionService sessionService,
91-
@Nullable IInternalMessagePublishService internalMessagePublishService, MqttxConfig config,
92-
@Nullable KafkaTemplate<String, byte[]> kafkaTemplate, Serializer serializer) {
90+
public PublishHandler(IPublishMessageService publishMessageService,
91+
IRetainMessageService retainMessageService,
92+
ISubscriptionService subscriptionService,
93+
IPubRelMessageService pubRelMessageService,
94+
ISessionService sessionService,
95+
@Nullable IInternalMessagePublishService internalMessagePublishService,
96+
MqttxConfig config,
97+
@Nullable KafkaTemplate<String, byte[]> kafkaTemplate,
98+
Serializer serializer) {
9399
super(config.getCluster().getEnable());
94100
Assert.notNull(publishMessageService, "publishMessageService can't be null");
95101
Assert.notNull(retainMessageService, "retainMessageService can't be null");
96102
Assert.notNull(subscriptionService, "publishMessageService can't be null");
97103
Assert.notNull(pubRelMessageService, "publishMessageService can't be null");
98104
Assert.notNull(config, "mqttxConfig can't be null");
99105

100-
MqttxConfig.ShareTopic shareTopic = config.getShareTopic();
101-
MqttxConfig.MessageBridge messageBridge = config.getMessageBridge();
102-
MqttxConfig.RateLimiter rateLimiter = config.getRateLimiter();
106+
var shareTopic = config.getShareTopic();
107+
var messageBridge = config.getMessageBridge();
108+
var rateLimiter = config.getRateLimiter();
103109
this.sessionService = sessionService;
104110
this.serializer = serializer;
105111
this.publishMessageService = publishMessageService;
@@ -109,7 +115,6 @@ public PublishHandler(IPublishMessageService publishMessageService, IRetainMessa
109115
this.brokerId = config.getBrokerId();
110116
this.enableTopicSubPubSecure = config.getEnableTopicSubPubSecure();
111117
this.ignoreClientSelfPub = config.getIgnoreClientSelfPub();
112-
this.enableShareTopic = shareTopic.getEnable();
113118
if (!CollectionUtils.isEmpty(rateLimiter.getTopicRateLimits()) && rateLimiter.getEnable()) {
114119
enableRateLimiter = true;
115120
rateLimiter.getTopicRateLimits()
@@ -196,13 +201,16 @@ public void process(ChannelHandlerContext ctx, MqttMessage msg) {
196201

197202
// 响应
198203
switch (qos) {
199-
case AT_MOST_ONCE -> publish(pubMsg, ctx, false).doOnSuccess(unused -> {
200-
if (retain) {
201-
handleRetainMsg(pubMsg).subscribe();
202-
}
203-
}).subscribe();
204+
case AT_MOST_ONCE -> publish(pubMsg, ctx, false)
205+
.publishOn(Schedulers.boundedElastic())
206+
.doOnSuccess(unused -> {
207+
if (retain) {
208+
handleRetainMsg(pubMsg).subscribe();
209+
}
210+
}).subscribe();
204211
case AT_LEAST_ONCE -> {
205212
publish(pubMsg, ctx, false)
213+
.publishOn(Schedulers.boundedElastic())
206214
.doOnSuccess(unused -> {
207215
MqttMessage pubAck = MqttMessageFactory.newMessage(
208216
new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
@@ -223,6 +231,7 @@ public void process(ChannelHandlerContext ctx, MqttMessage msg) {
223231
Session session = getSession(ctx);
224232
if (!session.isDupMsg(packetId)) {
225233
publish(pubMsg, ctx, false)
234+
.publishOn(Schedulers.boundedElastic())
226235
.doOnSuccess(unused -> {
227236
// 保存 pub
228237
session.savePubRelInMsg(packetId);
@@ -261,9 +270,11 @@ public void process(ChannelHandlerContext ctx, MqttMessage msg) {
261270
return Mono.empty();
262271
} else {
263272
return publish(pubMsg, ctx, false)
273+
.publishOn(Schedulers.boundedElastic())
264274
.doOnSuccess(unused -> pubRelMessageService.saveIn(clientId(ctx), packetId).subscribe());
265275
}
266276
})
277+
.publishOn(Schedulers.boundedElastic())
267278
.doOnSuccess(unused -> {
268279
var pubRec = MqttMessageFactory.newMessage(
269280
new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0),
@@ -313,7 +324,7 @@ public Mono<Void> publish(final PubMsg pubMsg, ChannelHandlerContext ctx, boolea
313324
}
314325

315326
// 获取 topic 订阅者 id 列表
316-
String topic = pubMsg.getTopic();
327+
final var topic = pubMsg.getTopic();
317328
Flux<ClientSub> clientSubFlux = subscriptionService.searchSubscribeClientList(topic)
318329
.filter(clientSub -> {
319330
if (ignoreClientSelfPub) {
@@ -325,27 +336,28 @@ public Mono<Void> publish(final PubMsg pubMsg, ChannelHandlerContext ctx, boolea
325336
});
326337

327338
// 共享订阅
328-
if (enableShareTopic && TopicUtils.isShare(topic)) {
329-
return clientSubFlux.collectList()
330-
.map(e -> chooseClient(e, topic))
331-
.flatMap(clientSub -> {
332-
pubMsg.setAppointedClientId(clientSub.getClientId());
333-
return publish0(clientSub, pubMsg, isClusterMessage).doOnSuccess(unused -> {
334-
// 满足如下条件,则发送消息给集群
335-
// 1 集群模式开启
336-
// 2 订阅的客户端连接在其它实例上
337-
if (isClusterMode() && !ConnectHandler.CLIENT_MAP.containsKey(clientSub.getClientId())) {
338-
internalMessagePublish(pubMsg);
339-
}
340-
});
341-
})
342-
.then();
343-
}
339+
var f1 = clientSubFlux.filter(ClientSub::isShareSub)
340+
.groupBy(ClientSub::getShareName)
341+
.flatMap(GroupedFlux::collectList)
342+
.map(t -> chooseClient(t, topic))
343+
.flatMap(clientSub -> {
344+
pubMsg.setAppointedClientId(clientSub.getClientId());
345+
return publish0(clientSub, pubMsg, isClusterMessage).doOnSuccess(unused -> {
346+
// 满足如下条件,则发送消息给集群
347+
// 1 集群模式开启
348+
// 2 订阅的客户端连接在其它实例上
349+
if (isClusterMode() && !ConnectHandler.CLIENT_MAP.containsKey(clientSub.getClientId())) {
350+
internalMessagePublish(pubMsg);
351+
}
352+
});
353+
});
344354

345-
return clientSubFlux
355+
// 普通订阅
356+
var f2 = clientSubFlux
357+
.filter(ClientSub::notShareSub)
346358
.collectList()
347359
.doOnSuccess(lst -> {
348-
// 将消息推送给集群中的broker
360+
// 将消息推送给集群中的 broker
349361
if (isClusterMode() && !isClusterMessage) {
350362
// 判断是否需要进行集群消息分发
351363
boolean flag = false;
@@ -361,8 +373,9 @@ public Mono<Void> publish(final PubMsg pubMsg, ChannelHandlerContext ctx, boolea
361373
}
362374
})
363375
.flatMapIterable(Function.identity())
364-
.flatMap(clientSub -> publish0(clientSub, pubMsg, isClusterMessage))
365-
.then();
376+
.flatMap(clientSub -> publish0(clientSub, pubMsg, isClusterMessage));
377+
378+
return Mono.when(f1, f2);
366379
}
367380

368381
/**
@@ -544,7 +557,6 @@ public boolean support(String channel) {
544557
* 共享订阅选择客户端, 支持的策略如下:
545558
* <ol>
546559
* <li>随机: {@link ShareStrategy#random}</li>
547-
* <li>哈希: {@link ShareStrategy#hash}</li>
548560
* <li>轮询: {@link ShareStrategy#round}</li>
549561
* </ol>
550562
*
@@ -554,15 +566,14 @@ public boolean support(String channel) {
554566
private ClientSub chooseClient(List<ClientSub> clientSubList, String topic) {
555567
// 集合排序
556568
clientSubList.sort(ClientSub::compareTo);
569+
final var size = clientSubList.size();
557570

558-
if (hash == shareStrategy) {
559-
return clientSubList.get(topic.hashCode() % clientSubList.size());
560-
} else if (random == shareStrategy) {
561-
int key = ThreadLocalRandom.current().nextInt(0, clientSubList.size());
562-
return clientSubList.get(key % clientSubList.size());
571+
if (random == shareStrategy) {
572+
int key = ThreadLocalRandom.current().nextInt(0, size);
573+
return clientSubList.get(key % size);
563574
} else if (round == shareStrategy) {
564575
int i = roundMap.computeIfAbsent(topic, s -> new AtomicInteger(0)).getAndIncrement();
565-
return clientSubList.get(i % clientSubList.size());
576+
return clientSubList.get(i % size);
566577
}
567578

568579
throw new IllegalArgumentException("不可能到达的代码, strategy:" + shareStrategy);

src/main/java/com/jun/mqttx/broker/handler/SubscribeHandler.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.jun.mqttx.config.MqttxConfig;
2121
import com.jun.mqttx.entity.BrokerStatus;
2222
import com.jun.mqttx.entity.ClientSub;
23+
import com.jun.mqttx.entity.ShareTopic;
2324
import com.jun.mqttx.service.IRetainMessageService;
2425
import com.jun.mqttx.service.ISubscriptionService;
2526
import com.jun.mqttx.utils.TopicUtils;
@@ -100,8 +101,15 @@ public void process(final ChannelHandlerContext ctx, MqttMessage msg) {
100101
List<Integer> grantedQosLevels = new ArrayList<>(mqttTopicSubscriptions.size());
101102
var needSave = new ArrayList<ClientSub>();
102103
mqttTopicSubscriptions.forEach(mqttTopicSubscription -> {
103-
final String topic = mqttTopicSubscription.topicName();
104+
String topic = mqttTopicSubscription.topicName();
104105
int qos = mqttTopicSubscription.qualityOfService().value();
106+
final var isShareTopic = TopicUtils.isShare(topic);
107+
String shareName = null;
108+
if (isShareTopic) {
109+
ShareTopic shareTopic = TopicUtils.parseFrom(topic);
110+
topic = shareTopic.filter();
111+
shareName = shareTopic.name();
112+
}
105113

106114
if (!TopicUtils.isValid(topic)) {
107115
// Failure
@@ -120,7 +128,7 @@ public void process(final ChannelHandlerContext ctx, MqttMessage msg) {
120128
if (TopicUtils.isSys(topic)) {
121129
qos = 0x80;
122130
} else {
123-
ClientSub clientSub = ClientSub.of(clientId, qos, topic, isCleanSession(ctx));
131+
ClientSub clientSub = ClientSub.of(clientId, qos, topic, isCleanSession(ctx), shareName);
124132
needSave.add(clientSub);
125133
}
126134
}

src/main/java/com/jun/mqttx/broker/handler/UnsubscribeHandler.java

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,63 +17,56 @@
1717
package com.jun.mqttx.broker.handler;
1818

1919
import com.jun.mqttx.config.MqttxConfig;
20+
import com.jun.mqttx.entity.ShareTopic;
21+
import com.jun.mqttx.entity.Tuple2;
2022
import com.jun.mqttx.service.ISubscriptionService;
2123
import com.jun.mqttx.utils.TopicUtils;
2224
import io.netty.channel.ChannelHandlerContext;
2325
import io.netty.handler.codec.mqtt.*;
26+
import lombok.extern.slf4j.Slf4j;
27+
import org.springframework.util.ObjectUtils;
28+
import reactor.core.publisher.Flux;
2429
import reactor.core.publisher.Mono;
2530

2631
import java.util.ArrayList;
2732
import java.util.List;
28-
import java.util.stream.Collectors;
2933

3034
/**
3135
* {@link MqttMessageType#UNSUBSCRIBE} 消息处理器
3236
*
3337
* @author Jun
3438
* @since 1.0.4
3539
*/
40+
@Slf4j
3641
@Handler(type = MqttMessageType.UNSUBSCRIBE)
3742
public class UnsubscribeHandler extends AbstractMqttSessionHandler {
3843

39-
private final Boolean enableSysTopic;
4044
private final ISubscriptionService subscriptionService;
4145

4246
public UnsubscribeHandler(MqttxConfig config, ISubscriptionService subscriptionService) {
4347
super(config.getCluster().getEnable());
44-
this.enableSysTopic = config.getSysTopic().getEnable();
4548
this.subscriptionService = subscriptionService;
4649
}
4750

4851
@Override
4952
public void process(ChannelHandlerContext ctx, MqttMessage msg) {
50-
MqttUnsubscribeMessage mqttUnsubscribeMessage = (MqttUnsubscribeMessage) msg;
51-
int messageId = mqttUnsubscribeMessage.variableHeader().messageId();
52-
MqttUnsubscribePayload payload = mqttUnsubscribeMessage.payload();
53+
final var mqttUnsubscribeMessage = (MqttUnsubscribeMessage) msg;
54+
final var messageId = mqttUnsubscribeMessage.variableHeader().messageId();
55+
final var payload = mqttUnsubscribeMessage.payload();
5356

5457
// 系统主题
55-
List<String> collect = new ArrayList<>(payload.topics());
56-
if (enableSysTopic) {
57-
List<String> unSubSysTopics = collect.stream().filter(TopicUtils::isSys).collect(Collectors.toList());
58-
collect.removeAll(unSubSysTopics);
59-
Mono.when(unsubscribeSysTopics(unSubSysTopics, ctx), subscriptionService.unsubscribe(clientId(ctx), isCleanSession(ctx), collect))
60-
.doOnSuccess(unused -> {
61-
// response
62-
MqttMessage mqttMessage = MqttMessageFactory.newMessage(
63-
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
64-
MqttMessageIdVariableHeader.from(messageId),
65-
null
66-
);
67-
ctx.writeAndFlush(mqttMessage);
68-
}).subscribe();
69-
return;
70-
}
58+
var generalTopics = new ArrayList<>(payload.topics());
59+
var unSubSysTopics = generalTopics.stream().filter(TopicUtils::isSys).toList();
60+
61+
// 移除系统主题
62+
generalTopics.removeAll(unSubSysTopics);
7163

72-
// 非系统主题
73-
subscriptionService.unsubscribe(clientId(ctx), isCleanSession(ctx), collect)
64+
// 删除订阅
65+
Mono.when(unsubscribeSysTopics(unSubSysTopics, ctx), subscriptionService.unsubscribe(clientId(ctx), isCleanSession(ctx), generalTopics))
66+
.doOnError(throwable -> log.error(String.format("主题订阅[%s]删除失败", generalTopics), throwable))
7467
.doOnSuccess(unused -> {
7568
// response
76-
MqttMessage mqttMessage = MqttMessageFactory.newMessage(
69+
var mqttMessage = MqttMessageFactory.newMessage(
7770
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
7871
MqttMessageIdVariableHeader.from(messageId),
7972
null
@@ -89,6 +82,9 @@ public void process(ChannelHandlerContext ctx, MqttMessage msg) {
8982
* @param ctx {@link ChannelHandlerContext}
9083
*/
9184
private Mono<Void> unsubscribeSysTopics(List<String> unSubSysTopics, ChannelHandlerContext ctx) {
85+
if (ObjectUtils.isEmpty(unSubSysTopics)) {
86+
return Mono.empty();
87+
}
9288
return subscriptionService.unsubscribeSys(clientId(ctx), unSubSysTopics);
9389
}
9490
}

0 commit comments

Comments
 (0)