大家好,今天咱们来聊聊JavaScript里一个相当重要的角色——ReadableStreamDefaultController,也就是“可读流默认控制器”。 听这名字就觉得挺霸气,但实际上它负责的事情也确实挺核心:控制数据从源头(比如网络请求、文件读取)流向你的代码,并且管理好这个过程中的节奏,避免你的代码被数据冲垮。
咱们先打个比方,把ReadableStream想象成一条水管,源头是水库(数据源),你的代码是用水的人家,而ReadableStreamDefaultController就是水管的总闸,它控制着水库的水流向你家。 如果水流太快,你家的水缸(内存)可能会溢出来,这就是“背压”;如果水流太慢,你就没水用(程序卡顿)。 所以,ReadableStreamDefaultController的任务就是维持一个平衡,让水流既不会太快也不会太慢。
一、ReadableStreamDefaultController是干啥的?
简单来说,ReadableStreamDefaultController负责以下几个核心任务:
- 管理读取流的状态: 它知道流是“正在读取”、“已关闭”还是“出错”。
- 处理拉取请求: 当你的代码需要数据时(也就是“拉取”),它会通知数据源开始提供数据。
- 处理背压: 当你的代码处理数据的速度跟不上数据流的速度时,它会暂停数据源的推送,避免数据堆积。
- 控制流的关闭和出错: 它可以关闭流(不再提供数据)或者报告流发生的错误。
二、ReadableStreamDefaultController的成员和方法
ReadableStreamDefaultController对象本身并没有太多直接可访问的属性,它更多的是通过方法来与读取流进行交互。 我们重点关注几个最常用的方法:
-
enqueue(chunk): 这是核心方法! 它将一个数据块(chunk)放入读取流的内部队列。 想象一下,水库(数据源)把一桶水(chunk)倒入水管里。chunk可以是任意类型的数据,比如Uint8Array、字符串等等。 -
close(): 关闭读取流。 相当于把水管的总闸关掉,不再允许任何水流进来。 调用这个方法后,读取流会进入“已关闭”状态。 -
error(error): 让读取流进入“出错”状态。 相当于告诉大家水管坏了,不能再用了。error参数通常是一个Error对象,用于描述发生的错误。 -
desiredSize(只读属性): 这个属性非常重要,它表示读取流的内部队列还剩下多少空间。 当desiredSize大于0时,表示还有空间,可以继续往队列里添加数据;当desiredSize小于等于0时,表示队列已满或者接近满了,应该暂停添加数据,这就是背压的体现。
三、代码示例:创建一个简单的可读流
咱们来创建一个最简单的可读流,从一个数组中读取数据:
const data = ['A', 'B', 'C', 'D', 'E'];
let index = 0;
const readableStream = new ReadableStream({
start(controller) {
console.log("开始创建可读流");
// 在 start 方法中,你可以进行一些初始化操作,比如连接数据源
// 定义一个辅助函数,用于将数据放入队列
function pushData() {
if (index < data.length) {
const chunk = data[index++];
controller.enqueue(chunk); // 将数据块放入队列
console.log(`放入数据: ${chunk}, 当前索引: ${index}`);
// 可以选择递归调用pushData,也可以等待拉取请求
// pushData(); //这里先不递归调用,等待拉取信号
} else {
controller.close(); // 数据读取完毕,关闭流
console.log("数据读取完毕,关闭流");
}
}
// 首次启动数据推送
//pushData(); // 这里首次不启动数据推送,等待 pull() 方法触发
},
pull(controller) {
// 当读取流需要更多数据时,会调用 pull 方法
console.log("收到拉取请求");
return new Promise((resolve, reject) => {
setTimeout(() => { // 模拟异步操作
if (index < data.length) {
const chunk = data[index++];
controller.enqueue(chunk); // 将数据块放入队列
console.log(`放入数据: ${chunk}, 当前索引: ${index}, 队列剩余空间: ${controller.desiredSize}`);
resolve(); // 告知拉取操作已完成
} else {
controller.close(); // 数据读取完毕,关闭流
console.log("数据读取完毕,关闭流");
resolve();
}
}, 500); // 模拟耗时操作
});
},
cancel(reason) {
// 当读取流被取消时,会调用 cancel 方法
console.log(`读取流被取消,原因:${reason}`);
// 在 cancel 方法中,你可以进行一些清理操作,比如关闭连接
}
});
// 创建一个读取器,用于从读取流中读取数据
const reader = readableStream.getReader();
// 定义一个异步函数,用于读取数据
async function readData() {
try {
while (true) {
const { done, value } = await reader.read(); // 从读取流中读取数据
if (done) {
console.log("读取完毕");
break;
}
console.log(`读取到数据: ${value}`);
// 模拟处理数据的时间
await new Promise(resolve => setTimeout(resolve, 1000));
}
} catch (error) {
console.error(`读取数据出错:${error}`);
} finally {
reader.releaseLock(); // 释放读取器的锁
}
}
// 开始读取数据
readData();
这个例子中,我们创建了一个ReadableStream,并定义了三个回调函数:
-
start(controller): 在读取流创建时调用,用于初始化操作。 这里我们只是简单地打印一条日志。 -
pull(controller): 当读取流需要更多数据时调用。 这是最重要的回调函数,你需要在里面做以下事情:- 从数据源获取数据。
- 使用
controller.enqueue(chunk)将数据放入队列。 - 如果数据读取完毕,调用
controller.close()关闭流。
-
cancel(reason): 当读取流被取消时调用。 你可以在里面做一些清理工作,比如关闭连接。
我们还创建了一个读取器reader,用于从读取流中读取数据。 reader.read()方法会返回一个Promise,resolve的值是一个对象,包含done和value两个属性。 done表示是否读取完毕,value表示读取到的数据。
注意,这里我们没有在start里立即调用pushData(),而是让pull()方法来触发数据推送。 这样可以更好地演示背压机制。
四、背压机制:desiredSize的应用
背压是ReadableStream的一个重要特性,它可以防止数据源推送数据过快,导致内存溢出。 desiredSize属性是实现背压的关键。
desiredSize表示读取流的内部队列还剩下多少空间。 当desiredSize大于0时,表示还有空间,可以继续往队列里添加数据;当desiredSize小于等于0时,表示队列已满或者接近满了,应该暂停添加数据。
在上面的例子中,我们可以通过controller.desiredSize来判断是否需要暂停数据推送。 如果desiredSize小于等于0,我们可以暂停pull()方法的调用,直到desiredSize再次大于0。
五、更复杂的例子:从网络读取数据
咱们再来一个更复杂的例子,从网络读取数据:
async function fetchStream(url) {
try {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const reader = response.body.getReader();
const readableStream = new ReadableStream({
start(controller) {
console.log("开始从网络读取数据流");
},
async pull(controller) {
try {
const { done, value } = await reader.read();
if (done) {
controller.close();
console.log("网络数据读取完毕,关闭流");
return;
}
controller.enqueue(value);
console.log(`从网络读取到 ${value.length} 字节数据, 队列剩余空间: ${controller.desiredSize}`);
} catch (error) {
controller.error(error);
console.error(`读取网络数据出错:${error}`);
}
},
cancel(reason) {
console.log(`网络数据流被取消,原因:${reason}`);
reader.cancel(reason); // 取消底层读取器
}
});
return readableStream;
} catch (error) {
console.error(`创建网络数据流出错:${error}`);
throw error;
}
}
// 使用示例
async function processStream(url) {
try {
const stream = await fetchStream(url);
const reader = stream.getReader();
let totalBytes = 0;
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log(`总共读取了 ${totalBytes} 字节数据`);
break;
}
totalBytes += value.length;
console.log(`处理了 ${value.length} 字节数据, 总共处理了 ${totalBytes} 字节`);
// 模拟处理数据的时间
await new Promise(resolve => setTimeout(resolve, 200));
}
reader.releaseLock();
} catch (error) {
console.error(`处理数据流出错:${error}`);
}
}
//processStream('https://www.example.com'); // 将这里的URL替换成一个实际的URL,比如一个大的文本文件
这个例子中,我们使用fetch API从网络读取数据,并将response.body转换为ReadableStream。 然后在pull()方法中,我们使用reader.read()方法从网络读取数据,并将数据放入读取流的队列。
六、ReadableStreamDefaultController与ReadableByteStreamController的区别
ReadableStream有两种控制器:ReadableStreamDefaultController和ReadableByteStreamController。
ReadableStreamDefaultController:用于处理非字节流,比如文本、JSON等。enqueue()方法可以接受任意类型的数据。ReadableByteStreamController:用于处理字节流,比如图片、视频等。enqueue()方法只能接受Uint8Array类型的数据。
它们的区别主要在于处理的数据类型不同。 如果你需要处理字节流,应该使用ReadableByteStreamController。
七、表格总结
| 方法/属性 | 描述 |
|---|---|
enqueue(chunk) |
将一个数据块放入读取流的内部队列。 |
close() |
关闭读取流,不再允许任何数据流进来。 |
error(error) |
让读取流进入出错状态,并报告错误信息。 |
desiredSize |
(只读) 读取流内部队列还剩余的空间大小。正数表示有空间,负数或零表示空间不足,需要暂停数据推送(背压)。 |
start(controller) |
(回调函数) 在ReadableStream创建时调用,用于初始化操作,比如连接数据源。 |
pull(controller) |
(回调函数) 当读取流需要更多数据时调用。 你需要在里面从数据源获取数据,并使用controller.enqueue(chunk)将数据放入队列。 |
cancel(reason) |
(回调函数) 当读取流被取消时调用。 你可以在里面做一些清理工作,比如关闭连接。 |
八、注意事项
- 不要在
start()方法中阻塞线程。start()方法应该尽快返回,避免影响性能。 - 在
pull()方法中要处理错误。 如果从数据源读取数据时发生错误,应该调用controller.error()方法报告错误。 - 在
cancel()方法中要进行清理工作。 如果读取流被取消,应该关闭连接、释放资源等。 - 注意背压。 如果
desiredSize小于等于0,应该暂停数据推送,避免内存溢出。 - 及时释放读取器的锁。 在使用完读取器后,应该调用
reader.releaseLock()方法释放锁,避免其他代码无法访问读取流。
九、总结
ReadableStreamDefaultController是ReadableStream的核心组件,它负责管理读取流的状态、处理拉取请求、处理背压以及控制流的关闭和出错。 通过理解ReadableStreamDefaultController的工作原理,你可以更好地使用ReadableStream来处理各种数据流,比如网络请求、文件读取等。 希望今天的讲解能帮助大家更深入地理解ReadableStream和ReadableStreamDefaultController。 记住,理解背压机制是关键! 实践出真知,多写代码,多调试,才能真正掌握它们。 下课!