Express 原生 WebSocket 与 React 状态机:实现超大规模工业仪表盘的低延迟数据同步

各位下午好,请把你们的咖啡杯放一边,现在我们要开始搞一件比较“硬核”的事情。

你们可能在大厂里见过那种东西——工业仪表盘。成千上万个传感器,像疯狗一样往外吐数据:温度、压力、转速、电压。如果你用传统的 HTTP 轮询,服务器还没来得及把尿撒完,你的浏览器就已经卡成了一幅抽象画。

今天,我们不谈那些花里胡哨的第三方库,我们要搞点“原生”的。我们要用 Express 原生 WebSocket,配合 React 手写状态机,去驯服这头名为“超大规模数据”的野兽。

准备好了吗?让我们把那些花哨的框架都扔进垃圾桶,开始进入工程师的极简主义模式。


第一章:为什么我们要跟 HTTP 过不去?

首先,让我们搞清楚敌人的本质。HTTP 协议就像是一个端着公文包的邮差。你想问“现在温度多少?”,他得跑去办公室,敲开老板的门,问:“老板,现在的温度是多少?”老板写个数字给他,他跑回来告诉你。

如果你有 10,000 个传感器,每隔 1 秒问一次,这个邮差就算是用火箭推进器,跑到世界末日也跑不完。

而 WebSocket 呢?WebSocket 就像是给邮差装了 WiFi,并且允许他在走廊里大喊大叫。一旦握手成功,那就是一条双向的隧道。服务器想吐数据,直接顺着管子流给你,零开销,零等待。

但是! 咱们今天不用 Socket.io。为什么?因为 Socket.io 那个家伙太“保姆”了。为了兼容性,它做了太多封装,对实时性的极致追求上有一定的妥协。作为资深工程师,我们要追求极致的低延迟,我们要用 原生 WebSocket (ws 库),这就好比你要练肌肉,少用健身器械,直接扛哑铃。

第二章:Express 后端的“钢铁洪流”

首先,我们要搭建后端。假设我们有一家工厂,里面运转着 10,000 个虚拟机器人的温度计。我们需要一个 Express 服务器,它能承载巨大的并发连接。

我们的架构是这样的:Express 只负责“服务”。它不处理复杂的逻辑,它只负责连接,然后把数据推送到 Redis,或者直接广播给客户端。为了演示,我们使用 Node.js 原生的 ws 库。

第一步:安装依赖

npm install express ws

第二步:写一个简单的 WebSocket 服务器

这里有个关键点:不要在 connection 事件里处理复杂的业务逻辑。那是业务层的事,WebSocket 层只负责“管道传输”。

const express = require('express');
const { createServer } = require('http');
const { WebSocketServer } = require('ws');
const app = express();
const server = createServer(app);

// 1. 定义 WebSocket 服务器
const wss = new WebSocketServer({ server });

// 2. 连接管理器(假装很高级)
const clients = new Set();

wss.on('connection', (ws) => {
    console.log('有人连进来了,感觉良好。');
    clients.add(ws);

    // 发送初始握手消息,告诉客户端我是谁
    ws.send(JSON.stringify({ type: 'handshake', id: 'server-001' }));

    ws.on('message', (message) => {
        // 处理客户端发来的心跳或控制指令
        // 这里为了演示,我们直接忽略
    });

    ws.on('close', () => {
        console.log('客人走了,真冷清。');
        clients.delete(ws);
    });

    ws.on('error', (error) => {
        console.error('哎呀,管道堵了:', error);
    });
});

// 3. 启动 Express
app.get('/', (req, res) => {
    res.send('工业仪表盘后端服务已启动,请勿用肉眼看屏幕。');
});

server.listen(8080, () => {
    console.log('服务跑在 8080 端口,数据流正在准备...');
});

第三步:模拟数据生成与推送

在工业场景下,数据是源源不断的。我们不能等数据来了再发,我们必须主动推送

这里有个技巧:背压处理。如果你的数据生成速度远快于网络传输速度,Buffer 会爆内存。我们需要一个简单的缓冲队列。

// 模拟一个巨大的传感器数据集
const sensors = Array.from({ length: 10000 }, (_, i) => ({
    id: i,
    value: 0,
    target: Math.random() * 100,
    volatility: Math.random() * 2 // 波动性
}));

