了解三个基础发送接口
1、发送(像 websocket)
send
#::启动服务端
await SocketD.create_server("sd:ws") \
.config(lambda c: c.port(8602)) \
.start()
#::打开客户端会话
clientSession = await SocketD.create_client("sd:ws://127.0.0.1:8602/?u=a&p=2") \
.open()
#发送
clientSession.send("/demo", StringEntity("hello wrold!"))
2、发送并请求 + 一个答复(像 http)
sendAndRequest + replyEnd(或者 reply。反正只收一个答复,管它是不是最后的)
#::启动服务端
await SocketD.create_Server("sd:ws") \
.config(lambda c: c.port(8602)) \
.listen(EventListener().do_on_message(lambda s,m:
if m.is_request():
s.reply_end(m, StringEntity("And you too."))
).start()
#::打开客户端会话
clientSession = await SocketD.create_client("sd:ws://127.0.0.1:8602/?u=a&p=2") \
.open()
#发送并请求(且,等待答复)
reply = await clientSession.send_and_request("/demo", StringEntity("hello wrold!"))
sendAndRequest 的同步模式(一般需要指定超时,默认为 10s)
await clientSession.send_and_request("/demo", StringEntity("hello wrold!"))
sendAndRequest 的异步模式(一般需要指定超时,默认为 10s)
#示例,指定10秒超时
clientSession.send_and_request("/demo", StringEntity("hello wrold!"), 10_000).then_reply(lambda reply:
#收到答复
).then_error(lambda error:
#如果有异常(比如超时,或者协议告警)
)
3、发送并订阅 + 多个答复(像 reactive stream)
sendAndSubscribe + reply(*n) + replyEnd
#::启动服务端
await SocketD.create_server("sd:ws")
.config(lambda c: c.port(8602))
.listen(EventListener().do_on_message(lambda s,m:
if m.is_subscribe():
s.reply(m, StringEntity("And you too."))
s.reply_end(m, StringEntity("Welcome to my home"))
).start()
#::打开客户端会话
clientSession = await SocketD.create_client("sd:ws://127.0.0.1:8602/?u=a&p=2") \
.open()
#发送并订阅(且,接收答复流)
clientSession.send_and_subscribe("/demo", StringEntity("hello wrold!")).then_reply(lambda reply:
#如果需要识别是否为最后一个答复?
if reply.is_end():
...
).then_error(lambda error:
#如果有异常(比如超时,或者协议告警)
)
sendAndSubscribe 只有异步模式(如果有需要,可以指定具体超时。默认为 2h)
#示例,指定60秒超时
clientSession.send_and_subscribe("/demo", StringEntity("hello wrold!"), 60_000).then_reply(lambda reply:
#如果需要识别是否为最后一个答复?
if reply.is_end():
...
).then_error(lambda error:
#如果有异常(比如超时,或者协议告警)
)