大家好,今天咱们来聊聊JavaScript里一个相当重要的角色——ReadableStreamDefaultController
,也就是“可读流默认控制器”。 听这名字就觉得挺霸气,但实际上它负责的事情也确实挺核心:控制数据从源头(比如网络请求、文件读取)流向你的代码,并且管理好这个过程中的节奏,避免你的代码被数据冲垮。
咱们先打个比方,把ReadableStream
想象成一条水管,源头是水库(数据源),你的代码是用水的人家,而ReadableStreamDefaultController
就是水管的总闸,它控制着水库的水流向你家。 如果水流太快,你家的水缸(内存)可能会溢出来,这就是“背压”;如果水流太慢,你就没水用(程序卡顿)。 所以,ReadableStreamDefaultController
的任务就是维持一个平衡,让水流既不会太快也不会太慢。
一、ReadableStreamDefaultController
是干啥的?
简单来说,ReadableStreamDefaultController
负责以下几个核心任务:
- 管理读取流的状态: 它知道流是“正在读取”、“已关闭”还是“出错”。
- 处理拉取请求: 当你的代码需要数据时(也就是“拉取”),它会通知数据源开始提供数据。
- 处理背压: 当你的代码处理数据的速度跟不上数据流的速度时,它会暂停数据源的推送,避免数据堆积。
- 控制流的关闭和出错: 它可以关闭流(不再提供数据)或者报告流发生的错误。
二、ReadableStreamDefaultController
的成员和方法
ReadableStreamDefaultController
对象本身并没有太多直接可访问的属性,它更多的是通过方法来与读取流进行交互。 我们重点关注几个最常用的方法:
-
enqueue(chunk)
: 这是核心方法! 它将一个数据块(chunk
)放入读取流的内部队列。 想象一下,水库(数据源)把一桶水(chunk
)倒入水管里。chunk
可以是任意类型的数据,比如Uint8Array
、字符串等等。 -
close()
: 关闭读取流。 相当于把水管的总闸关掉,不再允许任何水流进来。 调用这个方法后,读取流会进入“已关闭”状态。 -
error(error)
: 让读取流进入“出错”状态。 相当于告诉大家水管坏了,不能再用了。error
参数通常是一个Error
对象,用于描述发生的错误。 -
desiredSize
(只读属性): 这个属性非常重要,它表示读取流的内部队列还剩下多少空间。 当desiredSize
大于0时,表示还有空间,可以继续往队列里添加数据;当desiredSize
小于等于0时,表示队列已满或者接近满了,应该暂停添加数据,这就是背压的体现。
三、代码示例:创建一个简单的可读流
咱们来创建一个最简单的可读流,从一个数组中读取数据:
const data = ['A', 'B', 'C', 'D', 'E'];
let index = 0;
const readableStream = new ReadableStream({
start(controller) {
console.log("开始创建可读流");
// 在 start 方法中,你可以进行一些初始化操作,比如连接数据源
// 定义一个辅助函数,用于将数据放入队列
function pushData() {
if (index < data.length) {
const chunk = data[index++];
controller.enqueue(chunk); // 将数据块放入队列
console.log(`放入数据: ${chunk}, 当前索引: ${index}`);
// 可以选择递归调用pushData,也可以等待拉取请求
// pushData(); //这里先不递归调用,等待拉取信号
} else {
controller.close(); // 数据读取完毕,关闭流
console.log("数据读取完毕,关闭流");
}
}
// 首次启动数据推送
//pushData(); // 这里首次不启动数据推送,等待 pull() 方法触发
},
pull(controller) {
// 当读取流需要更多数据时,会调用 pull 方法
console.log("收到拉取请求");
return new Promise((resolve, reject) => {
setTimeout(() => { // 模拟异步操作
if (index < data.length) {
const chunk = data[index++];
controller.enqueue(chunk); // 将数据块放入队列
console.log(`放入数据: ${chunk}, 当前索引: ${index}, 队列剩余空间: ${controller.desiredSize}`);
resolve(); // 告知拉取操作已完成
} else {
controller.close(); // 数据读取完毕,关闭流
console.log("数据读取完毕,关闭流");
resolve();
}
}, 500); // 模拟耗时操作
});
},
cancel(reason) {
// 当读取流被取消时,会调用 cancel 方法
console.log(`读取流被取消,原因:${reason}`);
// 在 cancel 方法中,你可以进行一些清理操作,比如关闭连接
}
});
// 创建一个读取器,用于从读取流中读取数据
const reader = readableStream.getReader();
// 定义一个异步函数,用于读取数据
async function readData() {
try {
while (true) {
const { done, value } = await reader.read(); // 从读取流中读取数据
if (done) {
console.log("读取完毕");
break;
}
console.log(`读取到数据: ${value}`);
// 模拟处理数据的时间
await new Promise(resolve => setTimeout(resolve, 1000));
}
} catch (error) {
console.error(`读取数据出错:${error}`);
} finally {
reader.releaseLock(); // 释放读取器的锁
}
}
// 开始读取数据
readData();
这个例子中,我们创建了一个ReadableStream
,并定义了三个回调函数:
-
start(controller)
: 在读取流创建时调用,用于初始化操作。 这里我们只是简单地打印一条日志。 -
pull(controller)
: 当读取流需要更多数据时调用。 这是最重要的回调函数,你需要在里面做以下事情:- 从数据源获取数据。
- 使用
controller.enqueue(chunk)
将数据放入队列。 - 如果数据读取完毕,调用
controller.close()
关闭流。
-
cancel(reason)
: 当读取流被取消时调用。 你可以在里面做一些清理工作,比如关闭连接。
我们还创建了一个读取器reader
,用于从读取流中读取数据。 reader.read()
方法会返回一个Promise,resolve的值是一个对象,包含done
和value
两个属性。 done
表示是否读取完毕,value
表示读取到的数据。
注意,这里我们没有在start
里立即调用pushData()
,而是让pull()
方法来触发数据推送。 这样可以更好地演示背压机制。
四、背压机制:desiredSize
的应用
背压是ReadableStream
的一个重要特性,它可以防止数据源推送数据过快,导致内存溢出。 desiredSize
属性是实现背压的关键。
desiredSize
表示读取流的内部队列还剩下多少空间。 当desiredSize
大于0时,表示还有空间,可以继续往队列里添加数据;当desiredSize
小于等于0时,表示队列已满或者接近满了,应该暂停添加数据。
在上面的例子中,我们可以通过controller.desiredSize
来判断是否需要暂停数据推送。 如果desiredSize
小于等于0,我们可以暂停pull()
方法的调用,直到desiredSize
再次大于0。
五、更复杂的例子:从网络读取数据
咱们再来一个更复杂的例子,从网络读取数据:
async function fetchStream(url) {
try {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const reader = response.body.getReader();
const readableStream = new ReadableStream({
start(controller) {
console.log("开始从网络读取数据流");
},
async pull(controller) {
try {
const { done, value } = await reader.read();
if (done) {
controller.close();
console.log("网络数据读取完毕,关闭流");
return;
}
controller.enqueue(value);
console.log(`从网络读取到 ${value.length} 字节数据, 队列剩余空间: ${controller.desiredSize}`);
} catch (error) {
controller.error(error);
console.error(`读取网络数据出错:${error}`);
}
},
cancel(reason) {
console.log(`网络数据流被取消,原因:${reason}`);
reader.cancel(reason); // 取消底层读取器
}
});
return readableStream;
} catch (error) {
console.error(`创建网络数据流出错:${error}`);
throw error;
}
}
// 使用示例
async function processStream(url) {
try {
const stream = await fetchStream(url);
const reader = stream.getReader();
let totalBytes = 0;
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log(`总共读取了 ${totalBytes} 字节数据`);
break;
}
totalBytes += value.length;
console.log(`处理了 ${value.length} 字节数据, 总共处理了 ${totalBytes} 字节`);
// 模拟处理数据的时间
await new Promise(resolve => setTimeout(resolve, 200));
}
reader.releaseLock();
} catch (error) {
console.error(`处理数据流出错:${error}`);
}
}
//processStream('https://www.example.com'); // 将这里的URL替换成一个实际的URL,比如一个大的文本文件
这个例子中,我们使用fetch
API从网络读取数据,并将response.body
转换为ReadableStream
。 然后在pull()
方法中,我们使用reader.read()
方法从网络读取数据,并将数据放入读取流的队列。
六、ReadableStreamDefaultController
与ReadableByteStreamController
的区别
ReadableStream
有两种控制器:ReadableStreamDefaultController
和ReadableByteStreamController
。
ReadableStreamDefaultController
:用于处理非字节流,比如文本、JSON等。enqueue()
方法可以接受任意类型的数据。ReadableByteStreamController
:用于处理字节流,比如图片、视频等。enqueue()
方法只能接受Uint8Array
类型的数据。
它们的区别主要在于处理的数据类型不同。 如果你需要处理字节流,应该使用ReadableByteStreamController
。
七、表格总结
方法/属性 | 描述 |
---|---|
enqueue(chunk) |
将一个数据块放入读取流的内部队列。 |
close() |
关闭读取流,不再允许任何数据流进来。 |
error(error) |
让读取流进入出错状态,并报告错误信息。 |
desiredSize |
(只读) 读取流内部队列还剩余的空间大小。正数表示有空间,负数或零表示空间不足,需要暂停数据推送(背压)。 |
start(controller) |
(回调函数) 在ReadableStream 创建时调用,用于初始化操作,比如连接数据源。 |
pull(controller) |
(回调函数) 当读取流需要更多数据时调用。 你需要在里面从数据源获取数据,并使用controller.enqueue(chunk) 将数据放入队列。 |
cancel(reason) |
(回调函数) 当读取流被取消时调用。 你可以在里面做一些清理工作,比如关闭连接。 |
八、注意事项
- 不要在
start()
方法中阻塞线程。start()
方法应该尽快返回,避免影响性能。 - 在
pull()
方法中要处理错误。 如果从数据源读取数据时发生错误,应该调用controller.error()
方法报告错误。 - 在
cancel()
方法中要进行清理工作。 如果读取流被取消,应该关闭连接、释放资源等。 - 注意背压。 如果
desiredSize
小于等于0,应该暂停数据推送,避免内存溢出。 - 及时释放读取器的锁。 在使用完读取器后,应该调用
reader.releaseLock()
方法释放锁,避免其他代码无法访问读取流。
九、总结
ReadableStreamDefaultController
是ReadableStream
的核心组件,它负责管理读取流的状态、处理拉取请求、处理背压以及控制流的关闭和出错。 通过理解ReadableStreamDefaultController
的工作原理,你可以更好地使用ReadableStream
来处理各种数据流,比如网络请求、文件读取等。 希望今天的讲解能帮助大家更深入地理解ReadableStream
和ReadableStreamDefaultController
。 记住,理解背压机制是关键! 实践出真知,多写代码,多调试,才能真正掌握它们。 下课!