Skip to content

异步迭代器与生成器的融合:async function* 返回什么?如何实现流式数据处理?

"异步迭代是处理异步数据流的强大模式,它将异步编程与迭代协议相结合,为我们提供了一种优雅的方式来处理随时间推移产生的数据序列。"

在现代 JavaScript 应用中,我们经常需要处理异步数据源,如网络请求、文件读取、数据库查询等。异步迭代器和异步生成器为我们提供了一种统一且优雅的方式来处理这些异步数据流。

一、异步迭代基础概念

异步迭代是迭代协议的异步版本,它引入了两个新的概念:

  1. 异步可迭代协议(Async Iterable Protocol) - 使用 Symbol.asyncIterator 而不是 Symbol.iterator
  2. 异步迭代器协议(Async Iterator Protocol) - next() 方法返回 Promise 而不是直接返回值

异步可迭代协议

javascript
const asyncIterable = {
  [Symbol.asyncIterator]() {
    return asyncIterator;
  }
};

异步迭代器协议

javascript
const asyncIterator = {
  next() {
    // 返回一个 Promise,resolve 一个具有 value 和 done 属性的对象
    return Promise.resolve({
      value: 'some value',
      done: false // 或 true 表示完成
    });
  }
};

二、异步生成器函数

异步生成器函数是创建异步迭代器最简单的方式,使用 async function* 语法:

javascript
async function* asyncGenerator() {
  yield 1;
  yield 2;
  yield 3;
}

// 使用 for await...of 遍历
(async () => {
  for await (const value of asyncGenerator()) {
    console.log(value); // 1, 2, 3
  }
})();

三、异步迭代与普通迭代的区别

javascript
// 普通同步迭代器
function* syncGenerator() {
  yield 1;
  yield 2;
  yield 3;
}

// 异步迭代器
async function* asyncGen() {
  yield 1;
  yield 2;
  yield 3;
}

// 同步迭代
for (const value of syncGenerator()) {
  console.log(value);
}

// 异步迭代
(async () => {
  for await (const value of asyncGen()) {
    console.log(value);
  }
})();

四、处理异步数据源

1. 模拟异步数据流

javascript
async function* asyncDataStream() {
  const data = [1, 2, 3, 4, 5];
  
  for (const item of data) {
    // 模拟异步操作
    await new Promise(resolve => setTimeout(resolve, 1000));
    yield item;
  }
}

// 使用异步迭代处理数据流
(async () => {
  console.log('开始处理数据流...');
  
  for await (const value of asyncDataStream()) {
    console.log(`接收到数据: ${value}`);
  }
  
  console.log('数据流处理完成');
})();

2. 从 API 获取数据

javascript
async function* fetchUsers(userIds) {
  for (const userId of userIds) {
    try {
      // 模拟 API 调用
      const response = await fetch(`/api/users/${userId}`);
      const user = await response.json();
      yield user;
    } catch (error) {
      console.error(`获取用户 ${userId} 失败:`, error);
      yield null; // 或者可以选择跳过失败的请求
    }
  }
}

// 使用示例
(async () => {
  const userIds = [1, 2, 3, 4, 5];
  
  for await (const user of fetchUsers(userIds)) {
    if (user) {
      console.log(`用户: ${user.name}`);
    }
  }
})();

五、异步迭代器的实际应用

1. 文件读取流

javascript
const fs = require('fs').promises;

async function* readLines(filePath) {
  const fileHandle = await fs.open(filePath, 'r');
  const buffer = Buffer.alloc(1024);
  let position = 0;
  
  try {
    let remainder = '';
    
    while (true) {
      const { bytesRead } = await fileHandle.read(buffer, 0, buffer.length, position);
      if (bytesRead === 0) break;
      
      position += bytesRead;
      const chunk = remainder + buffer.subarray(0, bytesRead).toString();
      const lines = chunk.split('\n');
      
      // 保留最后一行(可能是不完整的)
      remainder = lines.pop();
      
      for (const line of lines) {
        yield line;
      }
    }
    
    // 处理最后一行
    if (remainder) {
      yield remainder;
    }
  } finally {
    await fileHandle.close();
  }
}

// 使用示例
(async () => {
  try {
    for await (const line of readLines('example.txt')) {
      console.log(line);
    }
  } catch (error) {
    console.error('读取文件出错:', error);
  }
})();

2. 实时数据流处理

javascript
class EventEmitter {
  constructor() {
    this.listeners = {};
  }
  
  on(event, callback) {
    if (!this.listeners[event]) {
      this.listeners[event] = [];
    }
    this.listeners[event].push(callback);
  }
  
  emit(event, data) {
    if (this.listeners[event]) {
      this.listeners[event].forEach(callback => callback(data));
    }
  }
}

// 创建一个异步可迭代的事件流
class EventStream {
  constructor(eventEmitter, event) {
    this.eventEmitter = eventEmitter;
    this.event = event;
  }
  
  async *[Symbol.asyncIterator]() {
    const queue = [];
    let resolveNext;
    
    const pushValue = (value) => {
      if (resolveNext) {
        resolveNext({ value, done: false });
        resolveNext = null;
      } else {
        queue.push(value);
      }
    };
    
    this.eventEmitter.on(this.event, pushValue);
    
    try {
      while (true) {
        if (queue.length > 0) {
          yield queue.shift();
        } else {
          const value = await new Promise(resolve => {
            resolveNext = resolve;
          });
          yield value;
        }
      }
    } finally {
      // 清理工作
      console.log('事件流结束');
    }
  }
}

// 使用示例
const emitter = new EventEmitter();
const stream = new EventStream(emitter, 'data');

// 启动异步迭代
(async () => {
  for await (const data of stream) {
    console.log('接收到数据:', data);
  }
})();

