优化 Broker 集群可用性(添加安全停止)
要高可用的集群至少需要两个或以上的 Broker 服务节点。如果再增加“安全停止”处理,可实现客户端:
- 不卡顿
- 不异常
或者说只要网络没问题,集群不管怎么变(只要还有一个同类节点在),客户端都将丝滑无感!
1、服务端处理
以下采用原生接口方式,IOC容器的可以借助容器的生命周期时机点:
public class ServerApp {
public static void main(String[] args) throws IOException {
Server server = SocketD.createServer("sd:tcp")
.config(c -> c.port(8602).fragmentHandler(new BrokerFragmentHandler()))
.listen(new BrokerListener();)
.start();
//增加安全停止
Runtime.getRuntime().addShutdownHook(new Thread(()->{
//预停止(对端的 session.isClosing() 会变成 true,方便查找过滤)
server.prestop();
System.out.println("stop 1/3");
//等5秒(这个时间自已定),用于完成正在转发中的消息
RunUtils.runAndTry(()-> Thread.sleep(5_1000));
System.out.println("stop 2/3");
//停止(对端会触发 listener::onClose 事件)
server.stop();
System.out.println("stop 3/3");
}));
}
}
服务端执行会话的 prestop 时,对端 ClusterClientSession 中的会话仍然有效,但是有新任务时不再使用。
2、客户端处理
增加安全停止处理
public class ClientApp {
public static void main(String[] args) throws IOException {
ClusterClientSession clientSession = (ClusterClientSession) SocketD.createClusterClient(
"sd:tcp://127.0.0.1:8602?@=demoapp",
"sd:tcp://127.0.0.1:8603?@=demoapp")
.listen(new EventListener().doOn("hello", (s, m) -> {
System.out.println(m);
}))
.open();
//增加安全停止
Runtime.getRuntime().addShutdownHook(new Thread(()->{
//预关闭(对端的 session.isClosing 会变成 true,方便查找过滤)
clientSession.preclose();
System.out.println("stop 1/3");
//等5秒(这个时间自已定),用于完成正在处理中的消息
RunUtils.runAndTry(()-> Thread.sleep(5_1000));
System.out.println("stop 2/3");
//关闭(对端会触发 listener::onClose 事件)
clientSession.close()
System.out.println("stop 3/3");
}));
}
}
客户端执行会话的 preclose 时,对端 BrokerListener 中的会话仍然有效,但是有新任务时不再使用。