为什么异步生成器是处理异步数据流的利器
在 JavaScript 中,同步迭代器(Iterator)让我们能够按需生成数据序列,而不需要一次性把所有数据加载到内存中。这个模式在处理大型数据集、文件流、网络响应等场景时非常有用。
然而,同步迭代器有一个根本限制:它只能处理同步数据源。当数据源本身是异步的时候——比如读取一个网络流、处理数据库查询结果、或者消费一个 WebSocket 消息——同步迭代器就无能为力了。
**异步生成器(Async Generator)**正是为解决这一问题而生的。它结合了生成器的惰性求值特性和 Promise 的异步能力,让开发者可以用同步迭代器的语法,处理异步数据流。
一、异步迭代器协议
1.1 同步迭代器的问题
同步迭代器要求 next() 方法同步返回 { done: boolean, value: any }:
function createSyncIterator(items) {
let index = 0;
return {
next() {
if (index < items.length) {
return { done: false, value: items[index++] };
}
return { done: true, value: undefined };
}
};
}
const it = createSyncIterator([1, 2, 3]);
console.log(it.next()); // { done: false, value: 1 }
console.log(it.next()); // { done: false, value: 2 }
console.log(it.next()); // { done: true, value: undefined }
但如果数据源是异步的,比如从网络读取数据块:
function createAsyncIterator(urls) {
let index = 0;
return {
next() {
// 这里无法同步等待网络响应
return fetch(urls[index++]) // 返回的是 Promise,不是 { done, value }
.then(res => ({ done: false, value: res }));
}
};
}
这种做法的问题是:next() 返回的不是迭代器结果,而是一个 Promise。这意味着调用者需要用 .then() 来处理结果,失去了迭代器的简洁性。
1.2 异步迭代器协议的定义
ES2018 引入了异步迭代器协议(Async Iterator Protocol):
// 异步迭代器必须返回一个对象,该对象的 next 方法返回 Promise
const asyncIterator = {
next() {
return Promise.resolve({ done: false, value: 'some async value' });
}
};
关键区别在于:next() 方法本身不是异步的,但它返回的是一个 Promise,该 Promise 最终 resolve 为迭代器结果。
async function consumeAsyncIterator(asyncIt) {
const result1 = await asyncIt.next(); // result1 = { done: false, value: ... }
const result2 = await asyncIt.next(); // result2 = { done: false, value: ... }
const result3 = await asyncIt.next(); // result3 = { done: true, value: undefined }
}
1.3 Symbol.asyncIterator
同步迭代器使用 Symbol.iterator,异步迭代器使用 Symbol.asyncIterator:
const asyncIterable = {
[Symbol.asyncIterator]() {
return this;
},
next() {
return Promise.resolve({ done: false, value: 'async value' });
}
};
**可迭代对象(Iterable)**指的是实现了 Symbol.iterator 或 Symbol.asyncIterator 方法的对象。当我们用 for...of 遍历时,JavaScript 会调用这个方法获取迭代器。
二、异步生成器函数
2.1 async function* 的基本用法
异步生成器是同时具有 async 和 function* 特性的函数:
async function* asyncGenerator(items) {
for (const item of items) {
// await 会暂停函数执行,等待 Promise resolve
const result = await processAsync(item);
yield result; // yield 返回值,但不终止函数
}
}
// 使用
async function main() {
for await (const value of asyncGenerator([1, 2, 3])) {
console.log(value);
}
}
关键点:
yield在异步生成器中产生一个 Promisefor await...of循环会自动等待每次迭代的 Promise- 异步生成器函数调用后返回的是异步迭代器
2.2 异步生成器的执行模型
async function* fetchAll(urls) {
for (const url of urls) {
const response = await fetch(url); // 等待 fetch 完成
const data = await response.json();
yield data; // 暂停并返回 data,下次迭代从这里继续
}
}
const gen = fetchAll(['/api/1', '/api/2', '/api/3']);
// 第一次调用 next()
const it1 = gen.next();
// it1 是 Promise,因为是异步生成器
// 返回值:{ done: false, value: <data from /api/1> }
// 第二次调用 next()
const it2 = gen.next();
// 返回值:{ done: false, value: <data from /api/2> }
2.3 与普通生成器的对比
// 普通生成器:yield 同步值
function* syncGen(items) {
for (const item of items) {
yield item * 2;
}
}
// 异步生成器:yield Promise,或者 await 后的值
async function* asyncGen(items) {
for (const item of items) {
const result = await Promise.resolve(item * 2);
yield result;
}
}
两者的 next() 返回值也不同:
// 普通生成器
const syncIt = syncGen([1, 2, 3]);
syncIt.next(); // { value: 2, done: false } — 同步返回
// 异步生成器
const asyncIt = asyncGen([1, 2, 3]);
asyncIt.next(); // Promise<{ value: 2, done: false }> — 返回 Promise
三、for await...of 循环
3.1 基本语法
for await...of 是 ES2018 引入的语法,专门用于遍历异步可迭代对象:
async function demo() {
const asyncIterable = {
[Symbol.asyncIterator]() {
let i = 0;
return {
next() {
if (i < 3) {
return Promise.resolve({ value: i++ * 2, done: false });
}
return Promise.resolve({ value: undefined, done: true });
}
};
}
};
// for await...of 自动等待每次迭代的 Promise
for await (const value of asyncIterable) {
console.log(value); // 0, 2, 4
}
}
demo();
3.2 与 Promise.all 的对比
处理多个异步操作时,for await...of 与 Promise.all 有不同的适用场景:
const urls = ['/api/1', '/api/2', '/api/3'];
// Promise.all:并发处理所有请求,等待全部完成
const allResults = await Promise.all(urls.map(url => fetch(url).then(r => r.json())));
// 所有请求并发发出,一次性返回所有结果
// for await...of:串行处理,逐个等待
const results = [];
for await (const url of urls) {
const res = await fetch(url);
results.push(await res.json());
}
// 逐个发出请求,等待每个完成后再发下一个
选择建议:
- 需要并发获取数据,且不依赖顺序 →
Promise.all - 需要逐个处理,且后一个依赖前一个的结果 →
for await...of - 需要控制并发数量 →
for await...of+ 手动控制
3.3 错误处理
for await...of 的错误处理与普通 for 循环类似:
async function withErrorHandling() {
const asyncIterable = {
[Symbol.asyncIterator]() {
let i = 0;
return {
next() {
if (i === 1) {
return Promise.reject(new Error('Error at i=1'));
}
return Promise.resolve({ value: i++, done: false });
}
};
}
};
try {
for await (const value of asyncIterable) {
console.log(value);
}
} catch (err) {
console.error('Caught error:', err.message); // "Error at i=1"
}
}
四、ReadableStream 与异步迭代器
4.1 Web Streams API
现代浏览器提供了 Streams API,其中 ReadableStream 实现了异步迭代器协议:
// 从 fetch 响应体获取 ReadableStream
const response = await fetch('/large-file.txt');
const reader = response.body.getReader();
// ReadableStream 默认不实现 async iterator,需要手动处理
const text = await readStream(reader);
async function readStream(reader) {
let result = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
result += new TextDecoder().decode(value);
}
return result;
}
4.2 ReadableStream as Async Iterable
ES2022 为 ReadableStream 添加了 Symbol.asyncIterator 实现:
// 现代浏览器可以直接用 for await...of
async function readLargeFile(url) {
const response = await fetch(url);
let total = 0;
for await (const chunk of response.body) {
total += chunk.length;
console.log(`Received ${total} bytes`);
}
return total;
}
4.3 实际应用场景
异步生成器和流式处理在以下场景特别有用:
- 大文件下载:边下载边处理,不需要一次性加载到内存
- 实时数据:处理 WebSocket、Server-Sent Events 等流式数据
- 数据库游标:逐批处理查询结果,避免一次性加载大量数据
- AI 流式响应:处理 LLM 的流式输出,逐 token 显示
// 处理 AI 流式响应
async function* streamAIResponse(prompt) {
const response = await fetch('/api/ai', {
method: 'POST',
body: JSON.stringify({ prompt }),
headers: { 'Content-Type': 'application/json' }
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
yield chunk; // 逐块 yield
}
}
// 使用
for await (const token of streamAIResponse('What is AI?')) {
appendToDisplay(token); // 实时显示
}
五、异步生成器的方法
5.1 异步生成器实例方法
异步生成器返回的异步迭代器有以下方法:
| 方法 | 说明 |
|---|---|
next() |
返回 Promise resolves to { done, value } |
return() |
提前终止迭代,返回 Promise |
throw() |
向生成器抛出错误 |
async function* gen() {
yield 1;
yield 2;
yield 3;
}
const it = gen();
// 提前终止
it.return('stopped').then(result => {
console.log(result); // { done: true, value: 'stopped' }
});
// 抛出错误
it.throw(new Error('error')).catch(err => {
console.error(err.message); // 'error'
});
5.2 异步生成器组合
可以使用 yield* 在异步生成器中委托给其他异步迭代器:
async function* gen1() {
yield 'a';
yield 'b';
}
async function* gen2() {
yield 1;
yield 2;
}
async function* combined() {
yield* gen1(); // 委托给另一个异步生成器
yield* gen2();
}
async function main() {
for await (const value of combined()) {
console.log(value); // 'a', 'b', 1, 2
}
}
六、面试高频考点
考点 1:同步迭代器与异步迭代器的区别
- 同步迭代器的
next()同步返回{ done, value } - 异步迭代器的
next()返回Promise<{ done, value }> - 遍历方式不同:同步用
for...of,异步用for await...of
考点 2:异步生成器的执行时机
- 调用异步生成器函数不执行函数体,而是创建异步迭代器
- 每次调用
.next()时,函数体开始执行直到遇到yield yield暂停函数执行,返回值通过 Promise 包装- 下次
.next()调用时,从暂停点继续执行
考点 3:for await...of 的错误处理
for await...of 内部会自动 try-catch,如果迭代过程中 Promise reject,错误会被抛出到外部。
七、与其他主题的关联
| 关联主题 | 关系 |
|---|---|
| async-programming | 异步生成器是处理异步数据流的模式 |
| javascript-iterator-generator | 异步生成器是生成器概念的扩展 |
| javascript-event-loop | 异步操作的调度依赖于事件循环 |
| javascript-promise-all | Promise.all 用于并发等待多个异步操作 |
参考资料
- MDN Web Docs — Async iteration
- ECMAScript Specification — AsyncIterator
- Jake Archibald — Async iterators are worth the trouble
延展阅读
- MDN: for await...of — 官方文档
- Streams API — Web Streams API 官方文档
- Jake Archibald: Are Async Iterators Worth It? — 深入分析异步迭代的权衡
- Web.dev: Streams — 现代 Web 应用中的流处理