流(或消息关联性)
1、什么是流?
本协议“流”的本质,是通过 sid(streamId)为来回的相关消息建立起关联性。在异步的场景下:
- 我发了1000个消息包,你回了1000个消息包。哪个包,是回应哪个包的?
- 我发了1000个消息包,哪些包是作为一串整体给你的?
“流”像无形的纽带,但可以通过接口感受它存在:
//发起端
session.sendAndRequest("demo", new StringEntity("hi")).thenReply(reply->{
//当答复时
});
//接收端
public void onMessage(Session s, Message m){
if(m.isRequest()){
//答复
s.reply(m, new StringEntity("me too"));
}
}
所有消息发出都会有 sid,其中明显用到“流”的主要有:
- 握手时(连接与连接确认,会基于一个 sid)
- 数据分片时(所有分片,会基于一个 sid)
- sendAndRequest 时
- sendAndSubscribe 时
- reply、replyEnd 时
2、发送语义事件流
之前的 send 发了后,对方有没有收到是不知道的。可通过基于流的接口,进一步改造:
//说明是以消费者 "a" 的身分,订阅了主题 "demo"
Entity entity = new StringEntity("").metaPut("topic","demo").metaPut("consumer","a");
Reply response = session.sendAndRequest("mq.subscribe", entity).await(); //此处用异步等待
//说明发的消息主题是 "demo"
Entity entity = new StringEntity("hi").metaPut("topic", "demo");
Reply response = session.sendAndRequest("mq.publish", entity).await();
如果长时间没有收到答复 ,则会超时异常!就像 http 那样。
3、监听语义事件流
这个演示代码不科学哦,主要表达应用的效果。有了“流”,就给会话消息建立了关系及质量保证!
public class Server {
public void main(String[] args) throws Throwable {
Map<String,String> topicConsumerMap ...;
Set<Sessoin> sessionSet ...;
SocketD.createServer("sd:tcp")
.listen(new EventListener().on("mq.subscribe", (s,m)->{
String consumer = m.meta("consumer");
s.attrPut(consumer, "1");
sessionSet.add(s);
topicConsumerMap.put(m.meta("topic"), m.meta("consumer"));
if(m.isRequest()){
//如果是请求,则进行答复
s.replyEnd(m, new StringEntity("").metaPut("mq.confirm", "1");
}
}).on("mq.publish", (s,m)->{
String consumer = topicConsumerMap.get(m.meta("topic"));
//转发
sessionSet.stream().filter(s1->s1.attrHas(consumer)).forEach(s1->{
s1.send(m.event(), m);
});
if(m.isRequest()){
//如果是请求,则进行答复
s.replyEnd(m, new StringEntity("").metaPut("mq.confirm", "1");
}
}))
.start();
}
}