JavaScript内核与高级编程之:`Node.js`的流(`Stream`):`Readable`、`Writable`和`Transform`的实现。

各位靓仔靓女,晚上好!我是你们今晚的流媒体大师(自封的)。今天咱们聊聊 Node.js 的 Stream,这玩意儿听起来高大上,其实也没那么玄乎,说白了就是处理数据的管道。想象一下,你家自来水管,水龙头拧开,水哗啦啦地来,这水就是数据,水管就是 Stream。

这次咱们要深入到 ReadableWritableTransform 这些“水管”的内部,看看它们是怎么工作的,以及怎么DIY一个属于你自己的“高级定制水管”。

一、Stream 概念:数据的涓涓细流

在Node.js的世界里,Stream就像一条潺潺流淌的小溪,数据不再是一次性加载到内存中,而是像溪水一样,一点一点地流过。这对于处理大文件、网络数据或者需要实时处理的数据来说,简直是救星。

为什么需要Stream? 想象一下,如果你要读取一个 10GB 的大文件,一次性加载到内存,你的电脑可能会直接罢工。而Stream 可以将文件分成小块,逐个读取,处理完一块再读取下一块,内存压力大大减轻。

Stream 的优势:

  • 内存效率: 避免一次性加载大量数据,节省内存。
  • 时间效率: 可以边读取边处理,无需等待所有数据加载完成。
  • 可组合性: 可以将多个 Stream 连接起来,形成复杂的数据处理流程。

二、Stream 的分类:三大水管

Node.js 的 Stream 主要分为四种类型,分别是 Readable (可读流)、Writable (可写流)、Duplex (双工流) 和 Transform (转换流)。 Duplex 相当于同时拥有 ReadableWritable 的能力, 而 TransformDuplex 的基础上增加了数据转换的能力。 咱们重点关注最常用的 ReadableWritableTransform

  1. Readable (可读流): 就像一个水龙头,负责生产数据。你可以从 Readable 流中读取数据。比如,从文件中读取数据,或者从网络请求中接收数据。

  2. Writable (可写流): 就像一个水槽,负责接收数据。你可以将数据写入 Writable 流。比如,将数据写入文件,或者发送到网络请求。

  3. Transform (转换流): 就像一个净水器,负责转换数据。它可以读取数据,进行处理,然后将处理后的数据写入另一个流。比如,压缩数据,加密数据,或者格式化数据。

用表格总结一下:

类型 功能 比喻 例子
Readable 生产数据 水龙头 fs.createReadStream(), http.IncomingMessage
Writable 接收数据 水槽 fs.createWriteStream(), http.ServerResponse
Transform 转换数据 净水器 zlib.createGzip(), crypto.createCipher()

三、Readable Stream:让数据流起来

Readable Stream 的核心在于 _read() 方法。 你需要重写这个方法来告诉 Stream 如何产生数据。

1. 创建一个简单的 Readable Stream:

咱们先来创建一个简单的 Readable Stream,模拟从数组中读取数据。

const { Readable } = require('stream');

class MyReadable extends Readable {
  constructor(options, data) {
    super(options);
    this.data = data;
    this.index = 0;
  }

  _read(size) {
    if (this.index >= this.data.length) {
      this.push(null); // 告诉 Stream 没有更多数据了
      return;
    }

    const chunk = this.data[this.index++];
    this.push(chunk); // 将数据推入 Stream
  }
}

// 使用示例
const data = ['hello', 'world', '!'];
const myReadable = new MyReadable({ encoding: 'utf8' }, data);

myReadable.on('data', (chunk) => {
  console.log('Received:', chunk);
});

myReadable.on('end', () => {
  console.log('End of stream');
});

myReadable.on('error', (err) => {
  console.error('Error:', err);
});

代码解释:

  • MyReadable 类继承自 Readable
  • constructor 接收一个数据数组,并初始化 index
  • _read(size) 方法是核心,它负责从 data 数组中读取数据,并通过 this.push() 方法将数据推入 Stream。
  • this.push(null) 表示没有更多数据了,Stream 将会结束。
  • myReadable.on('data', ...) 监听 ‘data’ 事件,当 Stream 中有数据时,会触发这个事件。
  • myReadable.on('end', ...) 监听 ‘end’ 事件,当 Stream 结束时,会触发这个事件。
  • myReadable.on('error', ...) 监听 ‘error’ 事件,当 Stream 发生错误时,会触发这个事件。

2. _read(size) 方法的 size 参数:

_read(size) 方法接收一个 size 参数,它表示 Stream 期望读取的数据的大小。 但是,你可以忽略这个参数,一次性推送任意大小的数据。 Stream 会自动处理背压(backpressure,稍后会讲到)。

3. 两种读取模式:flowing 和 paused

Readable Stream 有两种读取模式:flowing 模式和 paused 模式。

  • flowing 模式: 这是默认模式。 当你监听 ‘data’ 事件时,Stream 会自动开始读取数据,并将数据推送到监听器。

  • paused 模式: 在 paused 模式下,你需要手动调用 read() 方法来读取数据。