let lastUpdateTime = 0;

setInterval(() => {
    // 更新所有传感器的值(模拟物理变化)
    sensors.forEach(sensor => {
        // 简单的随机游走算法
        const delta = (Math.random() - 0.5) * sensor.volatility;
        sensor.value += delta;
        // 限制在 0-100 之间
        if (sensor.value > 100) sensor.value = 100;
        if (sensor.value < 0) sensor.value = 0;
    });

    // 2. 找到所有活跃的 WebSocket 连接
    const now = Date.now();
    // 只有当客户端连接数 > 0 时才广播,节省资源
    if (clients.size > 0 && now - lastUpdateTime > 16) { // 限制在 60FPS 左右
        lastUpdateTime = now;

        // 注意:真实场景下,不要一次性发 10000 个 JSON 字符串!
        // 那会把浏览器干炸的。这里我们演示“广播模式”。
        // 实际工程中,我们会使用 Protocol Buffers 或者二进制数据流。

        const payload = JSON.stringify({
            type: 'batch_update',
            timestamp: now,
            count: sensors.length,
            // 这里为了演示,只取前 100 个或者特定 ID
            // data: sensors.slice(0, 100) 
            // 为了演示性能,我们只广播全部,但告诉你怎么优化
            data: sensors
        });

        // 遍历发送(效率较低,适合小规模,生产环境需要集群和消息队列)
        // 使用 buffer 优化可以稍微提升一点
        const buffer = Buffer.from(payload);

        // 假设我们要给 100 个客户端发送
        for (const client of clients) {
            if (client.readyState === client.OPEN) {
                client.send(buffer);
            }
        }
    }
}, 100); // 每 100ms 更新一次

专家吐槽:
上面的代码有个致命伤:for (const client of clients)。这就像你在食堂打饭,一个一个窗口去问人吃不吃,效率极低。
在超大规模场景下,我们要用 Redis Pub/Sub 或者 Kafka。服务器节点 1 发现数据变了,往 Redis 丢一条消息,节点 2、节点 3 只要订阅了这个 Topic,立马就能收到广播。这才是真正的“工业级”水平。

第三章:React 的“疯狗”模式

好了,后端能吐数据了。现在来看前端。React 的核心思想是“声明式”,这很好,但它有个缺点:太啰嗦。每次 setState,React 都要重新渲染整个组件树,还要跑一遍 Virtual DOM Diff 算法。

如果我们要展示 10,000 个仪表盘,每秒 60 帧,React 就会哭着喊妈妈。

我们要放弃 useState,我们要用“命令式”的状态管理。

这听起来很反直觉,但在处理高频实时数据时,这是唯一真理。我们需要一种机制,它不关心“状态是什么”,它只关心“状态变了没”以及“怎么变”。

1. 构建一个高性能的数据缓存层

我们不能把数据直接塞进 React State。React State 是为了“响应式”UI 更新设计的,而不是为了“高频数据流”设计的。

我们需要一个类,一个纯 JavaScript 类,来充当数据的“中枢神经”。

class SensorBuffer {
    constructor() {
        // 使用 Map 存储数据,比数组快,Key 是 ID
        this.data = new Map();
        this.listeners = []; // 订阅者
    }

    // 批量更新数据
    update(dataBatch) {
        let changed = false;
        for (const sensor of dataBatch) {
            const current = this.data.get(sensor.id);
            // 只有数值变化了才标记,减少计算
            if (!current || Math.abs(current.value - sensor.value) > 0.1) {
                this.data.set(sensor.id, sensor);
                changed = true;
            }
        }

        if (changed) {
            this.notify();
        }
    }

    // 获取单个数据
    get(id) {
        return this.data.get(id);
    }

    // 订阅变化
    subscribe(callback) {
        this.listeners.push(callback);
        // 立即执行一次,避免白屏
        callback(this.data);
    }

    notify() {
        // 触发所有订阅者
        this.listeners.forEach(cb => cb(this.data));
    }
}

// 实例化
const sensorBuffer = new SensorBuffer();

2. 封装 WebSocket Hook

