JS `MessageChannel` 在 Web Worker 间实现双向 `Streaming`

各位靓仔靓女,大家好!我是你们的老朋友,今天咱们来聊点刺激的,关于如何在 Web Worker 之间玩转双向 Streaming,让数据像瀑布一样倾泻而下!

开场白:为啥要搞这么复杂?

首先,咱们得搞清楚,为啥要用 Web Worker?答案很简单:为了不卡主线程!如果你的 JavaScript 代码里有大量计算或者耗时操作,一股脑儿扔到主线程里,浏览器就会卡成 PPT。Web Worker 就像一个独立的房间,你可以把脏活累活扔给它,主线程就能腾出手来处理用户交互,提升用户体验。

那为啥还要搞 Streaming?想象一下,你要从 Worker 传一个巨大的 JSON 数据给主线程,如果一次性全部加载到内存,再一股脑儿地发送过去,内存占用会飙升,而且主线程也得等全部数据接收完毕才能开始处理。Streaming 就像水管一样,数据可以分批次地、源源不断地传输,边接收边处理,大大提高效率,降低延迟。

主角登场:MessageChannel

MessageChannel 是一个非常强大的 API,它允许你在不同的执行上下文(比如主线程和 Worker)之间建立一个双向通信通道。它有两个端口:port1port2。你可以把 port1 扔给 Worker,port2 留在主线程,然后它们就可以愉快地互通消息了。

第一幕:搭建舞台(基础代码)

首先,我们创建一个 worker.js 文件,这是我们的 Worker 脚本:

// worker.js
self.addEventListener('message', (event) => {
  if (event.data.type === 'init') {
    const port = event.ports[0]; // 从事件中拿到主线程传过来的 port
    console.log('Worker: 接收到 port', port);

    // 处理接收到的消息
    port.onmessage = (messageEvent) => {
      console.log('Worker: 接收到主线程的消息', messageEvent.data);

      // 模拟处理数据,然后发送回去
      const processedData = `Worker processed: ${messageEvent.data}`;
      port.postMessage(processedData);
    };

    // 发送初始化完成的消息
    port.postMessage('Worker initialized!');
  }
});

然后,在主线程的 JavaScript 文件中,我们创建一个 Worker 实例,并建立 MessageChannel:

// main.js
const worker = new Worker('worker.js');

const channel = new MessageChannel();
const port1 = channel.port1;
const port2 = channel.port2;

// 监听 port1 的消息
port1.onmessage = (event) => {
  console.log('Main: 接收到 Worker 的消息', event.data);
};

// 启动 Worker 并发送 port2
worker.postMessage({ type: 'init' }, [port2]); // 注意:port2 需要通过 transferables 传递

// 发送消息给 Worker
port1.postMessage('Hello from Main!');

这段代码建立了主线程和 Worker 之间的基本通信。主线程创建了一个 MessageChannel,并将 port2 通过 transferables (第二个参数 [port2]) 发送给 Worker。Worker 接收到 port,就可以通过这个 port 与主线程进行通信了。

敲黑板:什么是 transferables?

transferablespostMessage 的一个高级特性。默认情况下,postMessage 会对数据进行拷贝,这意味着会消耗额外的内存和 CPU 资源。但是,对于某些类型的数据(比如 ArrayBuffer),你可以通过 transferables 将数据的所有权直接转移给接收方,而无需进行拷贝。这可以显著提高性能。

在这个例子中,我们将 port2 的所有权转移给了 Worker,这意味着主线程不能再直接使用 port2 了(虽然还可以持有引用,但尝试使用它会导致错误)。

第二幕:Streaming 的奥义

现在,我们来玩点真格的,实现双向 Streaming。核心思想是:将大的数据分割成小的 chunk,然后逐个发送。

Worker 侧:生成数据流

假设 Worker 需要生成一个很大的数据流,比如一个很长的字符串:

// worker.js
self.addEventListener('message', (event) => {
  if (event.data.type === 'stream') {
    const port = event.ports[0];
    console.log('Worker: 接收到 streaming port', port);

    const chunkSize = 1024; // 每个 chunk 的大小
    const totalSize = 1024 * 1024 * 10; // 总大小,10MB
    let offset = 0;

    const sendNextChunk = () => {
      if (offset >= totalSize) {
        console.log('Worker: 数据发送完毕');
        port.postMessage({ type: 'end' }); // 发送结束信号
        return;
      }

      const chunk = `Chunk ${offset / chunkSize}: `.repeat(chunkSize / 10); // 生成一个 chunk
      port.postMessage({ type: 'data', data: chunk });
      offset += chunkSize;

      // 模拟异步发送,防止阻塞 Worker
      setTimeout(sendNextChunk, 10);
    };

    sendNextChunk(); // 开始发送数据
  }
});

这段代码模拟生成一个 10MB 的数据流,并将其分割成 1KB 的 chunk,然后通过 port.postMessage 逐个发送给主线程。setTimeout 用于模拟异步发送,防止阻塞 Worker。

主线程侧:消费数据流

主线程需要监听 port 的消息,并处理接收到的 chunk:

// main.js
const startStreaming = () => {
  const channel = new MessageChannel();
  const port1 = channel.port1;
  const port2 = channel.port2;

  let receivedData = '';

  port1.onmessage = (event) => {
    if (event.data.type === 'data') {
      receivedData += event.data.data;
      console.log('Main: 接收到数据 chunk', event.data.data.length, 'bytes');
    } else if (event.data.type === 'end') {
      console.log('Main: 数据接收完毕,总大小', receivedData.length, 'bytes');
      // 在这里可以对接收到的数据进行处理
    }
  };

  worker.postMessage({ type: 'stream' }, [port2]);
};

