Node.js 中的 stream (流) 有哪些类型?它们如何实现大文件的读写和数据处理?

各位观众老爷,晚上好!欢迎来到今天的 Node.js 流水线主题讲座。今天咱们不讲虚的,直接上干货,聊聊 Node.js 的 stream,这玩意儿可是处理大文件的神器,能让你的服务器在面对海量数据时,依然坚挺如磐石。

一、Stream 是个啥?为啥需要它?

首先,我们得搞清楚 stream 到底是个什么东西。想象一下,你正在用迅雷下载一部 10G 的电影。如果你必须等到整个文件全部下载完成才能开始观看,那得等到猴年马月? stream 就像一个水管,数据像水一样,可以源源不断地流过来,你一边接收一边看,不用等全部下载完。

在 Node.js 中,stream 是一种处理流式数据的抽象接口。它允许你以片段的方式读取或写入数据,而不是一次性将整个文件加载到内存中。这对于处理大型文件、网络请求、视频流等场景非常有用。

为什么需要 stream? 简单来说,没有 stream,你只能:

  • 一次性加载整个文件: 想象一下,你要读取一个 5G 的日志文件,没有 stream,你需要先把这 5G 的数据全部加载到内存中,才能开始处理。这简直就是内存杀手!
  • 阻塞 I/O 操作: 传统的文件读取操作是阻塞的,也就是说,在读取完成之前,你的程序会一直卡在那里,啥也干不了。

而 stream 可以解决这些问题:

  • 内存效率: stream 每次只读取一小部分数据,然后进行处理,释放内存,然后再读取下一部分。这样可以大大降低内存占用。
  • 非阻塞 I/O: stream 使用异步操作,不会阻塞你的程序。你可以一边读取数据,一边做其他事情。

二、Stream 的四大金刚:四种类型

Node.js 提供了四种基本的 stream 类型,它们就像四个身怀绝技的武林高手,各有所长:

Stream 类型 功能描述 常见应用场景
Readable 用于读取数据的 stream。你可以从中读取数据。 读取文件,发起 HTTP 请求,从数据库读取数据。
Writable 用于写入数据的 stream。你可以向其中写入数据。 写入文件,发送 HTTP 响应,向数据库写入数据。
Duplex 既可以读取,又可以写入的 stream。 网络套接字 (sockets),加密/解密操作。
Transform 一种特殊的 Duplex stream,它可以在读取和写入数据的同时,对数据进行转换。 数据压缩/解压缩,加密/解密,数据格式转换 (例如,将 CSV 转换为 JSON)。

三、代码实战:四大金刚的使用姿势

光说不练假把式,接下来我们用代码来演示这四大金刚的使用方法。

1. Readable stream (读取流)

我们先来创建一个 Readable stream,从一个文件中读取数据:

const fs = require('fs');
const { Readable } = require('stream');

// 创建一个 Readable stream
const myReadableStream = fs.createReadStream('large_file.txt', { highWaterMark: 64 * 1024 }); // 64KB 的缓冲区

// 监听 data 事件,当有数据可读取时触发
myReadableStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  // 在这里处理数据,例如,将数据写入到另一个文件
});

// 监听 end 事件,当所有数据都读取完毕时触发
myReadableStream.on('end', () => {
  console.log('Finished reading the file.');
});

// 监听 error 事件,当发生错误时触发
myReadableStream.on('error', (err) => {
  console.error('An error occurred:', err);
});

//可选,暂停读取流,稍后继续
//myReadableStream.pause();
//setTimeout(() => myReadableStream.resume(), 5000);

在这个例子中,我们使用了 fs.createReadStream 方法创建了一个 Readable stream,它会从 large_file.txt 文件中读取数据。highWaterMark 选项指定了内部缓冲区的大小,也就是说,每次最多读取 64KB 的数据。

我们监听了 data 事件,当有数据可读取时,会触发这个事件。在事件处理函数中,我们可以对数据进行处理,例如,将数据写入到另一个文件中。

我们还监听了 end 事件,当所有数据都读取完毕时,会触发这个事件。error 事件用于处理错误。

2. Writable stream (写入流)

接下来,我们创建一个 Writable stream,将数据写入到一个文件中:

const fs = require('fs');
const { Writable } = require('stream');

// 创建一个 Writable stream
const myWritableStream = fs.createWriteStream('output.txt');

// 监听 drain 事件,当缓冲区为空时触发
myWritableStream.on('drain', () => {
  console.log('Data has been drained.');
  // 可以在这里继续写入数据
});

