Skip to content

Vertx实现websocket数据推送

Vert.x基于Netty框架, 采用多-reactor线程模型,通过事件循环(Event Loop)处理并发请求,实现了异步非阻塞IO。它支持多种语言,但本文聚焦于Java。核心特性包括:

  • 事件驱动:基于事件循环机制,有效管理并发,提升应用性能。
  • 多语言支持:允许使用Java、JavaScript、Groovy、Ruby等多种语言编写微服务。
  • 轻量级:极低的内存占用,适合微服务架构。
  • 模块化:丰富的组件生态,易于扩展和集成。

Maven依赖

xml
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-spring-boot-starter</artifactId>
    <version>${vertx.version}</version>
</dependency>

这里使用的是4.5.1版本

结合spring boot

服务启动类

java
@Slf4j
@SpringBootApplication
public class MyApplication {

    public static void main(String[] args) {
        System.setProperty("spring.devtools.restart.enabled", "false");
        SpringApplication application = new SpringApplication(MyApplication.class);
        application.setApplicationStartup(new BufferingApplicationStartup(2048));
        application.run(args);

        // 创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 部署 Verticle
        vertx.deployVerticle(WebSocketVerticle.class.getName())
                .onSuccess(id ->
                        log.info("WebSocketVerticle deployed, id={}", id))
                .onFailure(err -> log.error("Deploy failed", err));
    }
}

主体管理类

java
import cn.hutool.extra.spring.SpringUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.http.ServerWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Component
public class WebSocketVerticle extends AbstractVerticle {

//    @Value("${vertx.port:9000}")
    private Integer port;

    /** 线程安全保存所有连接 */
    private final ConcurrentHashMap<String, ServerWebSocket> sessions = new ConcurrentHashMap<>();

    @Override
    public void start(Promise<Void> startPromise) {
        // 1. 提前在 Worker 线程中初始化 Spring Bean(如果必须)
        vertx.executeBlocking(promise -> {
            // 手动读取 Spring Environment 中的值
            port = SpringUtil.getBean(org.springframework.core.env.Environment.class)
                    .getProperty("vertx.port", Integer.class, 8900);
            promise.complete();
        }).onSuccess(v -> {
            // 在 EventLoop 中启动 HTTP 服务器
            vertx.createHttpServer()
                    .webSocketHandler(this::handle)
                    .listen(port)
                    .<Void>mapEmpty()
                    .onComplete(startPromise);

//            // 周期性广播示例:每 3 秒推送一次服务器时间
//            vertx.setPeriodic(3_000, id -> {
//                String payload = LocalDateTime.now().toString();
//                sendToAll(payload);
//            });
        }).onFailure(startPromise::fail);

//        super.start(startPromise);
    }

    private void handle(ServerWebSocket ws) {
        WsPathEnum pathEnum = WsPathEnum.of(ws.path());
        if (pathEnum == null) {
            ws.reject(404);
            return;
        }

        // 提前取一次,缓存引用
        BizVerticle service = SpringUtil.getBean(pathEnum.getBizClazz());

        ws.textMessageHandler(msg -> {
            // 如果 onMessage 内可能阻塞,用 executeBlocking(如读写数据库)
            vertx.executeBlocking(
                    promise -> {
                        service.onMessage(ws, msg);
                        promise.complete();
                    },
                    false,               // 顺序执行保证
                    ar -> { /* 异常处理 */ }
            );
        });


//        另一种实现
//        if (!"/chat".equalsIgnoreCase(ws.path())) {
//            ws.reject(404);
//            return;
//        }
//
//        String id = UUID.randomUUID().toString();
//        sessions.put(id, ws);
//        log.info("WS connected: {}, online={}", id, sessions.size());
//
//        // 接收客户端文本
//        ws.textMessageHandler(msg -> {
//            log.info("recv from {}: {}", id, msg);
//            // 原样回显,可改成业务逻辑
//            ws.writeTextMessage("echo: " + msg);
//        });
//
//        // 关闭时清理
//        ws.closeHandler(v -> {
//            sessions.remove(id);
//            log.info("WS closed: {}, online={}", id, sessions.size());
//        });

        ws.exceptionHandler(t -> log.error("WS error: {}", t));
    }

//    /* ========= 业务暴露的推送 API ========= */
//    public void sendToAll(String msg) {
//        sessions.values().forEach(ws -> {
//            if (!ws.isClosed()) ws.writeTextMessage(msg);
//        });
//    }
//
//    public void sendToUser(String id, String msg) {
//        ServerWebSocket ws = sessions.get(id);
//        if (ws != null && !ws.isClosed()) ws.writeTextMessage(msg);
//    }
}

枚举类

java
import java.util.Arrays;

public enum WsPathEnum {

    CHAT("/chat", ChatBizVerticle.class),
    ORDER("/order", OrderBizVerticle.class),
    NOTICE("/notice", NoticeBizVerticle.class);

    private final String path;
    private final Class<? extends BizVerticle> bizClazz;

    WsPathEnum(String path, Class<? extends BizVerticle> bizClazz) {
        this.path = path;
        this.bizClazz = bizClazz;
    }

    public String getPath() {
        return path;
    }

    public Class<? extends BizVerticle> getBizClazz() {
        return bizClazz;
    }

    public static WsPathEnum of(String path) {
        return Arrays.stream(values())
                .filter(e -> e.path.equals(path))
                .findFirst()
                .orElse(null);
    }
}

继承父类

ChatBizVerticle等实现类需要加上@Component注解并继承BizVerticle类

java
public abstract class BizVerticle {
    protected void onMessage(ServerWebSocket ws, String msg) {
        ws.writeTextMessage("echo: " + msg);
    }

//    protected abstract void onMessage(ServerWebSocket ws, String msg);
}

这样就可以根据不同的业务选择不同的实现类处理

注意

在 Vert.x 的 AbstractVerticle 里,Spring 的 @Value 注解不会生效,因为 Verticle 不是由 Spring 容器直接实例化,而是由 Vert.x 自己通过反射 newInstance() 创建的。所以如果使用 @Value 获取到的值为 null。

前端测试代码

html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<script>
    const url = 'ws://localhost:9000/chat';
    let ws;

    function connect() {
        if (!window.WebSocket) {
            return;
        }
        ws = new WebSocket(url);
        ws.onopen    = () => console.log('connected');
        ws.onmessage = e => console.log('recv:', e.data);
        ws.onclose   = () => setTimeout(connect, 3000); // 3 秒后重连
        ws.onerror   = console.error;
    }
    connect();

    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        if (ws.readyState == WebSocket.OPEN) {
            ws.send(message);
        } else {
            alert("The socket is not open.");
        }
    }
</script>
<form onsubmit="return false;">
    <input type="text" name="message" value="Hello, World!"/>
    <input type="button" value="Send Web Socket Data" onclick="send(this.form.message.value)"/>
</form>
</body>
</html>

粤ICP备20009776号