// 在某个事件触发时开始 streaming
document.getElementById('startStreamButton').addEventListener('click', startStreaming);

这段代码监听 port1 的消息,如果接收到 data 类型的消息,就将其添加到 receivedData 中。如果接收到 end 类型的消息,就表示数据流已经结束,可以在这里对 receivedData 进行处理。

第三幕:进阶技巧

  • Backpressure: 如果主线程处理数据的速度比 Worker 发送数据的速度慢,就会出现 Backpressure。为了避免内存溢出,你需要实现 Backpressure 机制,比如让 Worker 在接收到主线程的 "ready" 信号后才发送下一个 chunk。
  • 错误处理: 在 Streaming 过程中,可能会出现各种错误,比如网络错误、数据损坏等。你需要添加适当的错误处理机制,保证程序的健壮性。
  • 数据格式: Streaming 的数据格式不一定是字符串,也可以是 ArrayBuffer 或其他类型的数据。你需要根据实际情况选择合适的数据格式。
  • Transform Streams: Transform Streams API 提供了一种更优雅的方式来处理数据流的转换和处理。你可以使用 Transform Streams 来对接收到的数据进行解码、压缩、加密等操作。

代码示例:Backpressure

Worker 侧:

// worker.js
self.addEventListener('message', (event) => {
  if (event.data.type === 'stream') {
    const port = event.ports[0];
    console.log('Worker: 接收到 streaming port', port);

    const chunkSize = 1024;
    const totalSize = 1024 * 1024 * 10;
    let offset = 0;
    let isReady = true; // 是否准备好发送下一个 chunk

    port.onmessage = (messageEvent) => {
      if (messageEvent.data.type === 'ready') {
        isReady = true;
        sendNextChunk();
      }
    };

    const sendNextChunk = () => {
      if (!isReady) {
        return; // 等待主线程准备好
      }

      if (offset >= totalSize) {
        console.log('Worker: 数据发送完毕');
        port.postMessage({ type: 'end' });
        return;
      }

      const chunk = `Chunk ${offset / chunkSize}: `.repeat(chunkSize / 10);
      port.postMessage({ type: 'data', data: chunk });
      offset += chunkSize;
      isReady = false; // 设置为未准备好

      // 模拟异步发送,防止阻塞 Worker
      setTimeout(() => {
          if(isReady) return;
          port.postMessage({type:'needReady'});
      },1000);
    };

    sendNextChunk(); // 开始发送数据
  }
});

主线程侧:

// main.js
const startStreaming = () => {
  const channel = new MessageChannel();
  const port1 = channel.port1;
  const port2 = channel.port2;

  let receivedData = '';

  port1.onmessage = (event) => {
    if (event.data.type === 'data') {
      receivedData += event.data.data;
      console.log('Main: 接收到数据 chunk', event.data.data.length, 'bytes');

      // 处理完数据后,发送 "ready" 信号
      setTimeout(() => {
        port1.postMessage({ type: 'ready' });
      }, 500); // 模拟处理时间
    } else if (event.data.type === 'end') {
      console.log('Main: 数据接收完毕,总大小', receivedData.length, 'bytes');
      // 在这里可以对接收到的数据进行处理
    } else if(event.data.type === 'needReady'){
        console.log("Worker need ready")
        port1.postMessage({ type: 'ready' });
    }
  };

  worker.postMessage({ type: 'stream' }, [port2]);
  port1.postMessage({ type: 'ready' }); // 首次发送 ready 信号
};

// 在某个事件触发时开始 streaming
document.getElementById('startStreamButton').addEventListener('click', startStreaming);

在这个例子中,主线程在接收到每个 chunk 后,模拟一个处理时间(500ms),然后发送一个 "ready" 信号给 Worker。Worker 只有在接收到 "ready" 信号后才会发送下一个 chunk。这样就实现了一个简单的 Backpressure 机制。

总结:MessageChannel + Streaming = 性能起飞!

通过 MessageChannel,我们可以在 Web Worker 之间建立双向通信通道。结合 Streaming 技术,我们可以高效地传输大量数据,避免阻塞主线程,提升用户体验。

表格:MessageChannel 的优缺点

特性 优点 缺点
双向通信 允许主线程和 Worker 之间进行双向通信,方便灵活。 需要手动管理端口,代码相对繁琐。
Transferables 支持 Transferables,可以避免数据拷贝,提高性能。 Transferables 会转移数据的所有权,需要注意使用。
Streaming 可以将大量数据分割成小的 chunk 进行传输,避免阻塞主线程,降低内存占用。 需要手动实现 Streaming 逻辑,比如分片、Backpressure 等。
兼容性 现代浏览器都支持 MessageChannel。 老旧浏览器可能不支持,需要进行兼容性处理。
错误处理 需要手动添加错误处理机制,保证程序的健壮性。 如果不进行错误处理,可能会导致程序崩溃。
复杂性 相比简单的 postMessage,MessageChannel 的代码更复杂,需要理解端口的概念。 对于简单的通信场景,可能显得过于复杂。

温馨提示:

  • 在实际开发中,你需要根据具体的业务场景选择合适的 Streaming 大小。
  • Backpressure 机制可以有效防止内存溢出,但也会增加代码的复杂性。
  • Transform Streams API 可以简化数据流的处理,但也会增加学习成本。

希望今天的分享对大家有所帮助!下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注