适配对接 Flux 响应式接口
本文使用 io.projectreactor
的响应式接口为例(其它响应式接口,微调即可)
- 引入响应式接口包
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.5.7</version>
</dependency>
- 应用效果预览
@Controller
public class DemoController implements Render {
@Inject
SocketdRx socketdRx;
@Mapping("/hello")
public Mono<String> hello(String name) {
return socketdRx.sendAndRequest("/hello", Entity.of().metaPut("name", name))
.map(r -> r.dataAsString());
}
}
@Configuration
public class SocketdConfig(){
@Bean
public ClientSession clientInit(@Inject("${demo.server}") String serverUrl){
return SocketD.createClient(serverUrl).open();
}
@Bean
public SocketdRx rxInit(ClientSession clientSession){
return new SocketdRx(clientSession);
}
}
- 定义适配对接类
public class SocketdRx {
private final ClientSession clientSession;
public SocketdRx(ClientSession clientSession) {
this.clientSession = clientSession;
}
public Mono<Void> send(String event, Entity entity) {
return Mono.create(sink -> {
try {
clientSession.send(event, entity);
sink.success();
} catch (Throwable e) {
sink.error(e);
}
});
}
public Mono<Reply> sendAndRequest(String event, Entity entity) {
return Mono.create(sink -> {
try {
clientSession.sendAndRequest(event, entity).thenReply(reply -> {
sink.success(reply);
}).thenError(e -> {
sink.error(e);
});
} catch (Throwable e) {
sink.error(e);
}
});
}
public Flux<Reply> sendAndSubscribe(String event, Entity entity) {
return Flux.create(sink -> {
try {
clientSession.sendAndSubscribe(event, entity).thenReply(reply -> {
sink.next(reply);
if (reply.isEnd()) {
sink.complete();
}
}).thenError(e -> {
sink.error(e);
});
} catch (Throwable e) {
sink.error(e);
}
});
}
}