// paused 模式示例
const myReadable = new MyReadable({ encoding: 'utf8' }, data);

myReadable.on('readable', () => {
  let chunk;
  while ((chunk = myReadable.read()) !== null) {
    console.log('Received:', chunk);
  }
});

myReadable.on('end', () => {
  console.log('End of stream');
});

myReadable.on('error', (err) => {
  console.error('Error:', err);
});

代码解释:

  • 我们监听 ‘readable’ 事件,当 Stream 中有数据可读时,会触发这个事件。
  • 我们使用 myReadable.read() 方法来读取数据。如果 read() 方法返回 null,表示没有更多数据了。
  • myReadable.read() 方法可以接收一个 size 参数,表示期望读取的数据的大小。

四、Writable Stream:将数据写进去

Writable Stream 的核心在于 _write() 方法。 你需要重写这个方法来告诉 Stream 如何处理接收到的数据。

1. 创建一个简单的 Writable Stream:

咱们来创建一个简单的 Writable Stream,模拟将数据写入数组。

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    super(options);
    this.data = [];
  }

  _write(chunk, encoding, callback) {
    this.data.push(chunk.toString()); // 将 Buffer 转换为字符串
    callback(); // 告诉 Stream 数据已处理完毕
  }

  _final(callback) {
    console.log('All data received:', this.data);
    callback(); // 告诉 Stream 写入完成
  }
}

// 使用示例
const myWritable = new MyWritable({ decodeStrings: false });

myWritable.write('hello', (err) => {
  if (err) {
    console.error('Error writing:', err);
  } else {
    console.log('Wrote "hello"');
  }
});

myWritable.write('world', (err) => {
  if (err) {
    console.error('Error writing:', err);
  } else {
    console.log('Wrote "world"');
  }
});

myWritable.end('!', () => {
  console.log('Finished writing');
});

myWritable.on('error', (err) => {
  console.error('Error:', err);
});

代码解释:

  • MyWritable 类继承自 Writable
  • constructor 初始化一个空数组 data
  • _write(chunk, encoding, callback) 方法是核心,它接收三个参数:
    • chunk: 接收到的数据块 (Buffer)。
    • encoding: 数据的编码方式。
    • callback: 一个回调函数,必须调用它来告诉 Stream 数据已处理完毕。
  • _final(callback) 方法在所有数据写入完成后被调用。
  • myWritable.write(chunk, callback) 方法用于向 Stream 写入数据。
  • myWritable.end(chunk, callback) 方法用于结束 Stream 的写入。 可以传入最后一块数据。

2. _write(chunk, encoding, callback) 的重要性:

_write 方法中的 callback 函数非常重要。 你必须在处理完数据后调用 callback,否则 Stream 会认为你还没有处理完数据,会一直等待,导致程序卡住。

3. decodeStrings 选项:

在创建 Writable Stream 时,可以设置 decodeStrings 选项。 如果 decodeStrings 设置为 falsechunk 参数将会是 Buffer 类型。 如果设置为 true(默认值),chunk 参数将会是字符串类型。

五、Transform Stream:数据的变形金刚

Transform Stream 结合了 Readable Stream 和 Writable Stream 的功能,可以读取数据,进行处理,然后将处理后的数据写入另一个流。 它就像一个数据的变形金刚,可以把一种数据变成另一种数据。

1. 创建一个简单的 Transform Stream:

咱们来创建一个简单的 Transform Stream,模拟将数据转换为大写。

const { Transform } = require('stream');

class UppercaseTransform extends Transform {
  constructor(options) {
    super(options);
  }

  _transform(chunk, encoding, callback) {
    const transformedChunk = chunk.toString().toUpperCase();
    this.push(transformedChunk); // 将转换后的数据推入 Stream
    callback(); // 告诉 Stream 数据已处理完毕
  }
}

// 使用示例
const uppercaseTransform = new UppercaseTransform();

uppercaseTransform.on('data', (chunk) => {
  console.log('Transformed:', chunk);
});

uppercaseTransform.on('end', () => {
  console.log('Transformation complete');
});

uppercaseTransform.write('hello');
uppercaseTransform.write('world');
uppercaseTransform.end();

代码解释:

  • UppercaseTransform 类继承自 Transform
  • _transform(chunk, encoding, callback) 方法是核心,它接收三个参数:
    • chunk: 接收到的数据块 (Buffer)。
    • encoding: 数据的编码方式。
    • callback: 一个回调函数,必须调用它来告诉 Stream 数据已处理完毕。
  • _transform 方法将接收到的数据转换为大写,并通过 this.push() 方法将转换后的数据推入 Stream。

2. _flush(callback) 方法:

Transform Stream 还可以定义一个 _flush(callback) 方法。 这个方法在所有数据都经过 _transform 方法处理后被调用。 你可以在 _flush 方法中进行一些最终的处理,例如,添加一些结尾标记。

class MyTransform extends Transform {
  constructor(options) {
    super(options);
    this.total = 0;
  }

