NestJS WebSocket 网关与 React 协同:实现支持万级并发连接的工业级实时预警系统

各位同学,大家好!我是你们的编程导师。

今天我们不谈那些虚头巴脑的“优雅”、“设计模式”或者“SOLID原则”,虽然它们也很重要,但如果你想在工业互联网的大海里游泳,连基本的呼吸(并发连接)都成问题,那这些词儿就真成了马谡——只会背书,不会打仗。

今天我们要聊的是:NestJS WebSocket 网关与 React 协同:实现支持万级并发连接的工业级实时预警系统

这题目听起来是不是像是在秀肌肉?没错,今天这堂课,就是要在你的代码库里建一座“跨服聊天室”,而且是那种连接数爆表、消息延迟低到让你想跳楼、但稳定性高到让你想哭的“核电站级”聊天室。

准备好了吗?让我们直接把服务器开到 1.0 瓦特!


第一部分:当你的 WebSocket 连接数突破 1000,生活就变了

想象一下,你是一个工厂的厂长。你的工厂里有 10,000 台机器。每台机器都有一个传感器,它们每隔几秒钟就要向服务器报告:“嘿,我很好!”或者“救命啊,我要爆炸了!”

如果你用普通的 WebSocket 服务器,大概只能稳稳当当接住 1000 个连接。一旦超过这个数,你的 Node.js 进程就会像便秘一样——不仅吞吐量极低,内存占用还会蹭蹭往上涨,最后直接 Out of Memory

为什么?因为 Node.js 是单线程的,虽然有异步 I/O,但处理 1 万个 TCP 连接的上下文切换和内存分配,足以让你的 CPU 疯狂转圈,直到风扇开始唱起《义勇军进行曲》。

我们的目标是:

  1. 水平扩展:我们要能开 10 个甚至 20 个 Node 进程。
  2. 消息分发:不管机器在哪,不管客户端在哪,消息必须精准送达。
  3. 容错性:挂了一个节点,其他的还得像没事儿人一样。

这怎么搞?这就需要我们引入 NestJS 的 WebSocket Gateway,以及它的“核武器”——Redis 适配器。


第二部分:NestJS Gateway 的骨架搭建

首先,我们需要一个 Gateway。在 NestJS 里,这就像是在 Spring Boot 里写一个 @RestController 一样简单。

我们要建立一个 AlertGateway

// alert.gateway.ts
import {
  WebSocketGateway,
  WebSocketServer,
  SubscribeMessage,
  OnGatewayConnection,
  OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';

@WebSocketGateway({
  path: '/socket.io', // 这是我们的 WebSocket 路径
  cors: {
    origin: '*', // 在生产环境里,别这么写,那是给黑客留的后门
    methods: ['GET', 'POST'],
  },
  transports: ['websocket'], // 明确指定只用 WebSocket,别用 HTTP 长轮询,那太慢了
})
export class AlertGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;

  // 连接建立时
  handleConnection(client: Socket) {
    console.log(`🔌 连接建立: ${client.id}`);
    // 这里可以加逻辑:校验 token,分配权限
  }

  // 连接断开时
  handleDisconnect(client: Socket) {
    console.log(`🔌 连接断开: ${client.id}`);
  }

  // 处理客户端发来的消息
  @SubscribeMessage('joinRoom')
  handleJoinRoom(client: Socket, payload: { factoryId: string }) {
    // 举个例子:客户端想订阅某个工厂的数据
    client.join(payload.factoryId);
    return { status: 'ok', message: `Joined room ${payload.factoryId}` };
  }

  // 处理告警推送(这是后端逻辑层调用的)
  broadcastAlert(payload: any) {
    // broadcastToRoom 是 Socket.io 提供的,向特定房间广播
    this.server.to(payload.factoryId).emit('alert', payload);
  }
}

这段代码看起来很简单对吧?但这只是个骨架。如果你现在只运行这一个 Gateway,然后开 10 个这玩意儿,恭喜你,你创建了 10 个独立的世界。工厂 A 的机器报警了,在节点 1 上,节点 2 上的工人完全看不到。这就像两个人在两座孤岛上打电话,没信号。


第三部分:分布式架构的秘钥——Redis 适配器

要想万级并发,我们必须解决“分布式状态共享”的问题。每个节点必须知道别的节点上有谁在监听。

这里我们就要请出 Redis 了。Redis 在这里充当的是一个“大喇叭”,或者说是“消息中转站”。

我们需要安装 @nestjs/event-emitter(虽然这里主要讲 WebSocket,但事件驱动是标配)和 socket.io-redis

