实战 - 简单的消息队列实现
实现简单的消息队列(订阅 + 发布 + 广播)
1、服务端
/**
 * Minimal message-queue server demo (subscribe + publish + broadcast).
 *
 * <p>Creates a server with the "sd:udp" schema on port 8602 and reacts to two events:
 * <ul>
 *   <li>{@code mq.sub}  - marks the sending session as subscribed to the topic in meta "topic";</li>
 *   <li>{@code mq.push} - forwards the message as {@code mq.broadcast} to every open session
 *       that has subscribed to the message's topic.</li>
 * </ul>
 */
public class Demo05_Mq_Server {
    public static void main(String[] args) throws Exception {
        // Sessions that are currently open; maintained by the open/close callbacks below.
        // NOTE(review): HashSet is not synchronized - confirm whether the library may invoke
        // these callbacks from more than one thread.
        Set<Session> userList = new HashSet<>();

        SocketD.createServer("sd:udp")
                .config(c -> c.port(8602))
                .listen(new EventListener()
                        .doOnOpen(s -> {
                            userList.add(s);
                        })
                        .doOnClose(s -> {
                            userList.remove(s);
                        })
                        .doOn("mq.sub", (s, m) -> {
                            // Subscribe command
                            String topic = m.meta("topic");
                            if (StrUtils.isNotEmpty(topic)) {
                                // Record the subscription as an attribute keyed by the topic name
                                s.attrPut(topic, "1");
                            }
                        }).doOn("mq.push", (s, m) -> {
                            // Push command
                            String topic = m.meta("topic");
                            String id = m.meta("id");
                            if (StrUtils.isNotEmpty(topic) && StrUtils.isNotEmpty(id)) {
                                // Broadcast to subscribers. The filter must inspect each candidate
                                // session's own attributes, not the publisher's (s); otherwise the
                                // message goes to everyone or to no one depending on the publisher.
                                for (Session s1 : userList.stream()
                                        .filter(user -> user.attrMap().containsKey(topic))
                                        .collect(Collectors.toList())) {
                                    // Qos0 broadcast send
                                    s1.send("mq.broadcast", m);
                                }
                            }
                        })
                ).start();
    }
}
2、客户端
/**
 * Minimal message-queue client demo: connects to the local server on port 8602,
 * subscribes to two topics and publishes one message.
 */
public class Demo05_Mq_Client {
    public static void main(String[] args) throws Exception {
        MqClient client = new MqClient("127.0.0.1", 8602);
        client.connect();

        // Print every message received on either topic.
        client.subscribe("user.created", message -> System.out.println(message));
        client.subscribe("user.updated", message -> System.out.println(message));

        client.publish("user.created", "test");
    }

    /**
     * Small client wrapper that maps topics to local listeners and talks to the
     * server through the "mq.sub", "mq.push" and "mq.broadcast" events.
     */
    public static class MqClient {
        /** Listener registered for each subscribed topic. */
        private final Map<String, Consumer<String>> listenerMap = new HashMap<>();
        private final String server;
        private final int port;
        /** Assigned by {@link #connect()}; null until then. */
        private ClientSession clientSession;

        public MqClient(String server, int port) {
            this.server = server;
            this.port = port;
        }

        /**
         * Opens the session to the server and registers the broadcast handler.
         */
        public void connect() throws Exception {
            String url = "sd:udp://" + server + ":" + port;

            clientSession = SocketD.createClient(url)
                    // Raise the heartbeat frequency so the connection is not dropped
                    .config(c -> c.heartbeatInterval(5))
                    .listen(new EventListener()
                            .doOn("mq.broadcast", (s, m) -> {
                                Consumer<String> listener = listenerMap.get(m.meta("topic"));
                                if (listener == null) {
                                    // No local subscriber for this topic; ignore the message
                                    return;
                                }
                                // Qos0
                                listener.accept(m.dataAsString());
                            }))
                    .open();
        }

        /**
         * Subscribes to a topic: stores the listener locally, then notifies the server.
         *
         * @param topic    topic name, sent to the server as meta "topic"
         * @param listener callback that receives the message data as a string
         */
        public void subscribe(String topic, Consumer<String> listener) throws IOException {
            listenerMap.put(topic, listener);

            // Qos0
            clientSession.send("mq.sub", new StringEntity("").metaPut("topic", topic));
        }

        /**
         * Publishes a message to a topic.
         *
         * @param topic   topic name, sent as meta "topic"
         * @param message message payload; a generated guid is attached as meta "id"
         */
        public void publish(String topic, String message) throws IOException {
            Entity entity = new StringEntity(message)
                    .metaPut("topic", topic)
                    .metaPut("id", StrUtils.guid());

            // Qos0
            clientSession.send("mq.push", entity);
        }
    }
}