// 监听 finish 事件,当所有数据都写入完毕时触发
myWritableStream.on('finish', () => {
  console.log('Finished writing to the file.');
});

// 监听 error 事件,当发生错误时触发
myWritableStream.on('error', (err) => {
  console.error('An error occurred:', err);
});

// 写入数据
myWritableStream.write('This is the first line.n');
myWritableStream.write('This is the second line.n');
myWritableStream.write('This is the third line.n');

// 标记写入完成
myWritableStream.end();

在这个例子中,我们使用了 fs.createWriteStream 方法创建了一个 Writable stream,它会将数据写入到 output.txt 文件中。

我们使用了 write 方法来写入数据。end 方法用于标记写入完成。

我们还监听了 drain 事件,当缓冲区为空时,会触发这个事件。当写入速度超过读取速度时,缓冲区可能会被填满。这时,write 方法会返回 false,表示需要等待缓冲区为空才能继续写入。drain 事件就是用来通知我们可以继续写入数据的。

3. Duplex stream (双工流)

Duplex stream 既可以读取,又可以写入。一个常见的例子是网络套接字 (sockets)。

const { Duplex } = require('stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.data = [];
  }

  _read(size) {
    // 从内部数据源读取数据
    const chunk = this.data.shift();
    if (chunk) {
      this.push(chunk);
    } else {
      this.push(null); // 表示没有更多数据了
    }
  }

  _write(chunk, encoding, callback) {
    // 将数据写入到内部数据源
    this.data.push(chunk);
    console.log(`Received chunk: ${chunk.toString()}`);
    callback(); // 表示写入完成
  }
}

// 创建一个 Duplex stream
const myDuplexStream = new MyDuplex();

// 监听 data 事件,从流中读取数据
myDuplexStream.on('data', (chunk) => {
  console.log(`Read from stream: ${chunk.toString()}`);
});

// 向流中写入数据
myDuplexStream.write('Hello, ');
myDuplexStream.write('World!');
myDuplexStream.end();

在这个例子中,我们创建了一个自定义的 Duplex stream。_read 方法用于从内部数据源读取数据,_write 方法用于将数据写入到内部数据源。

4. Transform stream (转换流)

Transform stream 是一种特殊的 Duplex stream,它可以在读取和写入数据的同时,对数据进行转换。例如,我们可以创建一个 Transform stream,将数据转换为大写:

const { Transform } = require('stream');

class UppercaseTransform extends Transform {
  constructor(options) {
    super(options);
  }

  _transform(chunk, encoding, callback) {
    // 将数据转换为大写
    const uppercaseChunk = chunk.toString().toUpperCase();
    this.push(uppercaseChunk);
    callback();
  }
}

// 创建一个 Transform stream
const uppercaseTransform = new UppercaseTransform();

// 将数据通过管道传递给 Transform stream
process.stdin.pipe(uppercaseTransform).pipe(process.stdout);

在这个例子中,我们创建了一个自定义的 Transform stream。_transform 方法用于对数据进行转换。

我们使用了 pipe 方法将 process.stdin (标准输入) 的数据传递给 uppercaseTransform stream,然后再将转换后的数据传递给 process.stdout (标准输出)。这样,我们就可以将输入的数据转换为大写并输出到控制台。

四、Stream 的核心方法:pipe()

pipe() 方法是 stream 中最重要的一个方法。它可以将一个 Readable stream 的数据传递给一个 Writable stream,实现数据的流动。

readableStream.pipe(writableStream);

pipe() 方法会自动处理数据的读取和写入,以及错误处理。你只需要简单地将两个 stream 连接起来,就可以实现数据的传输。

例如,我们可以使用 pipe() 方法将一个文件的内容复制到另一个文件中:

const fs = require('fs');

const readableStream = fs.createReadStream('input.txt');
const writableStream = fs.createWriteStream('output.txt');

readableStream.pipe(writableStream);

writableStream.on('finish', () => {
  console.log('File copied successfully!');
});

五、背后的原理:缓冲区和背压 (Backpressure)

Stream 的高效之处在于它使用了缓冲区和背压机制。

  • 缓冲区 (Buffer): Stream 在内部使用缓冲区来存储数据。Readable stream 从数据源读取数据,并将数据存储在缓冲区中。Writable stream 从缓冲区读取数据,并将数据写入到目标。
  • 背压 (Backpressure): 当写入速度超过读取速度时,Writable stream 的缓冲区可能会被填满。这时,Writable stream 会通知 Readable stream 暂停读取数据,直到缓冲区为空。这种机制称为背压,它可以防止内存溢出。