核心逻辑:

  1. Gateway A 监听到一个 alert 事件。
  2. Gateway A 通过 Redis 广播这个事件。
  3. Gateway B、C、D 都订阅了 Redis 的这个频道。
  4. Gateway B 收到广播,发现有个客户端 B1 在监听这个工厂 ID,于是 Gateway B 把消息发给客户端 B1。

在 NestJS 中,这被称为 RedisIoAdapter

// alert.gateway.ts (修改版)
import { Server } from 'socket.io';
import { RedisIoAdapter } from './redis.io.adapter'; // 我们需要自己实现这个适配器

@WebSocketGateway({
  path: '/socket.io',
  cors: { origin: '*' },
  transports: ['websocket'],
})
export class AlertGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;

  async afterInit(server: Server) {
    // 这一步是魔法发生的地方
    const redisAdapter = new RedisIoAdapter(server);
    await redisAdapter.connectToRedis();

    console.log('🚀 Redis 适配器初始化完成,分布式集群已就位!');
  }

  // ... 其他代码保持不变
}

这里有一个非常关键的实现细节,RedisIoAdapter 的核心逻辑其实非常简单粗暴,但有效:

// redis.io.adapter.ts
import { Server } from 'socket.io';
import { createClient } from 'redis';

export class RedisIoAdapter {
  constructor(private server: Server) {}

  async connectToRedis() {
    const pubClient = createClient({ url: 'redis://localhost:6379' });
    const subClient = pubClient.duplicate();

    await Promise.all([pubClient.connect(), subClient.connect()]);

    this.server.adapter.set('createClient', (opt) => {
      return opt;
    });

    this.server.adapter.publish = async (roomId, data, callback) => {
      await pubClient.sAdd(`socket.io#${roomId}`, data);
      await subClient.sPop(`socket.io#${roomId}`, callback);
    };

    // ... 省略 subscribeToAll 等繁琐逻辑,实际使用通常会直接引用 socket.io-redis 包
    // 这里为了演示原理,我们直接依赖 socket.io-redis 包更稳妥:
    const { RedisAdapter } = require('@socket.io/redis-adapter');
    this.server.adapter(RedisAdapter.createAdapter(pubClient, subClient));
  }
}

看,就是这么简单!一旦有了这个适配器,你的 WebSocket 网关就不再是单机的了。不管你启动 5 个进程还是 50 个进程,它们在同一个房间里,消息是实时同步的。


第四部分:工业级消息协议设计

在万级并发下,协议设计的冗余是致命的。你的消息包必须极简。

我们假设我们的系统是用来监控工厂的。每个机器都有 ID。我们需要一种机制,让 React 前端只收到它关心的机器的消息。

方案:房间机制。

  • 服务端:
    当机器 42 报警时,我们向 ID 为 factory-line-1 的房间广播消息。

    // 譬如,我们有一个 Service 负责接收 IoT 设备的数据
    async handleIoTData(data: any) {
       // 假设 data.machineId = 'M-42'
       // 假设 data.factoryId = 'F-01'
    
       // 告诉 Gateway 去干活
       this.alertGateway.broadcastAlert({
         factoryId: 'F-01', // 这决定发到哪个 Redis 频道
         machineId: 'M-42',
         level: 'CRITICAL', // 警告级别
         message: '温度过高',
         timestamp: Date.now()
       });
    }
  • 客户端(React):
    用户登录后,根据他的权限(比如他只负责 F-01 号线的维护),自动加入房间 F-01

    useEffect(() => {
      const socket = new WebSocket('ws://localhost:3000/socket.io');
      socket.on('connect', () => {
        // 发送加入房间的指令
        socket.emit('joinRoom', { factoryId: 'F-01' });
      });
    
      socket.on('alert', (data) => {
        // 收到消息了!
        addAlertToState(data); // 更新 UI
        playAlarmSound();      // 播放滴滴滴的声音
      });
    
      return () => {
        socket.disconnect();
      };
    }, []);

这就构成了完整的闭环。系统不需要把所有 10 万台机器的消息都发给所有用户,而是通过 Redis 和房间机制,精准打击。


第五部分:React 前端的性能优化——别把 UI 冻死了

这是很多新手最容易踩的坑。你以为你发起了 10,000 个 WebSocket 连接,但是当这 10,000 条消息同时砸向你的浏览器时,React 的虚拟 DOM 渲染引擎会瞬间崩溃,页面卡成PPT。

问题所在:
如果你在 useEffect 里直接 setState,并且用 useState 存放一个数组,每次新消息来都 push 进去,React 可能会疯狂地 Diff 那个巨大的数组,导致页面卡死。

