JS `Service Worker` `ReadableStream` 响应与 `TransformStream` 的链式处理

各位观众老爷,晚上好!我是你们的老朋友,今天咱们来聊聊 Service Worker 里那些“花里胡哨”但又贼有用的东西:ReadableStream 响应和 TransformStream 的链式处理。

一、Service Worker 与 ReadableStream 的“爱恨情仇”

首先,得明确一点,Service Worker 拦截请求后,它有权决定返回什么。它可以从缓存里捞,可以从网络上拿,当然,也可以自己“生”一个。而 ReadableStream,就是“生”数据的一种方式。

为啥要用 ReadableStream 呢?因为它可以让你一边接收数据,一边处理,一边往外吐。想想看,如果你要处理一个超大的文件,如果一股脑全塞到内存里,那不得炸了?而 ReadableStream 就像一个水龙头,一点一点地放水,你就可以一点一点地处理,内存占用嗖嗖地降。

举个栗子:从 Service Worker 返回一个简单的 ReadableStream

// service-worker.js
self.addEventListener('fetch', event => {
  if (event.request.url.endsWith('/stream')) {
    event.respondWith(
      new Response(
        new ReadableStream({
          start(controller) {
            controller.enqueue('Hello, ');
            controller.enqueue('World!');
            controller.close();
          }
        })
      )
    );
  }
});

这段代码的意思是:当浏览器请求 /stream 这个 URL 的时候,Service Worker 会拦截请求,然后返回一个 ReadableStream。这个 ReadableStream 会先吐出 "Hello, ",再吐出 "World!",最后关闭。

是不是很简单粗暴?

代码解释:

  • self.addEventListener('fetch', ...): 监听 fetch 事件,也就是浏览器发起请求的时候。
  • event.respondWith(...): 告诉浏览器,Service Worker 要接管这次请求的响应。
  • new Response(new ReadableStream(...)): 创建一个 Response 对象,响应体是一个 ReadableStream
  • ReadableStream({ start(controller) { ... } }): 创建一个 ReadableStreamstart 方法是流开始的时候执行的回调函数。
  • controller.enqueue(...): 向流中添加数据块。
  • controller.close(): 关闭流。

二、TransformStream:数据的“变形金刚”

TransformStream 是一个“变形金刚”,它可以把输入的数据转换成另一种形式的输出。它有一个 transform 方法,你可以往里面塞一些逻辑,来改变数据的样子。

TransformStream 的基本结构:

new TransformStream({
  start(controller) {
    // 初始化逻辑,例如发送一些头部信息
  },
  transform(chunk, controller) {
    // 处理每个数据块的逻辑
    controller.enqueue(transformedChunk); // 将处理后的数据块添加到输出流
  },
  flush(controller) {
    // 流结束时的清理逻辑,例如发送一些尾部信息
  }
});
  • start: 流开始时调用,可以用来初始化一些状态。
  • transform: 核心方法,每次接收到一个数据块时都会调用。你可以在这里对数据进行处理,然后通过 controller.enqueue 将处理后的数据块添加到输出流。
  • flush: 流结束时调用,可以用来做一些清理工作。

举个栗子:将字符串转换为大写

const uppercaseTransformStream = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  }
});

这个 TransformStream 非常简单,它接收一个字符串,然后把它转换成大写,再输出。

三、ReadableStream + TransformStream:链式处理的“完美搭档”

现在,我们把 ReadableStreamTransformStream 组合起来,实现一个链式处理的效果。

举个栗子:从 Service Worker 返回一个大写的 ReadableStream

// service-worker.js
self.addEventListener('fetch', event => {
  if (event.request.url.endsWith('/uppercase')) {
    const readableStream = new ReadableStream({
      start(controller) {
        controller.enqueue('hello, ');
        controller.enqueue('world!');
        controller.close();
      }
    });

    const uppercaseTransformStream = new TransformStream({
      transform(chunk, controller) {
        controller.enqueue(chunk.toUpperCase());
      }
    });

    event.respondWith(
      new Response(readableStream.pipeThrough(uppercaseTransformStream))
    );
  }
});

