各位同学,大家好!我是你们的编程导师。
今天我们不谈那些虚头巴脑的“优雅”、“设计模式”或者“SOLID原则”,虽然它们也很重要,但如果你想在工业互联网的大海里游泳,连基本的呼吸(并发连接)都成问题,那这些词儿就真成了马谡——只会背书,不会打仗。
今天我们要聊的是:NestJS WebSocket 网关与 React 协同:实现支持万级并发连接的工业级实时预警系统。
这题目听起来是不是像是在秀肌肉?没错,今天这堂课,就是要在你的代码库里建一座“跨服聊天室”,而且是那种连接数爆表、消息延迟低到让你想跳楼、但稳定性高到让你想哭的“核电站级”聊天室。
准备好了吗?让我们直接把服务器开到 1.0 瓦特!
第一部分:当你的 WebSocket 连接数突破 1000,生活就变了
想象一下,你是一个工厂的厂长。你的工厂里有 10,000 台机器。每台机器都有一个传感器,它们每隔几秒钟就要向服务器报告:“嘿,我很好!”或者“救命啊,我要爆炸了!”
如果你用普通的 WebSocket 服务器,大概只能稳稳当当接住 1000 个连接。一旦超过这个数,你的 Node.js 进程就会像便秘一样——不仅吞吐量极低,内存占用还会蹭蹭往上涨,最后直接 Out of Memory。
为什么?因为 Node.js 是单线程的,虽然有异步 I/O,但处理 1 万个 TCP 连接的上下文切换和内存分配,足以让你的 CPU 疯狂转圈,直到风扇开始唱起《义勇军进行曲》。
我们的目标是:
- 水平扩展:我们要能开 10 个甚至 20 个 Node 进程。
- 消息分发:不管机器在哪,不管客户端在哪,消息必须精准送达。
- 容错性:挂了一个节点,其他的还得像没事儿人一样。
这怎么搞?这就需要我们引入 NestJS 的 WebSocket Gateway,以及它的“核武器”——Redis 适配器。
第二部分:NestJS Gateway 的骨架搭建
首先,我们需要一个 Gateway。在 NestJS 里,这就像是在 Spring Boot 里写一个 @RestController 一样简单。
我们要建立一个 AlertGateway。
// alert.gateway.ts
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
OnGatewayConnection,
OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
@WebSocketGateway({
path: '/socket.io', // 这是我们的 WebSocket 路径
cors: {
origin: '*', // 在生产环境里,别这么写,那是给黑客留的后门
methods: ['GET', 'POST'],
},
transports: ['websocket'], // 明确指定只用 WebSocket,别用 HTTP 长轮询,那太慢了
})
export class AlertGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;
// 连接建立时
handleConnection(client: Socket) {
console.log(`🔌 连接建立: ${client.id}`);
// 这里可以加逻辑:校验 token,分配权限
}
// 连接断开时
handleDisconnect(client: Socket) {
console.log(`🔌 连接断开: ${client.id}`);
}
// 处理客户端发来的消息
@SubscribeMessage('joinRoom')
handleJoinRoom(client: Socket, payload: { factoryId: string }) {
// 举个例子:客户端想订阅某个工厂的数据
client.join(payload.factoryId);
return { status: 'ok', message: `Joined room ${payload.factoryId}` };
}
// 处理告警推送(这是后端逻辑层调用的)
broadcastAlert(payload: any) {
// broadcastToRoom 是 Socket.io 提供的,向特定房间广播
this.server.to(payload.factoryId).emit('alert', payload);
}
}
这段代码看起来很简单对吧?但这只是个骨架。如果你现在只运行这一个 Gateway,然后开 10 个这玩意儿,恭喜你,你创建了 10 个独立的世界。工厂 A 的机器报警了,在节点 1 上,节点 2 上的工人完全看不到。这就像两个人在两座孤岛上打电话,没信号。
第三部分:分布式架构的秘钥——Redis 适配器
要想万级并发,我们必须解决“分布式状态共享”的问题。每个节点必须知道别的节点上有谁在监听。
这里我们就要请出 Redis 了。Redis 在这里充当的是一个“大喇叭”,或者说是“消息中转站”。
我们需要安装 @nestjs/event-emitter(虽然这里主要讲 WebSocket,但事件驱动是标配)和 socket.io-redis。
核心逻辑:
- Gateway A 监听到一个
alert事件。 - Gateway A 通过 Redis 广播这个事件。
- Gateway B、C、D 都订阅了 Redis 的这个频道。
- Gateway B 收到广播,发现有个客户端 B1 在监听这个工厂 ID,于是 Gateway B 把消息发给客户端 B1。
在 NestJS 中,这被称为 RedisIoAdapter。
// alert.gateway.ts (修改版)
import { Server } from 'socket.io';
import { RedisIoAdapter } from './redis.io.adapter'; // 我们需要自己实现这个适配器
@WebSocketGateway({
path: '/socket.io',
cors: { origin: '*' },
transports: ['websocket'],
})
export class AlertGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;
async afterInit(server: Server) {
// 这一步是魔法发生的地方
const redisAdapter = new RedisIoAdapter(server);
await redisAdapter.connectToRedis();
console.log('🚀 Redis 适配器初始化完成,分布式集群已就位!');
}
// ... 其他代码保持不变
}
这里有一个非常关键的实现细节,RedisIoAdapter 的核心逻辑其实非常简单粗暴,但有效:
// redis.io.adapter.ts
import { Server } from 'socket.io';
import { createClient } from 'redis';
export class RedisIoAdapter {
constructor(private server: Server) {}
async connectToRedis() {
const pubClient = createClient({ url: 'redis://localhost:6379' });
const subClient = pubClient.duplicate();
await Promise.all([pubClient.connect(), subClient.connect()]);
this.server.adapter.set('createClient', (opt) => {
return opt;
});
this.server.adapter.publish = async (roomId, data, callback) => {
await pubClient.sAdd(`socket.io#${roomId}`, data);
await subClient.sPop(`socket.io#${roomId}`, callback);
};
// ... 省略 subscribeToAll 等繁琐逻辑,实际使用通常会直接引用 socket.io-redis 包
// 这里为了演示原理,我们直接依赖 socket.io-redis 包更稳妥:
const { RedisAdapter } = require('@socket.io/redis-adapter');
this.server.adapter(RedisAdapter.createAdapter(pubClient, subClient));
}
}
看,就是这么简单!一旦有了这个适配器,你的 WebSocket 网关就不再是单机的了。不管你启动 5 个进程还是 50 个进程,它们在同一个房间里,消息是实时同步的。
第四部分:工业级消息协议设计
在万级并发下,协议设计的冗余是致命的。你的消息包必须极简。
我们假设我们的系统是用来监控工厂的。每个机器都有 ID。我们需要一种机制,让 React 前端只收到它关心的机器的消息。
方案:房间机制。
-
服务端:
当机器 42 报警时,我们向 ID 为factory-line-1的房间广播消息。// 譬如,我们有一个 Service 负责接收 IoT 设备的数据 async handleIoTData(data: any) { // 假设 data.machineId = 'M-42' // 假设 data.factoryId = 'F-01' // 告诉 Gateway 去干活 this.alertGateway.broadcastAlert({ factoryId: 'F-01', // 这决定发到哪个 Redis 频道 machineId: 'M-42', level: 'CRITICAL', // 警告级别 message: '温度过高', timestamp: Date.now() }); } -
客户端(React):
用户登录后,根据他的权限(比如他只负责 F-01 号线的维护),自动加入房间F-01。useEffect(() => { const socket = new WebSocket('ws://localhost:3000/socket.io'); socket.on('connect', () => { // 发送加入房间的指令 socket.emit('joinRoom', { factoryId: 'F-01' }); }); socket.on('alert', (data) => { // 收到消息了! addAlertToState(data); // 更新 UI playAlarmSound(); // 播放滴滴滴的声音 }); return () => { socket.disconnect(); }; }, []);
这就构成了完整的闭环。系统不需要把所有 10 万台机器的消息都发给所有用户,而是通过 Redis 和房间机制,精准打击。
第五部分:React 前端的性能优化——别把 UI 冻死了
这是很多新手最容易踩的坑。你以为你发起了 10,000 个 WebSocket 连接,但是当这 10,000 条消息同时砸向你的浏览器时,React 的虚拟 DOM 渲染引擎会瞬间崩溃,页面卡成PPT。
问题所在:
如果你在 useEffect 里直接 setState,并且用 useState 存放一个数组,每次新消息来都 push 进去,React 可能会疯狂地 Diff 那个巨大的数组,导致页面卡死。
解决方案:
- 使用
useReducer或 Zustand:比useState更适合管理复杂状态。 - 虚拟滚动:这是必须的。你的预警列表可能有 1000 条历史消息,但你只需要渲染屏幕上可见的那 10 条。
- 批处理:确保不要在消息的
onmessage回调里触发多个组件更新,尽量把逻辑聚合。
实战代码:一个高性能的 WebSocket Hook
// useRealtimeAlert.ts
import { useEffect, useReducer, useRef } from 'react';
import io, { Socket } from 'socket.io-client';
type AlertState = Array<{
id: string;
machineId: string;
level: 'INFO' | 'WARNING' | 'CRITICAL';
message: string;
time: number;
}>;
// 定义操作类型
type AlertAction =
| { type: 'ADD_ALERT'; payload: AlertState[0] }
| { type: 'CLEAR_ALERTS' };
const alertReducer = (state: AlertState, action: AlertAction): AlertState => {
switch (action.type) {
case 'ADD_ALERT':
// 限制数组长度,比如只保留最新的 100 条
return [action.payload, ...state].slice(0, 100);
case 'CLEAR_ALERTS':
return [];
default:
return state;
}
};
export const useRealtimeAlert = (factoryId: string) => {
const [alerts, dispatch] = useReducer(alertReducer, []);
const socketRef = useRef<Socket | null>(null);
useEffect(() => {
// 1. 建立连接
socketRef.current = io('http://localhost:3000', {
transports: ['websocket'],
autoConnect: false, // 我们手动控制连接
});
const socket = socketRef.current;
socket.on('connect', () => {
console.log('✅ 连接成功');
socket.emit('joinRoom', { factoryId });
});
// 2. 监听消息
socket.on('alert', (newAlert) => {
dispatch({ type: 'ADD_ALERT', payload: newAlert });
});
// 3. 监听断线重连(指数退避)
socket.on('disconnect', (reason) => {
console.log('❌ 断开连接:', reason);
if (reason === 'io server disconnect') {
// 如果是服务器踢掉的,我们需要手动重连
socket.connect();
}
});
socket.connect();
// 4. 清理函数
return () => {
socket.disconnect();
};
}, [factoryId]);
return { alerts, socket: socketRef.current };
};
在这个 Hook 里,我们使用了 useReducer 来管理状态变化,并且限制了数组长度。这保证了即使消息像洪水一样涌来,我们的内存和 CPU 占用也是可控的。
第六部分:心跳与保活——别让僵尸连接占内存
在万级连接的场景下,有一个恐怖的事情:客户端断网了(比如工人的笔记本没电了),但是 TCP 连接没有优雅地关闭。这会导致服务器端一直保留着这个 Socket 对象,等着接收数据,结果就是内存泄漏。
工业级标准做法:
- 服务端心跳:Gateway 每 30 秒向所有连接的客户端发送一个
{ type: 'ping' }消息。 - 客户端响应:客户端收到
ping,必须立即回复pong。 - 超时处理:如果服务端连续 3 次(90秒)收不到
pong,直接关闭该连接。
在 NestJS Gateway 中,我们可以利用 Socket.IO 的 setTimeout 或者自定义逻辑:
@SubscribeMessage('pong')
handlePong(client: Socket) {
// 更新客户端的最后活跃时间
// 清除之前的 timeout 计时器
// 重置心跳计数器
return { status: 'ack' };
}
// 在 handleConnection 或 afterInit 中设置全局心跳
afterInit(server: Server) {
// ... Redis adapter 初始化代码 ...
// 启动全局心跳检测器
setInterval(() => {
server.sockets.forEach((socket) => {
// 如果 socket 处于非活跃状态,断开它
// 这是一个简化版的实现,生产环境需要更精细的计时器管理
// 这里我们依赖 socket.io 的 readyState
if (!socket.connected) return;
// 发送 ping
socket.emit('ping');
// 设置 5 秒钟不回 pong 就踢人
socket.setTimeout(5000, () => {
if (socket.connected) {
socket.disconnect();
console.log('🤖 强制断开僵尸连接:', socket.id);
}
});
});
}, 30000); // 每 30 秒一次
}
这招非常管用。它能帮你清理掉那些“死而不僵”的连接,保证内存占用的纯净。
第七部分:Nginx 配置——服务器的门卫
无论你的 Node.js Gateway 写得多么完美,如果 Nginx 配置不当,用户连接服务器的时候会超时,或者连接数上不去。
关键配置点:
- WebSocket 协议升级:Nginx 必须允许 HTTP 头中的
Upgrade和Connection穿透。 - Keep-Alive:这是提高并发连接数的关键。不要让每个用户进来都握手一次,要复用 TCP 连接。
- 缓冲区大小:工业预警系统数据包通常不大,但为了保险,别设置太小。
Nginx 配置示例:
upstream websocket_backend {
least_conn; # 负载均衡策略:最少连接优先
server 127.0.0.1:3001;
server 127.0.0.1:3002;
server 127.0.0.1:3003;
}
server {
listen 80;
server_name your-industry-domain.com;
location /socket.io/ {
proxy_pass http://websocket_backend;
# 关键配置:WebSocket 协议升级
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
# 关键配置:保持连接不超时(注意:这会占用很多文件描述符)
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# 超时时间设长一点
proxy_connect_timeout 7d;
proxy_send_timeout 7d;
proxy_read_timeout 7d;
}
}
注意那个 7d(7天)的设置。如果你设置的是 60 秒,当用户操作一下电脑待机,回来后连接就断了。对于工业预警系统,连接一旦断开,错过一条警报可能是灾难性的。
第八部分:监控与日志——别让你的系统“失明”
系统上线了,怎么知道它跑得好不好?你不能光看着屏幕上的数据发呆。
- 实时连接数监控:监控 Nginx 的
Active Connections。 - 消息吞吐量:监控 Redis 的
processed命令次数,或者 Node.js 的process.nextTick调用次数。 - 错误日志:使用 NestJS 的
Pino或Winston日志库,记录所有的error事件。
当有 1 万个用户在线时,你可能会发现偶尔会有 EADDRINUSE 错误(端口被占用)。这时候不要慌,重启服务就行了,或者优化 Nginx 的负载均衡算法。
第九部分:终极优化——Zlib 压缩
这可能是最后一个技术细节了,但对于“万级”来说,每一个字节都很重要。
如果你的消息 Payload 很大(比如带了图片、Base64 编码的传感器数据),每秒传输 1 万次未压缩的数据,带宽会瞬间打满。
Socket.IO 默认是支持二进制传输的,但对于 JSON 文本,我们可以开启 Zlib 压缩。
@WebSocketGateway({
path: '/socket.io',
cors: { origin: '*' },
transports: ['websocket'],
maxHttpBufferSize: 1e6, // 默认是 1e6 (1MB),如果数据包超过这个,会报错
enableCompression: true, // 🔥 开启压缩!这是省流神器
})
export class AlertGateway {}
开启这个 enableCompression 后,NestJS 和 Socket.IO 会自动对发送的消息进行 gzip 压缩。对于一段包含大量重复字段的 JSON(比如工厂 ID、时间戳),压缩率可以达到 50% 甚至更高。这意味你的带宽成本直接减半。
第十部分:心态建设
好了,同学们,代码讲完了。但我必须告诉你们,写工业级实时系统的痛苦在于“维护”。
当你面对满屏红色的报错日志时,当你发现明明加了 Redis 适配器,但消息还是偶尔延迟了几秒时,不要怀疑人生。这就是分布式系统的复杂性。
几点建议:
- 不要过度设计:刚开始不要搞什么 Actor Model,不要搞什么 CQRS,先把 Redis 适配器配好,把心跳做好了。
- 测试:怎么测万级并发?别真搞 1 万人去点。写个脚本,用 Docker 启动 100 个模拟客户端,疯狂发送心跳。看看你的服务器 CPU 是多少,内存是多少。
- 优雅降级:如果 WebSocket 宕机了,你的 React 页面会白屏吗?有没有一个离线模式,或者提示用户“网络连接异常,请刷新页面”?
总结一下今天的干货:
- NestJS Gateway 是你的核心控制器。
- Redis IoAdapter 是你解决水平扩展问题的救世主。
- Redis Rooms 是你精准推送消息的关键。
- React useReducer + 虚拟滚动 是你处理海量 UI 更新的法宝。
- Nginx Keep-Alive 是你撑起万级连接的基石。
现在,去你的 IDE 里,敲下第一行代码吧。别让你的工厂机器在报警时,只有你自己不知道!
祝大家代码无 Bug,服务器不崩!下课!