解决方案:

  1. 使用 useReducer 或 Zustand:比 useState 更适合管理复杂状态。
  2. 虚拟滚动:这是必须的。你的预警列表可能有 1000 条历史消息,但你只需要渲染屏幕上可见的那 10 条。
  3. 批处理:确保不要在消息的 onmessage 回调里触发多个组件更新,尽量把逻辑聚合。

实战代码:一个高性能的 WebSocket Hook

// useRealtimeAlert.ts
import { useEffect, useReducer, useRef } from 'react';
import io, { Socket } from 'socket.io-client';

type AlertState = Array<{
  id: string;
  machineId: string;
  level: 'INFO' | 'WARNING' | 'CRITICAL';
  message: string;
  time: number;
}>;

// 定义操作类型
type AlertAction =
  | { type: 'ADD_ALERT'; payload: AlertState[0] }
  | { type: 'CLEAR_ALERTS' };

const alertReducer = (state: AlertState, action: AlertAction): AlertState => {
  switch (action.type) {
    case 'ADD_ALERT':
      // 限制数组长度,比如只保留最新的 100 条
      return [action.payload, ...state].slice(0, 100);
    case 'CLEAR_ALERTS':
      return [];
    default:
      return state;
  }
};

export const useRealtimeAlert = (factoryId: string) => {
  const [alerts, dispatch] = useReducer(alertReducer, []);
  const socketRef = useRef<Socket | null>(null);

  useEffect(() => {
    // 1. 建立连接
    socketRef.current = io('http://localhost:3000', {
      transports: ['websocket'],
      autoConnect: false, // 我们手动控制连接
    });

    const socket = socketRef.current;

    socket.on('connect', () => {
      console.log('✅ 连接成功');
      socket.emit('joinRoom', { factoryId });
    });

    // 2. 监听消息
    socket.on('alert', (newAlert) => {
      dispatch({ type: 'ADD_ALERT', payload: newAlert });
    });

    // 3. 监听断线重连(指数退避)
    socket.on('disconnect', (reason) => {
      console.log('❌ 断开连接:', reason);
      if (reason === 'io server disconnect') {
        // 如果是服务器踢掉的,我们需要手动重连
        socket.connect();
      }
    });

    socket.connect();

    // 4. 清理函数
    return () => {
      socket.disconnect();
    };
  }, [factoryId]);

  return { alerts, socket: socketRef.current };
};

在这个 Hook 里,我们使用了 useReducer 来管理状态变化,并且限制了数组长度。这保证了即使消息像洪水一样涌来,我们的内存和 CPU 占用也是可控的。


第六部分:心跳与保活——别让僵尸连接占内存

在万级连接的场景下,有一个恐怖的事情:客户端断网了(比如工人的笔记本没电了),但是 TCP 连接没有优雅地关闭。这会导致服务器端一直保留着这个 Socket 对象,等着接收数据,结果就是内存泄漏。

工业级标准做法:

  1. 服务端心跳:Gateway 每 30 秒向所有连接的客户端发送一个 { type: 'ping' } 消息。
  2. 客户端响应:客户端收到 ping,必须立即回复 pong
  3. 超时处理:如果服务端连续 3 次(90秒)收不到 pong,直接关闭该连接。

在 NestJS Gateway 中,我们可以利用 Socket.IO 的 setTimeout 或者自定义逻辑:

@SubscribeMessage('pong')
handlePong(client: Socket) {
  // 更新客户端的最后活跃时间
  // 清除之前的 timeout 计时器
  // 重置心跳计数器
  return { status: 'ack' };
}

// 在 handleConnection 或 afterInit 中设置全局心跳
afterInit(server: Server) {
  // ... Redis adapter 初始化代码 ...

  // 启动全局心跳检测器
  setInterval(() => {
    server.sockets.forEach((socket) => {
      // 如果 socket 处于非活跃状态,断开它
      // 这是一个简化版的实现,生产环境需要更精细的计时器管理
      // 这里我们依赖 socket.io 的 readyState
      if (!socket.connected) return;

      // 发送 ping
      socket.emit('ping');

      // 设置 5 秒钟不回 pong 就踢人
      socket.setTimeout(5000, () => {
        if (socket.connected) {
          socket.disconnect();
          console.log('🤖 强制断开僵尸连接:', socket.id);
        }
      });
    });
  }, 30000); // 每 30 秒一次
}

这招非常管用。它能帮你清理掉那些“死而不僵”的连接,保证内存占用的纯净。


