Skip to content

Runnable 接口三要素:invoke, stream, batch

在 LangChain V3 中,所有组件都实现了 Runnable 接口,该接口定义了三个核心方法:invokestreambatch。这三个方法构成了 Runnable 的基础,为开发者提供了灵活且强大的执行方式。

invoke(input): 单次执行,返回 Promise<Output>

invoke 方法是最基本的执行方式,用于单次执行一个 Runnable 组件。它接受一个输入参数,并返回一个 Promise,该 Promise 解析为组件的输出结果。

typescript
const result = await runnable.invoke(input);

特点:

  1. 同步语义的异步实现 - 虽然使用 async/await,但语义上是同步的请求-响应模式
  2. 简单直观 - 最容易理解和使用的方式
  3. 类型安全 - 输入和输出都有明确的类型定义

使用场景:

  • 简单的推理任务
  • 一次性的处理需求
  • 当不需要流式输出时的操作

stream(input): 流式输出,返回 AsyncGeneratorReadableStream

stream 方法用于流式处理,它返回一个异步生成器(AsyncGenerator)或可读流(ReadableStream),允许逐步获取结果。

typescript
const stream = await runnable.stream(input);
for await (const chunk of stream) {
  console.log(chunk);
}

特点:

  1. 实时反馈 - 可以在结果生成过程中逐步获取输出
  2. 内存效率 - 不需要等待完整结果就可以开始处理
  3. 用户体验 - 提供更好的交互体验,特别是对于大语言模型的文本生成

使用场景:

  • 大语言模型的文本生成
  • 需要实时展示进度的任务
  • 处理大量数据的场景

batch(inputs[]): 批量处理,返回 Promise<Output[]>

batch 方法用于批量处理多个输入,它接受一个输入数组,并返回一个 Promise,该 Promise 解析为对应输出结果的数组。

typescript
const inputs = [input1, input2, input3];
const results = await runnable.batch(inputs);

特点:

  1. 高效处理 - 可以一次性处理多个任务,提高整体效率
  2. 并行执行 - 内部可能会并行执行多个操作
  3. 资源优化 - 更好地利用系统资源

使用场景:

  • 批量数据处理
  • 并行执行相似任务
  • 需要处理大量独立输入的情况

三者的协同关系

这三个方法并非孤立存在,它们之间有着密切的关系:

  1. 基础构建 - invoke 是最基础的方法,streambatch 可以基于 invoke 实现
  2. 性能优化 - 每个方法都可以针对其特定用途进行优化
  3. 使用选择 - 开发者可以根据具体需求选择最合适的方法

实现层面的考虑

在实现 Runnable 接口时,需要注意:

默认实现 vs 优化实现

typescript
class CustomRunnable extends Runnable {
  // 基础 invoke 实现
  async invoke(input, options) {
    // 核心逻辑
  }
  
  // stream 可以基于 invoke 实现
  async *stream(input, options) {
    const result = await this.invoke(input, options);
    yield result;
  }
  
  // batch 可以并行执行多个 invoke
  async batch(inputs, options) {
    return Promise.all(
      inputs.map(input => this.invoke(input, options))
    );
  }
}

性能优化

对于特定的 Runnable 实现,可能需要对不同方法进行专门优化:

typescript
class OptimizedLLM extends Runnable {
  // invoke 的标准实现
  async invoke(input) {
    // 单次调用逻辑
  }
  
  // stream 的优化实现,支持真正的流式输出
  async *stream(input) {
    // 实现真正的 token-by-token 流式输出
    for await (const token of this.streamTokens(input)) {
      yield token;
    }
  }
  
  // batch 的优化实现,支持批量 API 调用
  async batch(inputs) {
    // 如果 API 支持批量请求,则使用批量接口
    if (this.supportsBatchAPI) {
      return this.batchAPI(inputs);
    }
    // 否则回退到并行执行
    return Promise.all(inputs.map(input => this.invoke(input)));
  }
}

错误处理的一致性

所有三个方法都应该有一致的错误处理机制:

typescript
try {
  const result = await runnable.invoke(input);
} catch (error) {
  // 统一的错误处理逻辑
}

try {
  const stream = await runnable.stream(input);
  for await (const chunk of stream) {
    // 处理流数据
  }
} catch (error) {
  // 流式处理中的错误处理
}

try {
  const results = await runnable.batch([input1, input2]);
} catch (error) {
  // 批量处理中的错误处理
}

总结

Runnable 接口的三个核心方法 invokestreambatch 为 LangChain V3 提供了灵活而强大的执行模型。它们分别适用于不同的使用场景:

  • invoke 适用于简单的单次执行
  • stream 适用于需要实时反馈的场景
  • batch 适用于批量处理任务

这种设计既保证了使用的简便性,又提供了足够的灵活性和性能优化空间。在后续章节中,我们将进一步探讨如何利用这些方法构建复杂的 LLM 应用程序。