我们需要一个 Hook 来接管 WebSocket 的连接。

import { useEffect, useRef } from 'react';

export const useWebSocket = (url) => {
    const wsRef = useRef(null);
    const reconnectTimeoutRef = useRef(null);

    useEffect(() => {
        wsRef.current = new WebSocket(url);

        wsRef.current.onopen = () => {
            console.log('WebSocket 连接成功,准备接收数据洪流。');
            if (reconnectTimeoutRef.current) {
                clearTimeout(reconnectTimeoutRef.current);
            }
        };

        wsRef.current.onmessage = (event) => {
            try {
                const message = JSON.parse(event.data);
                if (message.type === 'batch_update') {
                    // 把数据丢给我们的 Buffer 处理
                    sensorBuffer.update(message.data);
                }
            } catch (e) {
                console.error('数据解析失败', e);
            }
        };

        wsRef.current.onclose = () => {
            console.log('连接断开,正在尝试重连...');
            reconnectTimeoutRef.current = setTimeout(() => {
                useWebSocket(url); // 递归重连
            }, 3000);
        };

        return () => {
            if (wsRef.current) wsRef.current.close();
            if (reconnectTimeoutRef.current) clearTimeout(reconnectTimeoutRef.current);
        };
    }, [url]);

    return wsRef.current;
};

3. 视图渲染:Canvas vs DOM

这是一个关键点。渲染 10,000 个 <div> 或者 <canvas> 元素,哪个性价比高?

如果你要用 React 渲染 10,000 个 DOM 节点,哪怕你用了 React.memo,React 的 Diff 算法也会在每帧跑完时累死。requestAnimationFrame 还没开始,React 已经算完了。

方案:使用 Canvas API。

Canvas 是基于像素的,它不在乎有多少个对象,它只在乎画什么。React 渲染一次 Canvas,然后由浏览器负责剩下的绘制工作。

import React, { useEffect, useRef } from 'react';

const IndustrialDashboard = () => {
    const canvasRef = useRef(null);
    // 缓存所有仪表盘的渲染配置(背景、刻度、颜色等)
    const meterConfigs = useRef({}); 

    // 订阅数据
    useEffect(() => {
        const unsubscribe = sensorBuffer.subscribe((allData) => {
            drawDashboard(allData);
        });
        return () => unsubscribe();
    }, []);

    const drawDashboard = (data) => {
        const canvas = canvasRef.current;
        if (!canvas) return;

        const ctx = canvas.getContext('2d');
        const width = canvas.width;
        const height = canvas.height;
        const meterHeight = 40;
        const meterWidth = 300;
        const gap = 20;

        // 清空画布
        ctx.clearRect(0, 0, width, height);

        // 遍历数据绘制
        let x = 20;
        let y = 20;
        const cols = Math.floor(width / (meterWidth + gap));

        let index = 0;
        for (const [id, sensor] of data) {
            if (index >= cols) {
                index = 0;
                x = 20;
                y += meterHeight + gap;
            }

            // 1. 绘制仪表盘背景
            ctx.fillStyle = '#222';
            ctx.fillRect(x, y, meterWidth, meterHeight);

            // 2. 绘制进度条(高亮部分)
            const percentage = sensor.value / 100;
            ctx.fillStyle = percentage > 0.8 ? '#ff4444' : (percentage > 0.5 ? '#ffff44' : '#44ff44');
            ctx.fillRect(x, y, meterWidth * percentage, meterHeight);

            // 3. 绘制文字
            ctx.fillStyle = '#fff';
            ctx.font = '12px Arial';
            ctx.fillText(`ID: ${id}`, x, y - 5);
            ctx.fillText(`${sensor.value.toFixed(1)}`, x + meterWidth - 40, y + 25);

            // 4. 存储配置以便后续可能的交互(比如鼠标悬停)
            meterConfigs.current[id] = { x, y, width: meterWidth, height: meterHeight };

            x += meterWidth + gap;
            index++;
        }
    };

    // 调整大小
    useEffect(() => {
        const handleResize = () => {
            if (canvasRef.current) {
                canvasRef.current.width = window.innerWidth;
                canvasRef.current.height = window.innerHeight;
            }
        };
        window.addEventListener('resize', handleResize);
        handleResize();
        return () => window.removeEventListener('resize', handleResize);
    }, []);

    return (
        <div style={{ background: '#000', width: '100vw', height: '100vh' }}>
            <canvas ref={canvasRef} />
        </div>
    );
};

