Socket.D v2.5.11

实战 - 简单的消息队列实现

</> markdown

实现简单的消息队列(订阅 + 发布 + 广播)

1、服务端


public class Demo05_Mq_Server {
    public static void main(String[] args) throws Exception {
        Set<Session> userList = new HashSet<>();

        SocketD.createServer("sd:udp")
                .config(c -> c.port(8602))
                .listen(new EventListener()
                        .doOnOpen(s -> {
                            userList.add(s);
                        })
                        .doOnClose(s -> {
                            userList.remove(s);
                        })
                        .doOn("mq.sub", (s, m) -> {
                            //::订阅指令
                            String topic = m.meta("topic");
                            if (StrUtils.isNotEmpty(topic)) {
                                //标记订阅关系
                                s.attrPut(topic, "1");
                            }
                        }).doOn("mq.push", (s, m) -> {
                            //::推送指令
                            String topic = m.meta("topic");
                            String id = m.meta("id");

                            if (StrUtils.isNotEmpty(topic) && StrUtils.isNotEmpty(id)) {
                                //开始给订阅用户广播
                                for (Session s1 : userList.stream().filter(s1 -> s.attrMap().containsKey(topic)).collect(Collectors.toList())) {
                                    //Qos0 发送广播
                                    s1.send("mq.broadcast", m);
                                }
                            }
                        })
                ).start();
    }
}

2、客户端

public class Demo05_Mq_Client {
    public static void main(String[] args) throws Exception {
        MqClient client = new MqClient("127.0.0.1", 8602);
        client.connect();

        client.subscribe("user.created", (message) -> {
            System.out.println(message);
        });

        client.subscribe("user.updated", (message) -> {
            System.out.println(message);
        });

        client.publish("user.created", "test");
    }

    public static class MqClient {
        private Map<String, Consumer<String>> listenerMap = new HashMap<>();
        private String server;
        private int port;
        private ClientSession clientSession;

        public MqClient(String server, int port) {
            this.server = server;
            this.port = port;
        }

        /**
         * 连接
         */
        public void connect() throws Exception {
            clientSession = SocketD.createClient("sd:udp://" + server + ":" + port)
                    .config(c -> c.heartbeatInterval(5)) //心跳频率调高,确保不断连
                    .listen(new EventListener()
                            .doOn("mq.broadcast", (s, m) -> {
                                String topic = m.meta("topic");
                                Consumer<String> listener = listenerMap.get(topic);
                                if (listener != null) {
                                    //Qos0
                                    listener.accept(m.dataAsString());
                                }
                            }))
                    .open();
        }

        /**
         * 订阅消息
         */
        public void subscribe(String topic, Consumer<String> listener) throws IOException {
            listenerMap.put(topic, listener);
            //Qos0
            clientSession.send("mq.sub", new StringEntity("").metaPut("topic", topic));
        }

        /**
         * 发布消息
         */
        public void publish(String topic, String message) throws IOException {
            Entity entity = new StringEntity(message)
                    .metaPut("topic", topic)
                    .metaPut("id", StrUtils.guid());

            //Qos0
            clientSession.send("mq.push", entity);
        }
    }
}