WebSocket 集群方案

2,012次阅读
没有评论

基于 Spring Cloud 使用一致性哈希算法实现分布式 WebSocket. / 基于 RabbitMQ 广播实现分布式 WebSocket.

WebSocket 协议

参考 RFC 标准规范。

思路

核心问题在于具备 WebSocket 服务端的 SpringBoot 或 SpringCloud 等微服务应用是有状态的。服务端的内存中存在维持连接的 Session。客户端连接服务器时,只能和集群中的唯一一个服务实例来连接,而且数据传输过程中也直接和此实例通信。

解决方案有很多。

方案一:将状态放到应用之外的存储中,但不可行

这个方案非常类似于 HTTP 协议的 Token 实现(非 JWT),Java 中具体的例子就是 Spring Session 以及 Spring Security 的 Token Store,比如 RedisTokenStore 实现了将 OAuth2 Token 存放到 Redis 的逻辑。此方案的核心前提是可序列化。WebSocket 协议和 HTTP 协议并不同,一个 WebSocket Session 对应于一次 TCP Socket 连接而非数据,新的连接将会基于新的四元组,不可能做到序列化。

方案二:一致性哈希

像微服务注册中心一样,维护一个全局映射关系 —— 当前客户端的 WebSocket 连接是和哪一个实例/节点保持的。每次通信都经过此注册中心或哈希,查询到具体的服务实例,将连接交给它。同样会遇到一些问题,比如:哈希环的实现、如何解决一致性问题(单节点故障的 Session 迁移重连将代价降到最低)、CAP 取舍、实例的区分依据是什么(IP ? 实例 ID?)、可能与注册中心构成强依赖、以及部署环境的变更带来运维上的挑战等难题。这个方案实施有一定的成本和技术限制,可以参考这篇文章的大体思路,理解了核心的哈希环的概念(如下图),再配合注册中心、服务上下线消息,就能完成基本的实现。我利用 Nacos、Redis、RabbitMQ 基于 Spring Cloud 写了一个通过一致性哈希来实现的全栈项目,前端和后端都已开源 —— WebSocket 集群的一致性哈希实践(Spring Cloud 后端) – GitHub , WebSocket 客户端模拟与服务列表(React 前端) – GitHub, 最近刚刚写好基本的功能,参考 @ufiredong 的方式利用 Java 语言的 Docker SDK 实现模拟 WebSocket 实例的上下线。未来还会稍微做细微的优化。

📔websocket-clusterPublic

Spring Cloud project for WebSocket cluster with consistent-hashing algorithm.

Java🌟 106🛠 40

WebSocket 集群方案

另外,Nacos 作为微服务的注册中心是国内比较流行的选择。但有人早就向 Nacos 社区提出了维护哈希环的议题;姑且不说此议题是否合理,但 Nacos 的维护人员竟然没有能够理解这样的需求,还是有点令人诧异。

最终效果如下, 上线一个新的服务实例后,客户端会自动根据最新哈希环重新路由到最新节点上,收发消息一切正常:

https://www.bilibili.com/blackboard/html5mobileplayer.html?bvid=BV1sg411q78V&danmaku=0&high_quality=1

方案三:广播队列

每次连接都需要通知到所有实例,实例各自都判断连接状态在不在自己这里,不在的话直接忽略,在就处理。类似于发布订阅模型,可以通过 MQ(RabbitMQ、Kafka、RocketMQ 等)或 Redis 做到。此方案实现简单,适用于集群规模不大的场景,因为需要所有的节点进行判断或计算。我愿称之为:分布式事件驱动。

以下基于 RabbitMQ 做了简单实现。

简单广播实现 WebSocket 集群

声明式 API 的好处就体现在这里:短短几行就直接利用了 RabbitMQ 的 Java SDK 创建好交换机和队列以及绑定关系。