第七部分:Nginx 配置——服务器的门卫

无论你的 Node.js Gateway 写得多么完美,如果 Nginx 配置不当,用户连接服务器的时候会超时,或者连接数上不去。

关键配置点:

  1. WebSocket 协议升级:Nginx 必须允许 HTTP 头中的 UpgradeConnection 穿透。
  2. Keep-Alive:这是提高并发连接数的关键。不要让每个用户进来都握手一次,要复用 TCP 连接。
  3. 缓冲区大小:工业预警系统数据包通常不大,但为了保险,别设置太小。

Nginx 配置示例:

upstream websocket_backend {
    least_conn; # 负载均衡策略:最少连接优先
    server 127.0.0.1:3001;
    server 127.0.0.1:3002;
    server 127.0.0.1:3003;
}

server {
    listen 80;
    server_name your-industry-domain.com;

    location /socket.io/ {
        proxy_pass http://websocket_backend;

        # 关键配置:WebSocket 协议升级
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";

        # 关键配置:保持连接不超时(注意:这会占用很多文件描述符)
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # 超时时间设长一点
        proxy_connect_timeout 7d;
        proxy_send_timeout 7d;
        proxy_read_timeout 7d;
    }
}

注意那个 7d(7天)的设置。如果你设置的是 60 秒,当用户操作一下电脑待机,回来后连接就断了。对于工业预警系统,连接一旦断开,错过一条警报可能是灾难性的。


第八部分:监控与日志——别让你的系统“失明”

系统上线了,怎么知道它跑得好不好?你不能光看着屏幕上的数据发呆。

  1. 实时连接数监控:监控 Nginx 的 Active Connections
  2. 消息吞吐量:监控 Redis 的 processed 命令次数,或者 Node.js 的 process.nextTick 调用次数。
  3. 错误日志:使用 NestJS 的 PinoWinston 日志库,记录所有的 error 事件。

当有 1 万个用户在线时,你可能会发现偶尔会有 EADDRINUSE 错误(端口被占用)。这时候不要慌,重启服务就行了,或者优化 Nginx 的负载均衡算法。

第九部分:终极优化——Zlib 压缩

这可能是最后一个技术细节了,但对于“万级”来说,每一个字节都很重要。

如果你的消息 Payload 很大(比如带了图片、Base64 编码的传感器数据),每秒传输 1 万次未压缩的数据,带宽会瞬间打满。

Socket.IO 默认是支持二进制传输的,但对于 JSON 文本,我们可以开启 Zlib 压缩。

@WebSocketGateway({
  path: '/socket.io',
  cors: { origin: '*' },
  transports: ['websocket'],
  maxHttpBufferSize: 1e6, // 默认是 1e6 (1MB),如果数据包超过这个,会报错
  enableCompression: true, // 🔥 开启压缩!这是省流神器
})
export class AlertGateway {}

开启这个 enableCompression 后,NestJS 和 Socket.IO 会自动对发送的消息进行 gzip 压缩。对于一段包含大量重复字段的 JSON(比如工厂 ID、时间戳),压缩率可以达到 50% 甚至更高。这意味你的带宽成本直接减半。


第十部分:心态建设

好了,同学们,代码讲完了。但我必须告诉你们,写工业级实时系统的痛苦在于“维护”。

当你面对满屏红色的报错日志时,当你发现明明加了 Redis 适配器,但消息还是偶尔延迟了几秒时,不要怀疑人生。这就是分布式系统的复杂性。

几点建议:

  1. 不要过度设计:刚开始不要搞什么 Actor Model,不要搞什么 CQRS,先把 Redis 适配器配好,把心跳做好了。
  2. 测试:怎么测万级并发?别真搞 1 万人去点。写个脚本,用 Docker 启动 100 个模拟客户端,疯狂发送心跳。看看你的服务器 CPU 是多少,内存是多少。
  3. 优雅降级:如果 WebSocket 宕机了,你的 React 页面会白屏吗?有没有一个离线模式,或者提示用户“网络连接异常,请刷新页面”?

总结一下今天的干货:

  1. NestJS Gateway 是你的核心控制器。
  2. Redis IoAdapter 是你解决水平扩展问题的救世主。
  3. Redis Rooms 是你精准推送消息的关键。
  4. React useReducer + 虚拟滚动 是你处理海量 UI 更新的法宝。
  5. Nginx Keep-Alive 是你撑起万级连接的基石。

现在,去你的 IDE 里,敲下第一行代码吧。别让你的工厂机器在报警时,只有你自己不知道!

祝大家代码无 Bug,服务器不崩!下课!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注