主题
Vertx实现websocket数据推送
Vert.x基于Netty框架, 采用多-reactor线程模型,通过事件循环(Event Loop)处理并发请求,实现了异步非阻塞IO。它支持多种语言,但本文聚焦于Java。核心特性包括:
- 事件驱动:基于事件循环机制,有效管理并发,提升应用性能。
- 多语言支持:允许使用Java、JavaScript、Groovy、Ruby等多种语言编写微服务。
- 轻量级:极低的内存占用,适合微服务架构。
- 模块化:丰富的组件生态,易于扩展和集成。
Maven依赖
xml
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-spring-boot-starter</artifactId>
<version>${vertx.version}</version>
</dependency>
这里使用的是4.5.1
版本
结合spring boot
服务启动类
java
@Slf4j
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
System.setProperty("spring.devtools.restart.enabled", "false");
SpringApplication application = new SpringApplication(MyApplication.class);
application.setApplicationStartup(new BufferingApplicationStartup(2048));
application.run(args);
// 创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
// 部署 Verticle
vertx.deployVerticle(WebSocketVerticle.class.getName())
.onSuccess(id ->
log.info("WebSocketVerticle deployed, id={}", id))
.onFailure(err -> log.error("Deploy failed", err));
}
}
主体管理类
java
import cn.hutool.extra.spring.SpringUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.http.ServerWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
public class WebSocketVerticle extends AbstractVerticle {
// @Value("${vertx.port:9000}")
private Integer port;
/** 线程安全保存所有连接 */
private final ConcurrentHashMap<String, ServerWebSocket> sessions = new ConcurrentHashMap<>();
@Override
public void start(Promise<Void> startPromise) {
// 1. 提前在 Worker 线程中初始化 Spring Bean(如果必须)
vertx.executeBlocking(promise -> {
// 手动读取 Spring Environment 中的值
port = SpringUtil.getBean(org.springframework.core.env.Environment.class)
.getProperty("vertx.port", Integer.class, 8900);
promise.complete();
}).onSuccess(v -> {
// 在 EventLoop 中启动 HTTP 服务器
vertx.createHttpServer()
.webSocketHandler(this::handle)
.listen(port)
.<Void>mapEmpty()
.onComplete(startPromise);
// // 周期性广播示例:每 3 秒推送一次服务器时间
// vertx.setPeriodic(3_000, id -> {
// String payload = LocalDateTime.now().toString();
// sendToAll(payload);
// });
}).onFailure(startPromise::fail);
// super.start(startPromise);
}
private void handle(ServerWebSocket ws) {
WsPathEnum pathEnum = WsPathEnum.of(ws.path());
if (pathEnum == null) {
ws.reject(404);
return;
}
// 提前取一次,缓存引用
BizVerticle service = SpringUtil.getBean(pathEnum.getBizClazz());
ws.textMessageHandler(msg -> {
// 如果 onMessage 内可能阻塞,用 executeBlocking(如读写数据库)
vertx.executeBlocking(
promise -> {
service.onMessage(ws, msg);
promise.complete();
},
false, // 顺序执行保证
ar -> { /* 异常处理 */ }
);
});
// 另一种实现
// if (!"/chat".equalsIgnoreCase(ws.path())) {
// ws.reject(404);
// return;
// }
//
// String id = UUID.randomUUID().toString();
// sessions.put(id, ws);
// log.info("WS connected: {}, online={}", id, sessions.size());
//
// // 接收客户端文本
// ws.textMessageHandler(msg -> {
// log.info("recv from {}: {}", id, msg);
// // 原样回显,可改成业务逻辑
// ws.writeTextMessage("echo: " + msg);
// });
//
// // 关闭时清理
// ws.closeHandler(v -> {
// sessions.remove(id);
// log.info("WS closed: {}, online={}", id, sessions.size());
// });
ws.exceptionHandler(t -> log.error("WS error: {}", t));
}
// /* ========= 业务暴露的推送 API ========= */
// public void sendToAll(String msg) {
// sessions.values().forEach(ws -> {
// if (!ws.isClosed()) ws.writeTextMessage(msg);
// });
// }
//
// public void sendToUser(String id, String msg) {
// ServerWebSocket ws = sessions.get(id);
// if (ws != null && !ws.isClosed()) ws.writeTextMessage(msg);
// }
}
枚举类
java
import java.util.Arrays;
public enum WsPathEnum {
CHAT("/chat", ChatBizVerticle.class),
ORDER("/order", OrderBizVerticle.class),
NOTICE("/notice", NoticeBizVerticle.class);
private final String path;
private final Class<? extends BizVerticle> bizClazz;
WsPathEnum(String path, Class<? extends BizVerticle> bizClazz) {
this.path = path;
this.bizClazz = bizClazz;
}
public String getPath() {
return path;
}
public Class<? extends BizVerticle> getBizClazz() {
return bizClazz;
}
public static WsPathEnum of(String path) {
return Arrays.stream(values())
.filter(e -> e.path.equals(path))
.findFirst()
.orElse(null);
}
}
继承父类
ChatBizVerticle等实现类需要加上@Component注解并继承BizVerticle类
java
public abstract class BizVerticle {
protected void onMessage(ServerWebSocket ws, String msg) {
ws.writeTextMessage("echo: " + msg);
}
// protected abstract void onMessage(ServerWebSocket ws, String msg);
}
这样就可以根据不同的业务选择不同的实现类处理
注意
在 Vert.x 的 AbstractVerticle 里,Spring 的 @Value 注解不会生效,因为 Verticle 不是由 Spring 容器直接实例化,而是由 Vert.x 自己通过反射 newInstance() 创建的。所以如果使用 @Value 获取到的值为 null。
前端测试代码
html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
const url = 'ws://localhost:9000/chat';
let ws;
function connect() {
if (!window.WebSocket) {
return;
}
ws = new WebSocket(url);
ws.onopen = () => console.log('connected');
ws.onmessage = e => console.log('recv:', e.data);
ws.onclose = () => setTimeout(connect, 3000); // 3 秒后重连
ws.onerror = console.error;
}
connect();
function send(message) {
if (!window.WebSocket) {
return;
}
if (ws.readyState == WebSocket.OPEN) {
ws.send(message);
} else {
alert("The socket is not open.");
}
}
</script>
<form onsubmit="return false;">
<input type="text" name="message" value="Hello, World!"/>
<input type="button" value="Send Web Socket Data" onclick="send(this.form.message.value)"/>
</form>
</body>
</html>