Socket.D v2.5.11

适配对接 Flux 响应式接口

</> markdown

本文以 io.projectreactor(Reactor)的响应式接口为例;其它响应式接口稍作调整即可。

  • 引入响应式接口包
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.5.7</version>
</dependency>
  • 应用效果预览
/**
 * Demo controller showing how the injected {@code SocketdRx} adapter is used
 * from a request handler that returns a reactive {@code Mono}.
 *
 * NOTE(review): no {@code Render} methods are overridden in this snippet;
 * confirm whether the interface requires any.
 */
@Controller
public class DemoController implements Render {
    @Inject
    SocketdRx socketdRx;

    /**
     * Sends a request on the {@code /hello} event, passing {@code name} as
     * entity metadata, and maps the reply to its string payload.
     *
     * @param name value stored under the {@code "name"} meta key
     * @return a Mono emitting the reply data as a string
     */
    @Mapping("/hello")
    public Mono<String> hello(String name) {
        Entity request = Entity.of().metaPut("name", name);

        return socketdRx
                .sendAndRequest("/hello", request)
                .map(reply -> reply.dataAsString());
    }
}

@Configuration
public class SocketdConfig(){
    @Bean
    public ClientSession clientInit(@Inject("${demo.server}") String serverUrl){
        return SocketD.createClient(serverUrl).open();
    }
    @Bean
    public SocketdRx rxInit(ClientSession clientSession){
        return new SocketdRx(clientSession);
    }
}
  • 定义适配对接类
/**
 * Adapter that exposes the callback-style {@code ClientSession} operations
 * as Reactor {@code Mono} / {@code Flux} publishers.
 */
public class SocketdRx {
    private final ClientSession clientSession;

    /**
     * @param clientSession the session every operation of this adapter delegates to
     */
    public SocketdRx(ClientSession clientSession) {
        this.clientSession = clientSession;
    }

    /**
     * Sends an entity without expecting a reply.
     *
     * @param event  event name passed to the session
     * @param entity entity to send
     * @return a Mono that completes empty once the send call returns, or
     *         signals the throwable raised by it
     */
    public Mono<Void> send(String event, Entity entity) {
        return Mono.create(emitter -> {
            try {
                clientSession.send(event, entity);
                emitter.success();
            } catch (Throwable failure) {
                emitter.error(failure);
            }
        });
    }

    /**
     * Sends an entity and waits for a single reply.
     *
     * @param event  event name passed to the session
     * @param entity entity to send
     * @return a Mono emitting the reply, or the error reported either by the
     *         session's error callback or by the call itself
     */
    public Mono<Reply> sendAndRequest(String event, Entity entity) {
        return Mono.create(emitter -> {
            try {
                clientSession.sendAndRequest(event, entity)
                        .thenReply(answer -> emitter.success(answer))
                        .thenError(cause -> emitter.error(cause));
            } catch (Throwable failure) {
                emitter.error(failure);
            }
        });
    }

    /**
     * Sends an entity and subscribes to a stream of replies.
     *
     * @param event  event name passed to the session
     * @param entity entity to send
     * @return a Flux emitting every reply; it completes after emitting a
     *         reply whose {@code isEnd()} is true, and errors on the
     *         session's error callback or on a throwable from the call
     */
    public Flux<Reply> sendAndSubscribe(String event, Entity entity) {
        return Flux.create(emitter -> {
            try {
                clientSession.sendAndSubscribe(event, entity)
                        .thenReply(item -> {
                            // Emit first so the terminating reply is delivered too.
                            emitter.next(item);

                            if (item.isEnd()) {
                                emitter.complete();
                            }
                        })
                        .thenError(cause -> emitter.error(cause));
            } catch (Throwable failure) {
                emitter.error(failure);
            }
        });
    }
}