package me.lawrenceli.websocket.server.configuration;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class MqConfig {

    private static final String FANOUT_EXCHANGE = "websocket-exchange";

    @Bean
    public FanoutExchange fanoutExchange() {
        log.info("创建广播交换机 [{}]", FANOUT_EXCHANGE);
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    @Bean
    public AnonymousQueue queueForWebSocket() {
        log.info("创建用于 WebSocket 的匿名队列");
        return new AnonymousQueue();
    }

    /**
     * @param fanoutExchange    交换机
     * @param queueForWebSocket 队列
     * @return Binding
     */
    @Bean
    public Binding bindingSingle(FanoutExchange fanoutExchange, AnonymousQueue queueForWebSocket) {
        log.info("把匿名队列 [{}] 绑定到广播交换器 [{}]", queueForWebSocket.getName(), fanoutExchange.getName());
        return BindingBuilder.bind(queueForWebSocket).to(fanoutExchange);
    }

}

然后是生产者和消费者。生产者将消息发送到队列中进行广播,集群的消费者监听队列,判断此 WebSocket Session 是否在当前消费节点再做进一步处理。

消息生产方,一般是外层接收到消息通信请求,然后调用进行集群内广播:

package me.lawrenceli.websocket.server.configuration;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class FanoutSender {

    @Autowired
    private FanoutExchange fanoutExchange;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(Object message) {
        log.info("开始发送广播: [{}]", message.toString());
        rabbitTemplate.convertAndSend(fanoutExchange.getName(), "", message);
    }

}

消息消费方,多节点共同消费同一消息,区分是否需要自己来处理:

package me.lawrenceli.websocket.server.configuration;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class FanoutReceiver {

    @RabbitListener(queues = "#{queueForWebSocket.name}")
    public void singleReceiver(Object message) {
        log.info("队列接收到了消息: [{}]", message.toString());
        // 判断 WebSocket Session 是否在当前节点
        // 一般维护在一个 ConcurrentHashMap 静态变量中,这里叫 sessionMap
        // message 中有类似 SessionId 的字段
        if (sessionMap.contains(sessionId)) {
            log.info("WebSocket Session 在当前节点");
            // 执行相应的流程
        } else {
            log.info("当前节点无此 WebSocket Session");
            // 什么都不用做,直接忽略
        }
    }

}

注意,方案 2、方案 3 都没有解决单节点故障问题,因为从本质上讲,没有实现 WebSocket Session 的全局共享。但是方案二能够依赖哈希环的更新,最大程度上地降低迁移连接的代价,客户端会有重试的机制来针对单节点故障问题做容错。如果规模较大,项目复杂,且客户端连接数多,推荐使用一致性哈希的方案二,当然下面也有其他的社区方案。

其他案例

即刻 App 后端基于 Node.js 使用了 socket.io 实现多端实时通信,Joway 的这篇文章交代的一些细节也涉及了分布式部署的问题。

socket.io 官网也提供了多节点的使用方法,不外乎 IP Hash 之类的方案。

正文完
可以使用微信扫码关注公众号(ID:xzluomor)
post-qrcode
 0
评论(没有评论)

文心AIGC

2023 年 6 月
 1234
567891011
12131415161718
19202122232425
2627282930  
文心AIGC
文心AIGC
人工智能ChatGPT,AIGC指利用人工智能技术来生成内容,其中包括文字、语音、代码、图像、视频、机器人动作等等。被认为是继PGC、UGC之后的新型内容创作方式。AIGC作为元宇宙的新方向,近几年迭代速度呈现指数级爆发,谷歌、Meta、百度等平台型巨头持续布局
文章搜索
热门文章
潞晨尤洋:日常办公没必要上私有模型,这三类企业才需要 | MEET2026

潞晨尤洋:日常办公没必要上私有模型,这三类企业才需要 | MEET2026

潞晨尤洋:日常办公没必要上私有模型,这三类企业才需要 | MEET2026 Jay 2025-12-22 09...
共推空天领域智能化升级!趋境科技与金航数码强强联手

共推空天领域智能化升级!趋境科技与金航数码强强联手

共推空天领域智能化升级!趋境科技与金航数码强强联手 十三 2025-12-09 18:18:41 来源:量子位...
起底“豆包手机”:核心技术探索早已开源,GUI Agent布局近两年,“全球首款真正的AI手机”

起底“豆包手机”:核心技术探索早已开源,GUI Agent布局近两年,“全球首款真正的AI手机”

起底“豆包手机”:核心技术探索早已开源,GUI Agent布局近两年,“全球首款真正的AI手机” 西风 202...
面向「空天具身智能」,北航团队提出星座规划新基准丨NeurIPS’25

面向「空天具身智能」,北航团队提出星座规划新基准丨NeurIPS’25

面向「空天具身智能」,北航团队提出星座规划新基准丨NeurIPS’25 鹭羽 2025-12-13 22:37...
5天连更5次,可灵AI年末“狂飙式”升级

5天连更5次,可灵AI年末“狂飙式”升级

5天连更5次,可灵AI年末“狂飙式”升级 思邈 2025-12-10 14:28:37 来源:量子位 让更大规...
最新评论
ufabet ufabet มีเกมให้เลือกเล่นมากมาย: เกมเดิมพันหลากหลาย ครบทุกค่ายดัง
tornado crypto mixer tornado crypto mixer Discover the power of privacy with TornadoCash! Learn how this decentralized mixer ensures your transactions remain confidential.
ดูบอลสด ดูบอลสด Very well presented. Every quote was awesome and thanks for sharing the content. Keep sharing and keep motivating others.
ดูบอลสด ดูบอลสด Pretty! This has been a really wonderful post. Many thanks for providing these details.
ดูบอลสด ดูบอลสด Pretty! This has been a really wonderful post. Many thanks for providing these details.
ดูบอลสด ดูบอลสด Hi there to all, for the reason that I am genuinely keen of reading this website’s post to be updated on a regular basis. It carries pleasant stuff.
Obrazy Sztuka Nowoczesna Obrazy Sztuka Nowoczesna Thank you for this wonderful contribution to the topic. Your ability to explain complex ideas simply is admirable.
ufabet ufabet Hi there to all, for the reason that I am genuinely keen of reading this website’s post to be updated on a regular basis. It carries pleasant stuff.
ufabet ufabet You’re so awesome! I don’t believe I have read a single thing like that before. So great to find someone with some original thoughts on this topic. Really.. thank you for starting this up. This website is something that is needed on the internet, someone with a little originality!
ufabet ufabet Very well presented. Every quote was awesome and thanks for sharing the content. Keep sharing and keep motivating others.
热评文章
小冰之父李笛智能体创业,公司取名Nextie!陆奇是股东

小冰之父李笛智能体创业,公司取名Nextie!陆奇是股东

小冰之父李笛智能体创业,公司取名Nextie!陆奇是股东 Jay 2025-12-09 08:26:01 来源...
梁文锋,Nature全球年度十大科学人物!

梁文锋,Nature全球年度十大科学人物!

梁文锋,Nature全球年度十大科学人物! 一水 2025-12-09 09:46:23 来源:量子位 来自安...
起底“豆包手机”:核心技术探索早已开源,GUI Agent布局近两年,“全球首款真正的AI手机”

起底“豆包手机”:核心技术探索早已开源,GUI Agent布局近两年,“全球首款真正的AI手机”

起底“豆包手机”:核心技术探索早已开源,GUI Agent布局近两年,“全球首款真正的AI手机” 西风 202...
摩尔线程新一代GPU架构10天后发布

摩尔线程新一代GPU架构10天后发布

摩尔线程新一代GPU架构10天后发布 思邈 2025-12-09 15:46:09 来源:量子位 国内首个聚焦...
极客公园创新大会 2026在京落幕,罗永浩、张楠、何小鹏、刘靖康等共议 AI 时代「进程由我」

极客公园创新大会 2026在京落幕,罗永浩、张楠、何小鹏、刘靖康等共议 AI 时代「进程由我」

极客公园创新大会 2026在京落幕,罗永浩、张楠、何小鹏、刘靖康等共议 AI 时代「进程由我」 henry 2...