Skip to content

| 操作符的实现:pipe() 方法的重载

在 LangChain V3 中,LCEL(LangChain Expression Language)的 | 操作符是通过 pipe() 方法实现的。虽然 JavaScript/TypeScript 本身不支持自定义操作符,但 LangChain 通过巧妙的设计让开发者能够使用链式调用来模拟 | 操作符的效果。本章将深入探讨 pipe() 方法的实现机制和重载方式。

pipe() 方法的基本实现

在 LangChain V3 中,所有 组件都实现了 pipe() 方法,用于将当前组件与另一个组件连接:

typescript
class Runnable<Input, Output> {
  pipe<NewOutput>(
    next: Runnable<Output, NewOutput> | ((input: Output) => NewOutput)
  ): Runnable<Input, NewOutput> {
    // 如果 next 是函数,将其包装为 RunnableLambda
    if (typeof next === 'function') {
      return new RunnableSequence({
        steps: [this, new RunnableLambda(next)]
      });
    }
    
    // 如果 next 是 Runnable,创建 RunnableSequence
    return new RunnableSequence({
      steps: [this, next]
    });
  }
}

RunnableSequence 的实现

RunnableSequencepipe() 方法的核心,它负责管理一系列连接的 Runnable 组件:

typescript
class RunnableSequence<Input = any, Output = any> extends Runnable<Input, Output> {
  private steps: Runnable[];
  
  constructor({ steps }: { steps: Runnable[] }) {
    super();
    this.steps = steps;
  }
  
  async invoke(input: Input, options?: RunnableConfig): Promise<Output> {
    let currentInput: any = input;
    
    // 依次执行每个步骤
    for (const step of this.steps) {
      currentInput = await step.invoke(currentInput, options);
    }
    
    return currentInput as Output;
  }
  
  async batch(
    inputs: Input[], 
    options?: RunnableConfig | RunnableConfig[]
  ): Promise<Output[]> {
    let currentInputs: any[] = inputs;
    
    // 依次批量执行每个步骤
    for (const step of this.steps) {
      currentInputs = await step.batch(currentInputs, options);
    }
    
    return currentInputs as Output[];
  }
  
  async *stream(
    input: Input, 
    options?: RunnableConfig
  ): AsyncGenerator<Output, any, unknown> {
    // 流式处理的实现
    // 这里简化了实现,实际会更复杂
    const result = await this.invoke(input, options);
    yield result as any;
  }
}

方法重载与类型推导

pipe() 方法的强大之处在于其类型重载和自动推导能力:

typescript
class Runnable<Input = any, Output = any> {
  // 基本重载 - 连接另一个 Runnable
  pipe<NewOutput>(
    other: Runnable<Output, NewOutput>
  ): Runnable<Input, NewOutput>;
  
  // 函数重载 - 连接一个普通函数
  pipe<NewOutput>(
    func: (input: Output) => NewOutput | Promise<NewOutput>
  ): Runnable<Input, NewOutput>;
  
  // 条件重载 - 根据输入类型选择不同实现
  pipe<NewOutput>(
    other: Runnable<Output, NewOutput> | ((input: Output) => NewOutput)
  ): Runnable<Input, NewOutput> {
    // 实现细节...
  }
}

类型安全的实现

LangChain V3 利用 TypeScript 的泛型系统确保类型安全:

typescript
// 定义不同类型的 Runnable
const stringToNumber = new RunnableLambda((input: string) => input.length);
const numberToString = new RunnableLambda((input: number) => input.toString());

// 类型会被正确推导
// chain 的类型是 Runnable<string, string>
const chain = stringToNumber.pipe(numberToString);

// TypeScript 会在编译时检查类型兼容性
// 以下代码会产生编译错误:
// const invalidChain = stringToNumber.pipe((input: boolean) => input.toString());

支持函数作为参数

pipe() 方法不仅可以连接 Runnable 组件,还可以直接连接普通函数:

typescript
const llm = new ChatOpenAI();
const uppercase = (text: string) => text.toUpperCase();

// 可以直接连接函数
const chain = llm.pipe(uppercase);

// 内部实现会将函数包装为 RunnableLambda
class RunnableLambda<T, U> extends Runnable<T, U> {
  private func: (input: T) => U | Promise<U>;
  
  constructor(func: (input: T) => U | Promise<U>) {
    super();
    this.func = func;
  }
  
  async invoke(input: T): Promise<U> {
    return this.func(input);
  }
}

复杂管道的构建

通过连续调用 pipe() 方法,可以构建复杂的处理管道:

