Socket.D v2.5.12

流(或消息关联性)

</> markdown

1、什么是流?

本协议“流”的本质,是通过 sid(streamId)为来回的相关消息建立起关联性。在异步的场景下:

  • 我发了1000个消息包,你回了1000个消息包。哪个包,是回应哪个包的?
  • 我发了1000个消息包,哪些包是作为一串整体给你的?

“流”像无形的纽带,但可以通过接口感受它存在:

//发起端
session.sendAndRequest("demo", new StringEntity("hi")).thenReply(reply->{
  //当答复时
});

//接收端
public void onMessage(Session s, Message m){
    if(m.isRequest()){
        //答复
        s.reply(m, new StringEntity("me too"));
    }
}

所有消息发出都会有 sid,其中明显用到“流”的主要有:

  • 握手时(连接与连接确认,会基于一个 sid)
  • 数据分片时(所有分片,会基于一个 sid)
  • sendAndRequest 时
  • sendAndSubscribe 时
  • reply、replyEnd 时

2、发送语义事件流

之前的 send 发了后,对方有没有收到是不知道的。可通过基于流的接口,进一步改造:

//说明是以消费者 "a" 的身分,订阅了主题 "demo"
Entity entity = new StringEntity("").metaPut("topic","demo").metaPut("consumer","a");
Reply response = session.sendAndRequest("mq.subscribe", entity).await(); //此处用异步等待
//说明发的消息主题是 "demo"
Entity entity = new StringEntity("hi").metaPut("topic", "demo");
Reply response = session.sendAndRequest("mq.publish", entity).await();

如果长时间没有收到答复 ,则会超时异常!就像 http 那样。

3、监听语义事件流

这个演示代码不科学哦,主要表达应用的效果。有了“流”,就给会话消息建立了关系及质量保证!

public class Server {
    public void main(String[] args) throws Throwable {
        Map<String,String> topicConsumerMap ...;
        Set<Sessoin> sessionSet ...;
        
        SocketD.createServer("sd:tcp")
                .listen(new EventListener().on("mq.subscribe", (s,m)->{
                    String consumer = m.meta("consumer");
                    
                    s.attrPut(consumer, "1");
                    
                    sessionSet.add(s);
                    topicConsumerMap.put(m.meta("topic"), m.meta("consumer"));
                    
                    if(m.isRequest()){
                        //如果是请求,则进行答复
                        s.replyEnd(m, new StringEntity("").metaPut("mq.confirm", "1");
                    }
                }).on("mq.publish", (s,m)->{
                    String consumer = topicConsumerMap.get(m.meta("topic"));
                    
                    //转发
                    sessionSet.stream().filter(s1->s1.attrHas(consumer)).forEach(s1->{
                        s1.send(m.event(), m);
                    });
                    
                    if(m.isRequest()){
                        //如果是请求,则进行答复
                        s.replyEnd(m, new StringEntity("").metaPut("mq.confirm", "1");
                    }
                }))
                .start();
    }
}