六、Stream 的应用场景:处理大文件、网络请求、数据转换

Stream 在 Node.js 中有着广泛的应用场景:

  • 处理大文件: 读取、写入、转换大型文件。
  • 网络请求: 处理 HTTP 请求和响应,例如,上传和下载文件。
  • 数据转换: 压缩、解压缩、加密、解密、数据格式转换。
  • 视频流: 处理视频流数据。

七、自定义 Stream:打造你的专属流水线

除了使用 Node.js 提供的内置 stream 类型,你还可以自定义 stream,以满足特定的需求。

const { Readable, Writable, Transform } = require('stream');

// 自定义 Readable stream
class MyReadable extends Readable {
  constructor(options) {
    super(options);
    this.data = ['Hello', 'World', '!'];
  }

  _read() {
    const chunk = this.data.shift();
    if (chunk) {
      this.push(chunk);
    } else {
      this.push(null); // 表示没有更多数据了
    }
  }
}

// 自定义 Writable stream
class MyWritable extends Writable {
  constructor(options) {
    super(options);
  }

  _write(chunk, encoding, callback) {
    console.log(`Received chunk: ${chunk.toString()}`);
    callback(); // 表示写入完成
  }
}

// 自定义 Transform stream
class MyTransform extends Transform {
  constructor(options) {
    super(options);
  }

  _transform(chunk, encoding, callback) {
    const transformedChunk = chunk.toString().toUpperCase();
    this.push(transformedChunk);
    callback();
  }
}

// 创建 stream 实例
const myReadable = new MyReadable();
const myWritable = new MyWritable();
const myTransform = new MyTransform();

// 使用 pipe 方法连接 stream
myReadable.pipe(myTransform).pipe(myWritable);

八、总结:Stream 是 Node.js 的核心概念

Stream 是 Node.js 中一个非常重要的概念。它允许你以高效的方式处理流式数据,避免一次性加载整个文件到内存中。掌握 stream 的使用方法,可以让你编写出更高效、更健壮的 Node.js 程序。

好了,今天的讲座就到这里。希望大家有所收获! 记住,熟能生巧,多写代码,才能真正掌握 stream 的精髓。 感谢大家的收听!

九、补充说明:一些常见的 Stream 操作

除了上述基本用法,Stream 还有一些常用的操作,例如:

  • read():Readable stream 中读取指定大小的数据。
  • write():Writable stream 中写入数据。
  • end(): 结束 Writable stream 的写入操作。
  • pause(): 暂停 Readable stream 的读取操作。
  • resume(): 恢复 Readable stream 的读取操作。
  • unpipe(): 断开 pipe() 连接。

十、Stream 的错误处理

Stream 的错误处理非常重要。你需要监听 error 事件,并处理发生的错误。

readableStream.on('error', (err) => {
  console.error('An error occurred:', err);
  // 处理错误,例如,关闭 stream
  readableStream.destroy();
  writableStream.destroy();
});

writableStream.on('error', (err) => {
  console.error('An error occurred:', err);
  // 处理错误,例如,关闭 stream
  readableStream.destroy();
  writableStream.destroy();
});

十一、使用 async/await 简化 Stream 操作

从 Node.js 14 开始,你可以使用 stream.promises API,使用 async/await 简化 Stream 操作。

const fs = require('fs');
const { pipeline } = require('stream/promises');

async function copyFile(input, output) {
  try {
    await pipeline(
      fs.createReadStream(input),
      fs.createWriteStream(output)
    );
    console.log('Pipeline succeeded.');
  } catch (err) {
    console.error('Pipeline failed.', err);
  }
}

copyFile('input.txt', 'output.txt');

pipeline 函数可以自动处理 stream 的错误和关闭,使代码更加简洁易懂。

十二、一些建议和最佳实践

  • 选择合适的 stream 类型: 根据你的需求选择合适的 stream 类型。
  • 合理设置 highWaterMark: highWaterMark 决定了内部缓冲区的大小。设置合适的值可以提高性能。
  • 处理背压: 当写入速度超过读取速度时,要处理背压,防止内存溢出。
  • 进行错误处理: 监听 error 事件,并处理发生的错误。
  • 使用 pipe() 方法: pipe() 方法可以简化 stream 的连接。
  • 使用 async/await: 使用 stream.promises API 可以简化 Stream 操作。
  • 考虑使用第三方库: 有很多第三方库提供了更高级的 stream 功能,例如,through2pump 等。

最后,希望大家能够灵活运用 stream,编写出更高效、更强大的 Node.js 程序!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注