这段代码的意思是:当浏览器请求 /uppercase 这个 URL 的时候,Service Worker 会拦截请求,创建一个 ReadableStream,这个 ReadableStream 会吐出 "hello, " 和 "world!"。然后,我们把这个 ReadableStream 通过 pipeThrough 方法连接到一个 uppercaseTransformStream,这个 TransformStream 会把所有的数据转换成大写。最后,我们把这个链式处理的结果返回给浏览器。

代码解释:

  • readableStream.pipeThrough(uppercaseTransformStream): 将 readableStream 的输出作为 uppercaseTransformStream 的输入,形成一个管道。

浏览器接收到的结果将会是:

HELLO, WORLD!

四、更复杂的例子:Gzip 压缩

上面的例子太简单了,我们来一个更复杂的:Gzip 压缩。浏览器本身提供了一些 API 来进行压缩和解压缩,我们可以利用它们来实现一个 Gzip 压缩的 TransformStream

// service-worker.js

const createGzipTransformStream = () => {
  const compressionStream = new CompressionStream('gzip');
  return new TransformStream({
    start(controller) {
      // 将 CompressionStream 的 ReadableStream 传递给 TransformStream 的 controller
      compressionStream.readable.pipeTo(new WritableStream({
        write(chunk) {
          controller.enqueue(chunk);
        },
        close() {
          controller.close();
        },
        abort(error) {
          controller.error(error);
        }
      }));
    },
    transform(chunk) {
      // 将数据写入 CompressionStream
      compressionStream.writable.getWriter().then(writer => {
        writer.write(chunk).then(() => {
          writer.releaseLock();
        });
      });
    },
    flush() {
      // 关闭 CompressionStream
      compressionStream.writable.getWriter().then(writer => {
        writer.close().then(() => {
          writer.releaseLock();
        });
      });
    }
  });
};

self.addEventListener('fetch', event => {
  if (event.request.url.endsWith('/gzip')) {
    const data = 'This is a very long string that needs to be compressed.';
    const encoder = new TextEncoder();
    const dataArray = encoder.encode(data);

    const readableStream = new ReadableStream({
      start(controller) {
        controller.enqueue(dataArray);
        controller.close();
      }
    });

    const gzipTransformStream = createGzipTransformStream();

    event.respondWith(
      new Response(readableStream.pipeThrough(gzipTransformStream), {
        headers: { 'Content-Encoding': 'gzip' } // 告诉浏览器这是 gzip 压缩的数据
      })
    );
  }
});

代码解释:

  • CompressionStream('gzip'): 创建一个 Gzip 压缩流。
  • createGzipTransformStream(): 创建一个 TransformStream,用于将数据压缩成 Gzip 格式。
  • compressionStream.readable.pipeTo(...): 将 CompressionStream 的可读流通过 pipeTo 传递给一个 WritableStream,这个 WritableStream 会将数据写入 TransformStreamcontroller
  • compressionStream.writable.getWriter().then(...): 获取 CompressionStream 的写入器,然后将数据写入。
  • Content-Encoding: gzip: 告诉浏览器这是 Gzip 压缩的数据,浏览器会自动解压缩。

五、实战演练:流式上传

ReadableStreamTransformStream 不仅可以用于响应,还可以用于请求。我们可以利用它们来实现一个流式上传的功能。

前端代码:

<!DOCTYPE html>
<html>
<head>
  <title>Stream Upload</title>
</head>
<body>
  <input type="file" id="fileInput">
  <button id="uploadButton">Upload</button>

  <script>
    const fileInput = document.getElementById('fileInput');
    const uploadButton = document.getElementById('uploadButton');

    uploadButton.addEventListener('click', async () => {
      const file = fileInput.files[0];
      if (!file) {
        alert('Please select a file.');
        return;
      }

      const readableStream = file.stream();

      try {
        const response = await fetch('/upload', {
          method: 'POST',
          body: readableStream,
          headers: {
            'Content-Type': 'application/octet-stream',
            'Content-Length': file.size // 告诉服务器文件大小
          }
        });

        if (response.ok) {
          alert('Upload successful!');
        } else {
          alert('Upload failed: ' + response.status);
        }
      } catch (error) {
        alert('Upload failed: ' + error);
      }
    });
  </script>
</body>
</html>

Service Worker 代码:

// service-worker.js
self.addEventListener('fetch', event => {
  if (event.request.url.endsWith('/upload') && event.request.method === 'POST') {
    event.respondWith(handleUpload(event.request));
  }
});