export default IndustrialDashboard;

代码解析:
你看,我们没有调用任何 setState
drawDashboard 函数直接操作 DOM (ctx.fillRect)。这是原生的。React 只负责创建一次这个 Canvas 引用,然后把它交给我们的逻辑去统治。

这不仅仅快,这是神速。即使在 5G 网络延迟下,只要数据来了,Canvas 就会瞬间更新。

第四章:手写状态机——不仅仅是渲染

如果你觉得上面的 Canvas 方案太“低端”,觉得 React 必须得在里面,那我们再来点更高级的。

我们要实现一个自定义状态机。它负责维护每个传感器的“历史状态”。

为什么要有历史状态?
因为网络是有抖动的。如果服务器丢了一个包,如果你只有“当前值”,仪表盘就会瞬间跳变。这叫“视觉污染”。我们要做的“工业级”仪表盘,数据应该是平滑过渡的。

核心逻辑:插值

// 伪代码:在 SensorBuffer 类中添加插值逻辑
class SensorBuffer {
    constructor() {
        this.data = new Map();
        this.history = new Map(); // 记录历史趋势
    }

    update(newData) {
        for (const sensor of newData) {
            const current = this.data.get(sensor.id) || { value: sensor.value };

            // 计算差值
            const diff = sensor.value - current.value;

            // 如果变化量很小,直接赋值,避免动画过度
            if (Math.abs(diff) < 0.5) {
                this.data.set(sensor.id, sensor);
            } else {
                // 如果变化量大,触发一个“动画任务”
                // 我们把任务推入一个队列,由主循环处理
                this.pendingUpdates.push({
                    id: sensor.id,
                    start: current.value,
                    end: sensor.value,
                    timestamp: Date.now()
                });
                this.data.set(sensor.id, sensor); // 更新当前显示值
            }
        }
        this.processAnimations();
    }

    processAnimations() {
        // 这里可以是一个 rAF 循环,或者一个 setTimeout 队列
        // 简单演示:线性插值
        this.pendingUpdates.forEach((task, index) => {
            const duration = 300; // 300ms 动画
            const elapsed = Date.now() - task.timestamp;

            if (elapsed < duration) {
                const progress = elapsed / duration;
                // lerp 算法
                const currentValue = task.start + (task.end - task.start) * progress;
                // 更新 current 显示值,但不更新 target,这样动画结束就停住
                this.data.set(task.id, { ...task, value: currentValue });
            } else {
                this.pendingUpdates.splice(index, 1);
            }
        });
    }
}

通过这种方式,React(或者 Canvas)渲染的永远是趋势,而不是突变。这就给了用户一种“掌控感”,即使底层数据在疯狂跳动,UI 看起来依然是优雅的。

第五章:故障处理与心跳机制

工业环境不可靠。服务器可能会断电,网络会抖动。我们的系统必须具备“鲁棒性”。

1. 心跳检测

不要等 30 秒超时才知道服务器挂了。我们要每秒发一个 Ping。

后端:

// 在连接建立后
const pingInterval = setInterval(() => {
    if (ws.readyState === ws.OPEN) {
        ws.ping();
    }
}, 1000);

// 监听 pong
ws.on('pong', () => {
    // 心跳存活,重置定时器或记录健康状态
    ws.isAlive = true;
});

前端:

// 在 useWebSocket Hook 中
wsRef.current.on('pong', () => {
    wsRef.current.isAlive = true;
});

// 检查存活,如果 10 秒没收到 pong,就重连
setInterval(() => {
    if (wsRef.current && !wsRef.current.isAlive) {
        wsRef.current.terminate();
        // 触发重连逻辑
    }
}, 10000);

2. 断点续传

如果网络中断,重连后怎么办?你会看到一串乱码。

解决方案:断开期间的数据缓存。

class DataSnapshotStore {
    constructor() {
        this.snapshots = []; // 缓存最近 N 秒的数据
    }

