### 1、什么是流？

本协议“流”的本质，是通过 sid（streamId）为来回的相关消息建立起关联性。在异步的场景下：

* 我发了1000个消息包，你回了1000个消息包。哪个包，是回应哪个包的？
* 我发了1000个消息包，哪些包是作为一串整体给你的？

“流”像无形的纽带，但可以通过接口感受它存在：

```java
//发起端
session.sendAndRequest("demo", new StringEntity("hi")).thenReply(reply->{
  //当答复时
});

//接收端
public void onMessage(Session s, Message m){
    if(m.isRequest()){
        //答复
        s.reply(m, new StringEntity("me too"));
    }
}
```

所有消息发出都会有 sid，其中明显用到“流”的主要有：

* 握手时（连接与连接确认，会基于一个 sid）
* 数据分片时（所有分片，会基于一个 sid）
* sendAndRequest 时
* sendAndSubscribe 时
* reply、replyEnd 时


### 2、发送语义事件流

之前的 send 发了后，对方有没有收到是不知道的。可通过基于流的接口，进一步改造：

```java
//说明是以消费者 "a" 的身分，订阅了主题 "demo"
Entity entity = new StringEntity("").metaPut("topic","demo").metaPut("consumer","a");
Reply response = session.sendAndRequest("mq.subscribe", entity).await(); //此处用异步等待
```

```java
//说明发的消息主题是 "demo"
Entity entity = new StringEntity("hi").metaPut("topic", "demo");
Reply response = session.sendAndRequest("mq.publish", entity).await();
```

如果长时间没有收到答复 ，则会超时异常！就像 http 那样。


### 3、监听语义事件流

这个演示代码不科学哦，主要表达应用的效果。有了“流”，就给会话消息建立了关系及质量保证！

```java
public class Server {
    public void main(String[] args) throws Throwable {
        Map<String,String> topicConsumerMap ...;
        Set<Sessoin> sessionSet ...;
        
        SocketD.createServer("sd:tcp")
                .listen(new EventListener().on("mq.subscribe", (s,m)->{
                    String consumer = m.meta("consumer");
                    
                    s.attrPut(consumer, "1");
                    
                    sessionSet.add(s);
                    topicConsumerMap.put(m.meta("topic"), m.meta("consumer"));
                    
                    if(m.isRequest()){
                        //如果是请求，则进行答复
                        s.replyEnd(m, new StringEntity("").metaPut("mq.confirm", "1");
                    }
                }).on("mq.publish", (s,m)->{
                    String consumer = topicConsumerMap.get(m.meta("topic"));
                    
                    //转发
                    sessionSet.stream().filter(s1->s1.attrHas(consumer)).forEach(s1->{
                        s1.send(m.event(), m);
                    });
                    
                    if(m.isRequest()){
                        //如果是请求，则进行答复
                        s.replyEnd(m, new StringEntity("").metaPut("mq.confirm", "1");
                    }
                }))
                .start();
    }
}
```