typescript
const complexChain = 
  promptTemplate
    .pipe(llm)
    .pipe((result: string) => result.trim())
    .pipe(new JsonOutputParser())
    .pipe((data: any) => ({
      ...data,
      processedAt: new Date().toISOString()
    }));

错误处理和中间件

pipe() 方法还支持错误处理和中间件模式:

typescript
class RunnableWithRetry<Input, Output> extends Runnable<Input, Output> {
  constructor(
    private runnable: Runnable<Input, Output>,
    private maxRetries: number = 3
  ) {
    super();
  }
  
  async invoke(input: Input, options?: RunnableConfig): Promise<Output> {
    for (let i = 0; i < this.maxRetries; i++) {
      try {
        return await this.runnable.invoke(input, options);
      } catch (error) {
        if (i === this.maxRetries - 1) throw error;
        await new Promise(resolve => setTimeout(resolve, 1000 * Math.pow(2, i)));
      }
    }
    throw new Error("Unreachable");
  }
  
  // 重写 pipe 方法以保持链式调用
  pipe<NewOutput>(
    other: Runnable<Output, NewOutput> | ((input: Output) => NewOutput)
  ): Runnable<Input, NewOutput> {
    const next = typeof other === 'function' 
      ? new RunnableLambda(other) 
      : other;
    
    return new RunnableSequence({
      steps: [this, next]
    });
  }
}

// 使用示例
const resilientChain = llm
  .pipe(new RunnableWithRetry(llm, 3))
  .pipe(parser);

性能优化

pipe() 方法的实现还考虑了性能优化:

typescript
class RunnableSequence<Input, Output> extends Runnable<Input, Output> {
  // 缓存优化 - 避免重复创建相同的序列
  private static sequenceCache = new WeakMap();
  
  static fromSteps<Input, Output>(steps: Runnable<any, any>[]): Runnable<Input, Output> {
    // 检查缓存
    const cacheKey = steps.join('|');
    if (this.sequenceCache.has(cacheKey)) {
      return this.sequenceCache.get(cacheKey);
    }
    
    const sequence = new RunnableSequence({ steps });
    this.sequenceCache.set(cacheKey, sequence);
    return sequence;
  }
}

流式处理支持

pipe() 方法还需要支持流式处理:

typescript
class RunnableSequence<Input, Output> extends Runnable<Input, Output> {
  async *stream(input: Input, options?: RunnableConfig): AsyncGenerator<any, any, unknown> {
    let currentStream: AsyncGenerator<any> = (async function*() { 
      yield input; 
    })();
    
    // 依次流式处理每个步骤
    for (const step of this.steps) {
      const items: any[] = [];
      for await (const item of currentStream) {
        items.push(item);
      }
      
      // 如果是最后一个步骤,直接 yield 结果
      if (step === this.steps[this.steps.length - 1]) {
        const finalStream = await step.stream(items[0], options);
        for await (const chunk of finalStream) {
          yield chunk;
        }
      } else {
        // 中间步骤,继续处理
        currentStream = await step.stream(items[0], options);
      }
    }
  }
}

实际应用示例

让我们看一个完整的实际应用示例:

typescript
// 定义组件
const template = new PromptTemplate({
  template: "将以下文本翻译为 {language}: {text}",
  inputVariables: ["text", "language"]
});

const llm = new ChatOpenAI({
  temperature: 0.7,
  modelName: "gpt-3.5-turbo"
});

const parser = new StringOutputParser();

// 使用 pipe 构建链
const translationChain = template
  .pipe(llm)
  .pipe(parser)
  .pipe((text: string) => text.trim())
  .pipe(new RunnableLambda((text: string) => ({
    original: "",
    translated: text,
    timestamp: new Date()
  })));

// 使用链
const result = await translationChain.invoke({
  text: "Hello, world!",
  language: "中文"
});

console.log(result); 
// {
//   original: "Hello, world!",
//   translated: "你好,世界!",
//   timestamp: 2023-...
// }

总结

pipe() 方法的实现是 LangChain V3 中一个精巧而强大的设计。通过方法重载、类型推导、函数包装和序列管理,它成功地模拟了 | 操作符的效果,为开发者提供了流畅而类型安全的链式调用体验。

关键实现要点包括:

  1. 使用 RunnableSequence 管理多个连接的组件
  2. 通过 TypeScript 泛型实现类型安全和自动推导
  3. 支持函数和 Runnable 组件作为参数
  4. 保持所有 Runnable 接口的一致性(invoke, batch, stream)
  5. 支持错误处理和中间件模式

在下一章中,我们将探讨输入/输出 Schema 的自动推导,了解 LangChain V3 如何利用 TypeScript 泛型和 Zod 风格校验实现类型安全的链式调用。