// 模拟事件发射
setTimeout(() => emitter.emit('data', { id: 1, value: 'Hello' }), 1000);
setTimeout(() => emitter.emit('data', { id: 2, value: 'World' }), 2000);
setTimeout(() => emitter.emit('data', { id: 3, value: '!' }), 3000);

六、异步生成器的高级用法

1. 异步生成器委托

javascript
async function* fetchPages(url, maxPages = 5) {
  for (let i = 1; i <= maxPages; i++) {
    try {
      const response = await fetch(`${url}?page=${i}`);
      const data = await response.json();
      
      if (data.length === 0) {
        break; // 没有更多数据
      }
      
      yield* data; // 委托给数据数组的迭代器
    } catch (error) {
      console.error(`获取第 ${i} 页失败:`, error);
      break;
    }
  }
}

// 使用示例
(async () => {
  for await (const item of fetchPages('/api/items')) {
    console.log(item);
  }
})();

2. 异步生成器中的错误处理

javascript
async function* robustAsyncGenerator() {
  try {
    yield 1;
    yield 2;
    
    // 模拟异步操作中的错误
    await new Promise((_, reject) => {
      setTimeout(() => reject(new Error('异步操作失败')), 100);
    });
    
    yield 3; // 这不会被执行
  } catch (error) {
    console.log('在生成器内部捕获错误:', error.message);
    yield 'error-recovery';
  }
}

// 使用示例
(async () => {
  try {
    for await (const value of robustAsyncGenerator()) {
      console.log(value);
    }
  } catch (error) {
    console.log('在迭代中捕获错误:', error.message);
  }
})();

3. 可取消的异步生成器

javascript
async function* cancellableAsyncGenerator(signal) {
  let count = 0;
  
  while (true) {
    // 检查是否已取消
    if (signal.aborted) {
      throw new Error('生成器已被取消');
    }
    
    try {
      // 模拟异步工作
      await new Promise(resolve => setTimeout(resolve, 500));
      yield ++count;
    } catch (error) {
      if (signal.aborted) {
        throw new Error('生成器已被取消');
      }
      throw error;
    }
  }
}

// 使用示例
(async () => {
  const controller = new AbortController();
  
  // 5秒后取消
  setTimeout(() => controller.abort(), 5000);
  
  try {
    for await (const value of cancellableAsyncGenerator(controller.signal)) {
      console.log(value);
    }
  } catch (error) {
    console.log('生成器被取消:', error.message);
  }
})();

七、与其他异步模式的对比

1. 与 Promise 数组的对比

javascript
// 使用 Promise.all 处理一组异步操作
async function fetchAllUsers(userIds) {
  const promises = userIds.map(id => fetch(`/api/users/${id}`).then(r => r.json()));
  return Promise.all(promises);
}

// 使用异步生成器逐个处理
async function* fetchUsersOneByOne(userIds) {
  for (const id of userIds) {
    const response = await fetch(`/api/users/${id}`);
    yield await response.json();
  }
}

// 使用示例
(async () => {
  // 一次性获取所有用户
  const allUsers = await fetchAllUsers([1, 2, 3]);
  console.log(allUsers);
  
  // 逐个获取用户
  for await (const user of fetchUsersOneByOne([1, 2, 3])) {
    console.log(user);
  }
})();

2. 与 Observable 的对比

javascript
// 异步生成器版本
async function* dataStream() {
  for (let i = 0; i < 5; i++) {
    await new Promise(resolve => setTimeout(resolve, 1000));
    yield i;
  }
}

// 使用 RxJS Observable 的等效实现
// const { interval } = require('rxjs');
// const { take } = require('rxjs/operators');
// 
// const observable = interval(1000).pipe(take(5));

// 消费数据
(async () => {
  for await (const value of dataStream()) {
    console.log(value);
  }
})();

八、性能考虑和最佳实践

1. 控制并发

javascript
async function* controlledAsyncGenerator(tasks, concurrency = 3) {
  const executing = new Set();
  
  for (const task of tasks) {
    const promise = task().then(result => {
      executing.delete(promise);
      return result;
    });
    
    executing.add(promise);
    yield promise;
    
    // 控制并发数量
    if (executing.size >= concurrency) {
      await Promise.race(executing);
    }
  }
  
  // 等待所有剩余任务完成
  while (executing.size > 0) {
    await Promise.race(executing);
  }
}

2. 背压处理

javascript
async function* backpressureControlledStream(source, bufferSize = 10) {
  const buffer = [];
  let resolveNext;
  
  // 启动数据生产
  (async () => {
    for await (const item of source) {
      if (buffer.length < bufferSize) {
        buffer.push(item);
        if (resolveNext) {
          resolveNext(buffer.shift());
          resolveNext = null;
        }
      } else {
        // 缓冲区满,等待消费者消费
        await new Promise(resolve => {
          resolveNext = resolve;
        });
        buffer.push(item);
      }
    }
  })();
  
  // 数据消费
  while (true) {
    if (buffer.length > 0) {
      yield buffer.shift();
    } else {
      const item = await new Promise(resolve => {
        resolveNext = resolve;
      });
      yield item;
    }
  }
}

一句话总结

异步生成器(async function)返回一个异步可迭代对象,它结合了异步编程和迭代协议的优势,使我们能够以自然的方式处理随时间推移产生的数据序列,是实现流式数据处理和实时数据流的强大工具。*

通过异步迭代,我们可以优雅地处理各种异步数据源,从 API 响应到实时事件流,同时保持代码的可读性和可维护性。异步生成器特别适用于需要逐个处理异步数据项的场景,与传统的 Promise.all 等批处理方式相比,提供了更好的内存效率和响应性。