  _transform(chunk, encoding, callback) {
    const num = parseInt(chunk.toString());
    this.total += num;
    callback();
  }

  _flush(callback) {
    this.push(`Total: ${this.total}`);
    callback();
  }
}

const myTransform = new MyTransform();

myTransform.on('data', (chunk) => {
  console.log('Result:', chunk);
});

myTransform.write('1');
myTransform.write('2');
myTransform.write('3');
myTransform.end();

六、管道 (Piping):连接 Stream 的魔法

管道 (Piping) 是将一个 Readable Stream 的输出连接到另一个 Writable Stream 的输入的一种简单方式。 它使用 pipe() 方法来实现。

readableStream.pipe(writableStream);

1. 使用管道连接多个 Stream:

const fs = require('fs');
const zlib = require('zlib');

// 创建一个 Readable Stream,从文件中读取数据
const fileReadStream = fs.createReadStream('input.txt');

// 创建一个 Transform Stream,使用 gzip 压缩数据
const gzipStream = zlib.createGzip();

// 创建一个 Writable Stream,将压缩后的数据写入文件
const fileWriteStream = fs.createWriteStream('output.gz');

// 使用管道连接 Stream
fileReadStream.pipe(gzipStream).pipe(fileWriteStream);

fileWriteStream.on('finish', () => {
  console.log('Compression complete');
});

fileWriteStream.on('error', (err) => {
  console.error('Error:', err);
});

代码解释:

  • 我们使用 fs.createReadStream() 创建一个 Readable Stream,从 input.txt 文件中读取数据。
  • 我们使用 zlib.createGzip() 创建一个 Transform Stream,使用 gzip 算法压缩数据。
  • 我们使用 fs.createWriteStream() 创建一个 Writable Stream,将压缩后的数据写入 output.gz 文件。
  • 我们使用 pipe() 方法将这三个 Stream 连接起来,形成一个数据处理管道。

2. 管道的优点:

  • 简洁: 使用 pipe() 方法可以轻松地连接多个 Stream,代码简洁易懂。
  • 自动处理背压: 管道会自动处理背压,避免数据溢出。

七、背压 (Backpressure):控制数据流速

背压是指当一个 Stream 的数据生产速度超过了数据消费速度时,数据消费方会向数据生产方发出信号,要求降低生产速度。 Stream 内部会自动处理背压,保证数据的稳定传输。

1. 背压的原理:

当 Writable Stream 的内部缓冲区满了时,write() 方法会返回 false。 这表示 Writable Stream 暂时无法接收更多的数据。 Readable Stream 应该监听 Writable Stream 的 drain 事件,当 drain 事件触发时,表示 Writable Stream 的内部缓冲区已空,可以继续写入数据。

2. 手动处理背压:

const { Readable, Writable } = require('stream');

class SlowWritable extends Writable {
  constructor(options) {
    super(options);
  }

  _write(chunk, encoding, callback) {
    // 模拟缓慢的写入过程
    setTimeout(() => {
      console.log('Writing:', chunk.toString());
      callback();
    }, 100);
  }
}

const readable = new Readable({
  read() {
    this.push('A');
    this.push('B');
    this.push('C');
    this.push(null); // End of stream
  }
});

const writable = new SlowWritable();

readable.on('data', (chunk) => {
  if (!writable.write(chunk)) {
    console.log('Backpressure: Pausing readable stream');
    readable.pause();
    writable.once('drain', () => {
      console.log('Backpressure relieved: Resuming readable stream');
      readable.resume();
    });
  }
});

readable.on('end', () => {
  writable.end();
});

代码解释:

  • SlowWritable 模拟了一个写入速度很慢的 Writable Stream。
  • readable.on('data', ...) 中,我们检查 writable.write() 方法的返回值。 如果返回 false,表示 Writable Stream 无法接收更多的数据,我们需要暂停 Readable Stream。
  • 我们监听 Writable Stream 的 drain 事件,当 drain 事件触发时,表示 Writable Stream 的内部缓冲区已空,我们可以恢复 Readable Stream。

八、Stream 的错误处理:

Stream 的错误处理非常重要。 如果在 Stream 的任何阶段发生错误,都应该及时处理,避免程序崩溃。

1. 监听 ‘error’ 事件:

readableStream.on('error', (err) => {
  console.error('Readable stream error:', err);
});

writableStream.on('error', (err) => {
  console.error('Writable stream error:', err);
});

transformStream.on('error', (err) => {
  console.error('Transform stream error:', err);
});

2. 在 _read_write_transform 方法中处理错误:

如果在 _read_write_transform 方法中发生错误,应该调用 callback 函数,并传入一个 Error 对象。

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    try {
      // 处理数据
      // ...
      callback();
    } catch (err) {
      callback(err); // 传递错误信息
    }
  }
}

总结:

今天咱们深入了解了 Node.js 的 Stream,包括 ReadableWritableTransform 三种类型,以及管道和背压的概念。 掌握 Stream 的原理和使用方法,可以让你编写出更高效、更稳定的 Node.js 程序。 希望今天的课程对你有所帮助!

下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注