async function handleUpload(request) {
  const reader = request.body.getReader();
  let receivedData = new Uint8Array();

  try {
    while (true) {
      const { done, value } = await reader.read();

      if (done) {
        console.log('Upload complete. Received data length:', receivedData.length);
        // 在这里你可以将 receivedData 发送到服务器或者进行其他处理
        // 例如:
        // const blob = new Blob([receivedData], { type: 'application/octet-stream' });
        // const formData = new FormData();
        // formData.append('file', blob, 'uploaded_file.bin');
        // await fetch('/api/save_file', { method: 'POST', body: formData });

        return new Response('Upload successful!', { status: 200 });
      }

      // 将接收到的数据追加到 receivedData
      const newReceivedData = new Uint8Array(receivedData.length + value.length);
      newReceivedData.set(receivedData);
      newReceivedData.set(value, receivedData.length);
      receivedData = newReceivedData;
    }
  } catch (error) {
    console.error('Upload failed:', error);
    return new Response('Upload failed!', { status: 500 });
  } finally {
    reader.releaseLock();
  }
}

代码解释:

  • 前端:

    • 获取文件对象,然后通过 file.stream() 方法创建一个 ReadableStream
    • ReadableStream 作为 fetchbody 发送到服务器。
    • 设置 Content-Typeapplication/octet-stream,表示这是一个二进制数据流。
    • 设置 Content-Length 为文件大小,告诉服务器期望接收多少数据。
  • Service Worker:

    • 通过 request.body.getReader() 获取 ReadableStream 的 reader。
    • 循环读取 ReadableStream 中的数据块,直到 done 为 true。
    • 将接收到的数据块拼接起来,形成一个完整的 Uint8Array
    • 上传完成后,你可以将 receivedData 发送到服务器或者进行其他处理。

六、一些需要注意的点

  • 错误处理:ReadableStreamTransformStream 中,一定要注意错误处理。如果发生错误,要及时通过 controller.error 将错误传递出去。
  • 背压 (Backpressure): 当输出流的处理速度慢于输入流的产生速度时,就会产生背压。你需要正确处理背压,避免内存溢出。ReadableStream 提供了 cancel 方法来处理背压。
  • 兼容性: ReadableStreamTransformStream 是比较新的 API,有些浏览器可能不支持。你需要做好兼容性处理,可以使用 polyfill。

七、总结

ReadableStreamTransformStream 是 Service Worker 中非常有用的工具,它们可以让你更灵活地处理数据,提高应用的性能。掌握它们,你的 Service Worker 就能“上天入地,无所不能”!

表格总结:

特性 ReadableStream TransformStream
功能 提供一个可以被读取的数据流。 提供一个转换数据流的机制,可以改变数据的形式。
用途 创建自定义的响应体,流式上传,处理大型文件等。 数据压缩,数据格式转换,数据加密解密等。
主要方法 start(controller), cancel(reason) start(controller), transform(chunk, controller), flush(controller)
背压处理 需要手动处理背压,通过 cancel 方法来停止数据的产生。 自动处理背压,当输出流的处理速度慢于输入流的产生速度时,会自动暂停输入流的读取。
链式处理 可以通过 pipeThrough 方法连接多个 TransformStream 本身就是为了链式处理而设计的。
适用场景 需要控制数据产生的速度和方式,或者需要从一个数据源读取数据并进行处理。 需要对数据进行转换,例如压缩、解压缩、加密、解密、格式转换等。
核心思想 异步、分块处理数据,避免一次性加载大量数据到内存中。 通过管道的方式连接多个流,实现数据的流水线式处理。
关键 API ReadableStream, ReadableStreamController TransformStream, TransformStreamDefaultController
与 Response 关系 可以作为 Response 对象的 body。 不能直接作为 Response 对象的 body,需要通过 pipeThrough 连接到 ReadableStream
与 Request 关系 可以作为 Request 对象的 body (流式上传)。 不直接用于 Request,主要用于处理数据流,例如在上传前对数据进行压缩。

好了,今天的讲座就到这里。希望大家能够掌握 ReadableStreamTransformStream 的使用,让你的 Service Worker 更加强大!下课!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注