各位靓仔靓女,晚上好!我是你们今晚的流媒体大师(自封的)。今天咱们聊聊 Node.js 的 Stream,这玩意儿听起来高大上,其实也没那么玄乎,说白了就是处理数据的管道。想象一下,你家自来水管,水龙头拧开,水哗啦啦地来,这水就是数据,水管就是 Stream。
这次咱们要深入到 Readable
、Writable
和 Transform
这些“水管”的内部,看看它们是怎么工作的,以及怎么DIY一个属于你自己的“高级定制水管”。
一、Stream 概念:数据的涓涓细流
在Node.js的世界里,Stream就像一条潺潺流淌的小溪,数据不再是一次性加载到内存中,而是像溪水一样,一点一点地流过。这对于处理大文件、网络数据或者需要实时处理的数据来说,简直是救星。
为什么需要Stream? 想象一下,如果你要读取一个 10GB 的大文件,一次性加载到内存,你的电脑可能会直接罢工。而Stream 可以将文件分成小块,逐个读取,处理完一块再读取下一块,内存压力大大减轻。
Stream 的优势:
- 内存效率: 避免一次性加载大量数据,节省内存。
- 时间效率: 可以边读取边处理,无需等待所有数据加载完成。
- 可组合性: 可以将多个 Stream 连接起来,形成复杂的数据处理流程。
二、Stream 的分类:三大水管
Node.js 的 Stream 主要分为四种类型,分别是 Readable
(可读流)、Writable
(可写流)、Duplex
(双工流) 和 Transform
(转换流)。 Duplex
相当于同时拥有 Readable
和 Writable
的能力, 而 Transform
在 Duplex
的基础上增加了数据转换的能力。 咱们重点关注最常用的 Readable
、 Writable
和 Transform
。
-
Readable (可读流): 就像一个水龙头,负责生产数据。你可以从 Readable 流中读取数据。比如,从文件中读取数据,或者从网络请求中接收数据。
-
Writable (可写流): 就像一个水槽,负责接收数据。你可以将数据写入 Writable 流。比如,将数据写入文件,或者发送到网络请求。
-
Transform (转换流): 就像一个净水器,负责转换数据。它可以读取数据,进行处理,然后将处理后的数据写入另一个流。比如,压缩数据,加密数据,或者格式化数据。
用表格总结一下:
类型 | 功能 | 比喻 | 例子 |
---|---|---|---|
Readable | 生产数据 | 水龙头 | fs.createReadStream() , http.IncomingMessage |
Writable | 接收数据 | 水槽 | fs.createWriteStream() , http.ServerResponse |
Transform | 转换数据 | 净水器 | zlib.createGzip() , crypto.createCipher() |
三、Readable Stream:让数据流起来
Readable Stream 的核心在于 _read()
方法。 你需要重写这个方法来告诉 Stream 如何产生数据。
1. 创建一个简单的 Readable Stream:
咱们先来创建一个简单的 Readable Stream,模拟从数组中读取数据。
const { Readable } = require('stream');
class MyReadable extends Readable {
constructor(options, data) {
super(options);
this.data = data;
this.index = 0;
}
_read(size) {
if (this.index >= this.data.length) {
this.push(null); // 告诉 Stream 没有更多数据了
return;
}
const chunk = this.data[this.index++];
this.push(chunk); // 将数据推入 Stream
}
}
// 使用示例
const data = ['hello', 'world', '!'];
const myReadable = new MyReadable({ encoding: 'utf8' }, data);
myReadable.on('data', (chunk) => {
console.log('Received:', chunk);
});
myReadable.on('end', () => {
console.log('End of stream');
});
myReadable.on('error', (err) => {
console.error('Error:', err);
});
代码解释:
MyReadable
类继承自Readable
。constructor
接收一个数据数组,并初始化index
。_read(size)
方法是核心,它负责从data
数组中读取数据,并通过this.push()
方法将数据推入 Stream。this.push(null)
表示没有更多数据了,Stream 将会结束。myReadable.on('data', ...)
监听 ‘data’ 事件,当 Stream 中有数据时,会触发这个事件。myReadable.on('end', ...)
监听 ‘end’ 事件,当 Stream 结束时,会触发这个事件。myReadable.on('error', ...)
监听 ‘error’ 事件,当 Stream 发生错误时,会触发这个事件。
2. _read(size)
方法的 size
参数:
_read(size)
方法接收一个 size
参数,它表示 Stream 期望读取的数据的大小。 但是,你可以忽略这个参数,一次性推送任意大小的数据。 Stream 会自动处理背压(backpressure,稍后会讲到)。
3. 两种读取模式:flowing 和 paused
Readable Stream 有两种读取模式:flowing 模式和 paused 模式。
-
flowing 模式: 这是默认模式。 当你监听 ‘data’ 事件时,Stream 会自动开始读取数据,并将数据推送到监听器。
-
paused 模式: 在 paused 模式下,你需要手动调用
read()
方法来读取数据。
// paused 模式示例
const myReadable = new MyReadable({ encoding: 'utf8' }, data);
myReadable.on('readable', () => {
let chunk;
while ((chunk = myReadable.read()) !== null) {
console.log('Received:', chunk);
}
});
myReadable.on('end', () => {
console.log('End of stream');
});
myReadable.on('error', (err) => {
console.error('Error:', err);
});
代码解释:
- 我们监听 ‘readable’ 事件,当 Stream 中有数据可读时,会触发这个事件。
- 我们使用
myReadable.read()
方法来读取数据。如果read()
方法返回null
,表示没有更多数据了。 myReadable.read()
方法可以接收一个size
参数,表示期望读取的数据的大小。
四、Writable Stream:将数据写进去
Writable Stream 的核心在于 _write()
方法。 你需要重写这个方法来告诉 Stream 如何处理接收到的数据。
1. 创建一个简单的 Writable Stream:
咱们来创建一个简单的 Writable Stream,模拟将数据写入数组。
const { Writable } = require('stream');
class MyWritable extends Writable {
constructor(options) {
super(options);
this.data = [];
}
_write(chunk, encoding, callback) {
this.data.push(chunk.toString()); // 将 Buffer 转换为字符串
callback(); // 告诉 Stream 数据已处理完毕
}
_final(callback) {
console.log('All data received:', this.data);
callback(); // 告诉 Stream 写入完成
}
}
// 使用示例
const myWritable = new MyWritable({ decodeStrings: false });
myWritable.write('hello', (err) => {
if (err) {
console.error('Error writing:', err);
} else {
console.log('Wrote "hello"');
}
});
myWritable.write('world', (err) => {
if (err) {
console.error('Error writing:', err);
} else {
console.log('Wrote "world"');
}
});
myWritable.end('!', () => {
console.log('Finished writing');
});
myWritable.on('error', (err) => {
console.error('Error:', err);
});
代码解释:
MyWritable
类继承自Writable
。constructor
初始化一个空数组data
。_write(chunk, encoding, callback)
方法是核心,它接收三个参数:chunk
: 接收到的数据块 (Buffer)。encoding
: 数据的编码方式。callback
: 一个回调函数,必须调用它来告诉 Stream 数据已处理完毕。
_final(callback)
方法在所有数据写入完成后被调用。myWritable.write(chunk, callback)
方法用于向 Stream 写入数据。myWritable.end(chunk, callback)
方法用于结束 Stream 的写入。 可以传入最后一块数据。
2. _write(chunk, encoding, callback)
的重要性:
_write
方法中的 callback
函数非常重要。 你必须在处理完数据后调用 callback
,否则 Stream 会认为你还没有处理完数据,会一直等待,导致程序卡住。
3. decodeStrings
选项:
在创建 Writable Stream 时,可以设置 decodeStrings
选项。 如果 decodeStrings
设置为 false
,chunk
参数将会是 Buffer 类型。 如果设置为 true
(默认值),chunk
参数将会是字符串类型。
五、Transform Stream:数据的变形金刚
Transform Stream 结合了 Readable Stream 和 Writable Stream 的功能,可以读取数据,进行处理,然后将处理后的数据写入另一个流。 它就像一个数据的变形金刚,可以把一种数据变成另一种数据。
1. 创建一个简单的 Transform Stream:
咱们来创建一个简单的 Transform Stream,模拟将数据转换为大写。
const { Transform } = require('stream');
class UppercaseTransform extends Transform {
constructor(options) {
super(options);
}
_transform(chunk, encoding, callback) {
const transformedChunk = chunk.toString().toUpperCase();
this.push(transformedChunk); // 将转换后的数据推入 Stream
callback(); // 告诉 Stream 数据已处理完毕
}
}
// 使用示例
const uppercaseTransform = new UppercaseTransform();
uppercaseTransform.on('data', (chunk) => {
console.log('Transformed:', chunk);
});
uppercaseTransform.on('end', () => {
console.log('Transformation complete');
});
uppercaseTransform.write('hello');
uppercaseTransform.write('world');
uppercaseTransform.end();
代码解释:
UppercaseTransform
类继承自Transform
。_transform(chunk, encoding, callback)
方法是核心,它接收三个参数:chunk
: 接收到的数据块 (Buffer)。encoding
: 数据的编码方式。callback
: 一个回调函数,必须调用它来告诉 Stream 数据已处理完毕。
_transform
方法将接收到的数据转换为大写,并通过this.push()
方法将转换后的数据推入 Stream。
2. _flush(callback)
方法:
Transform Stream 还可以定义一个 _flush(callback)
方法。 这个方法在所有数据都经过 _transform
方法处理后被调用。 你可以在 _flush
方法中进行一些最终的处理,例如,添加一些结尾标记。
class MyTransform extends Transform {
constructor(options) {
super(options);
this.total = 0;
}
_transform(chunk, encoding, callback) {
const num = parseInt(chunk.toString());
this.total += num;
callback();
}
_flush(callback) {
this.push(`Total: ${this.total}`);
callback();
}
}
const myTransform = new MyTransform();
myTransform.on('data', (chunk) => {
console.log('Result:', chunk);
});
myTransform.write('1');
myTransform.write('2');
myTransform.write('3');
myTransform.end();
六、管道 (Piping):连接 Stream 的魔法
管道 (Piping) 是将一个 Readable Stream 的输出连接到另一个 Writable Stream 的输入的一种简单方式。 它使用 pipe()
方法来实现。
readableStream.pipe(writableStream);
1. 使用管道连接多个 Stream:
const fs = require('fs');
const zlib = require('zlib');
// 创建一个 Readable Stream,从文件中读取数据
const fileReadStream = fs.createReadStream('input.txt');
// 创建一个 Transform Stream,使用 gzip 压缩数据
const gzipStream = zlib.createGzip();
// 创建一个 Writable Stream,将压缩后的数据写入文件
const fileWriteStream = fs.createWriteStream('output.gz');
// 使用管道连接 Stream
fileReadStream.pipe(gzipStream).pipe(fileWriteStream);
fileWriteStream.on('finish', () => {
console.log('Compression complete');
});
fileWriteStream.on('error', (err) => {
console.error('Error:', err);
});
代码解释:
- 我们使用
fs.createReadStream()
创建一个 Readable Stream,从input.txt
文件中读取数据。 - 我们使用
zlib.createGzip()
创建一个 Transform Stream,使用 gzip 算法压缩数据。 - 我们使用
fs.createWriteStream()
创建一个 Writable Stream,将压缩后的数据写入output.gz
文件。 - 我们使用
pipe()
方法将这三个 Stream 连接起来,形成一个数据处理管道。
2. 管道的优点:
- 简洁: 使用
pipe()
方法可以轻松地连接多个 Stream,代码简洁易懂。 - 自动处理背压: 管道会自动处理背压,避免数据溢出。
七、背压 (Backpressure):控制数据流速
背压是指当一个 Stream 的数据生产速度超过了数据消费速度时,数据消费方会向数据生产方发出信号,要求降低生产速度。 Stream 内部会自动处理背压,保证数据的稳定传输。
1. 背压的原理:
当 Writable Stream 的内部缓冲区满了时,write()
方法会返回 false
。 这表示 Writable Stream 暂时无法接收更多的数据。 Readable Stream 应该监听 Writable Stream 的 drain
事件,当 drain
事件触发时,表示 Writable Stream 的内部缓冲区已空,可以继续写入数据。
2. 手动处理背压:
const { Readable, Writable } = require('stream');
class SlowWritable extends Writable {
constructor(options) {
super(options);
}
_write(chunk, encoding, callback) {
// 模拟缓慢的写入过程
setTimeout(() => {
console.log('Writing:', chunk.toString());
callback();
}, 100);
}
}
const readable = new Readable({
read() {
this.push('A');
this.push('B');
this.push('C');
this.push(null); // End of stream
}
});
const writable = new SlowWritable();
readable.on('data', (chunk) => {
if (!writable.write(chunk)) {
console.log('Backpressure: Pausing readable stream');
readable.pause();
writable.once('drain', () => {
console.log('Backpressure relieved: Resuming readable stream');
readable.resume();
});
}
});
readable.on('end', () => {
writable.end();
});
代码解释:
SlowWritable
模拟了一个写入速度很慢的 Writable Stream。- 在
readable.on('data', ...)
中,我们检查writable.write()
方法的返回值。 如果返回false
,表示 Writable Stream 无法接收更多的数据,我们需要暂停 Readable Stream。 - 我们监听 Writable Stream 的
drain
事件,当drain
事件触发时,表示 Writable Stream 的内部缓冲区已空,我们可以恢复 Readable Stream。
八、Stream 的错误处理:
Stream 的错误处理非常重要。 如果在 Stream 的任何阶段发生错误,都应该及时处理,避免程序崩溃。
1. 监听 ‘error’ 事件:
readableStream.on('error', (err) => {
console.error('Readable stream error:', err);
});
writableStream.on('error', (err) => {
console.error('Writable stream error:', err);
});
transformStream.on('error', (err) => {
console.error('Transform stream error:', err);
});
2. 在 _read
、_write
和 _transform
方法中处理错误:
如果在 _read
、_write
或 _transform
方法中发生错误,应该调用 callback
函数,并传入一个 Error
对象。
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
try {
// 处理数据
// ...
callback();
} catch (err) {
callback(err); // 传递错误信息
}
}
}
总结:
今天咱们深入了解了 Node.js 的 Stream,包括 Readable
、Writable
和 Transform
三种类型,以及管道和背压的概念。 掌握 Stream 的原理和使用方法,可以让你编写出更高效、更稳定的 Node.js 程序。 希望今天的课程对你有所帮助!
下次再见!