    push(data) {
        this.snapshots.push({ data, time: Date.now() });
        // 保持最多 1 秒的数据
        this.snapshots = this.snapshots.filter(s => Date.now() - s.time < 1000);
    }

    getRecent() {
        return this.snapshots.length > 0 ? this.snapshots[this.snapshots.length - 1].data : null;
    }
}

// 连接重获时
wsRef.current.onopen = () => {
    // 1. 尝试从缓存恢复
    const recentData = dataStore.getRecent();
    if (recentData) {
        sensorBuffer.update(recentData); // 恢复现场
        // 2. 请求服务器发送断点前的数据(如果服务器支持)
        wsRef.current.send(JSON.stringify({ type: 'recover', lastId: recentData[recentData.length-1].id }));
    }
};

第六章:架构的终极形态

好了,现在我们有了:

  1. Express 原生 WebSocket 服务(轻量、极速)。
  2. 自定义 React Canvas 渲染(零开销)。
  3. 插值状态机(平滑动画)。
  4. 心跳与断点续传(高可用)。

但这还不够“超大规模”。如果 100 万个用户都在看这个仪表盘,单台服务器扛不住。

我们需要 Node.js 集群。

const cluster = require('cluster');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log(`Master ${process.pid} is running`);
    // Fork 机制
    for (let i = 0; i < numCPUs; i++) {
        cluster.fork();
    }
} else {
    // 启动 Express 和 WebSocket
    const app = require('./app'); // 引入上面的逻辑
    app.listen(8080);
}

问题来了: 如果你有 4 个 Node 进程,客户端连接了进程 1,数据更新在进程 2,客户端怎么知道?
这就是 Redis Pub/Sub 大显身手的时候。

架构图(脑补):
[ 客户端 ] <—> [ Nginx 反向代理 ] <—> [ Node.js Cluster A ] <—> [ Redis ]
|
+–> [ Node.js Cluster B ] <—> [ Redis ]

逻辑是这样的:

  1. 客户端连上 Node A。
  2. 传感器数据更新。
  3. Node A 把数据发给 Redis。
  4. Redis 把数据广播给 Node A、Node B、Node C。
  5. Node A、Node B、Node C 收到数据,推送给各自的客户端。

第七章:性能优化的“黑魔法”

在最后,我必须传授几个压箱底的技巧。

1. 二进制协议

JSON 是文本,占用带宽大。工业数据通常就是数字。
123.45 变成二进制:0x40 0x5C 0x3E
如果你用 WebSocket 发送 10,000 个 float,用 JSON 大概占用几百 KB,用二进制可能只要几十 KB。对于低带宽环境,这能省下 80% 的流量,意味着 80% 的延迟降低。

2. 帧率控制

你不需要 60FPS。60FPS 对于数据展示来说太浪费了。
人的眼睛很难分辨超过 30FPS 的变化。而且 30FPS 能让你省下 CPU 资源去处理其他逻辑。
setInterval 里,把频率从 1000ms 降到 33ms(30FPS),你会发现整个系统顺滑得像黄油。

3. 内存泄漏的噩梦

在讲座快结束时,我要警告大家:不要在组件里缓存 WebSocket 实例!
如果你在一个组件里存了 ws,然后你把这个组件卸载了(比如切换路由),ws 就没有销毁。服务器那边还觉得这个连接活着,拼命发数据,但客户端不在乎,数据堆积在内存里,直到内存溢出。

正确的做法是:把 WebSocket 的管理逻辑抽离到一个 Provider 组件里,放在最外层,常驻内存。

结语

所以,朋友们,这就是我们今天的课程。

我们抛弃了沉重的 Redux,抛弃了臃肿的 Socket.io,甚至抛弃了纯 React 的 DOM 渲染。我们用原生的 TCP Socket 作为管道,用 Canvas 作为画笔,用自定义的逻辑作为大脑。

这就是超大规模工业仪表盘的真相:它不依赖库,它依赖对底层原理的深刻理解。数据同步不仅仅是把数据传过去,它是关于如何在混乱的信号中建立秩序,如何在网络抖动中保持冷静,如何在海量并发下维持优雅。

现在,去写代码吧,让那些服务器冒烟,让那些仪表盘疯狂跳动!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注