| 操作符的实现:pipe() 方法的重载
在 LangChain V3 中,LCEL(LangChain Expression Language)的 | 操作符是通过 pipe() 方法实现的。虽然 JavaScript/TypeScript 本身不支持自定义操作符,但 LangChain 通过巧妙的设计让开发者能够使用链式调用来模拟 | 操作符的效果。本章将深入探讨 pipe() 方法的实现机制和重载方式。
pipe() 方法的基本实现
在 LangChain V3 中,所有 组件都实现了 pipe() 方法,用于将当前组件与另一个组件连接:
typescript
class Runnable<Input, Output> {
pipe<NewOutput>(
next: Runnable<Output, NewOutput> | ((input: Output) => NewOutput)
): Runnable<Input, NewOutput> {
// 如果 next 是函数,将其包装为 RunnableLambda
if (typeof next === 'function') {
return new RunnableSequence({
steps: [this, new RunnableLambda(next)]
});
}
// 如果 next 是 Runnable,创建 RunnableSequence
return new RunnableSequence({
steps: [this, next]
});
}
}RunnableSequence 的实现
RunnableSequence 是 pipe() 方法的核心,它负责管理一系列连接的 Runnable 组件:
typescript
class RunnableSequence<Input = any, Output = any> extends Runnable<Input, Output> {
private steps: Runnable[];
constructor({ steps }: { steps: Runnable[] }) {
super();
this.steps = steps;
}
async invoke(input: Input, options?: RunnableConfig): Promise<Output> {
let currentInput: any = input;
// 依次执行每个步骤
for (const step of this.steps) {
currentInput = await step.invoke(currentInput, options);
}
return currentInput as Output;
}
async batch(
inputs: Input[],
options?: RunnableConfig | RunnableConfig[]
): Promise<Output[]> {
let currentInputs: any[] = inputs;
// 依次批量执行每个步骤
for (const step of this.steps) {
currentInputs = await step.batch(currentInputs, options);
}
return currentInputs as Output[];
}
async *stream(
input: Input,
options?: RunnableConfig
): AsyncGenerator<Output, any, unknown> {
// 流式处理的实现
// 这里简化了实现,实际会更复杂
const result = await this.invoke(input, options);
yield result as any;
}
}方法重载与类型推导
pipe() 方法的强大之处在于其类型重载和自动推导能力:
typescript
class Runnable<Input = any, Output = any> {
// 基本重载 - 连接另一个 Runnable
pipe<NewOutput>(
other: Runnable<Output, NewOutput>
): Runnable<Input, NewOutput>;
// 函数重载 - 连接一个普通函数
pipe<NewOutput>(
func: (input: Output) => NewOutput | Promise<NewOutput>
): Runnable<Input, NewOutput>;
// 条件重载 - 根据输入类型选择不同实现
pipe<NewOutput>(
other: Runnable<Output, NewOutput> | ((input: Output) => NewOutput)
): Runnable<Input, NewOutput> {
// 实现细节...
}
}类型安全的实现
LangChain V3 利用 TypeScript 的泛型系统确保类型安全:
typescript
// 定义不同类型的 Runnable
const stringToNumber = new RunnableLambda((input: string) => input.length);
const numberToString = new RunnableLambda((input: number) => input.toString());
// 类型会被正确推导
// chain 的类型是 Runnable<string, string>
const chain = stringToNumber.pipe(numberToString);
// TypeScript 会在编译时检查类型兼容性
// 以下代码会产生编译错误:
// const invalidChain = stringToNumber.pipe((input: boolean) => input.toString());支持函数作为参数
pipe() 方法不仅可以连接 Runnable 组件,还可以直接连接普通函数:
typescript
const llm = new ChatOpenAI();
const uppercase = (text: string) => text.toUpperCase();
// 可以直接连接函数
const chain = llm.pipe(uppercase);
// 内部实现会将函数包装为 RunnableLambda
class RunnableLambda<T, U> extends Runnable<T, U> {
private func: (input: T) => U | Promise<U>;
constructor(func: (input: T) => U | Promise<U>) {
super();
this.func = func;
}
async invoke(input: T): Promise<U> {
return this.func(input);
}
}复杂管道的构建
通过连续调用 pipe() 方法,可以构建复杂的处理管道:
typescript
const complexChain =
promptTemplate
.pipe(llm)
.pipe((result: string) => result.trim())
.pipe(new JsonOutputParser())
.pipe((data: any) => ({
...data,
processedAt: new Date().toISOString()
}));错误处理和中间件
pipe() 方法还支持错误处理和中间件模式:
typescript
class RunnableWithRetry<Input, Output> extends Runnable<Input, Output> {
constructor(
private runnable: Runnable<Input, Output>,
private maxRetries: number = 3
) {
super();
}
async invoke(input: Input, options?: RunnableConfig): Promise<Output> {
for (let i = 0; i < this.maxRetries; i++) {
try {
return await this.runnable.invoke(input, options);
} catch (error) {
if (i === this.maxRetries - 1) throw error;
await new Promise(resolve => setTimeout(resolve, 1000 * Math.pow(2, i)));
}
}
throw new Error("Unreachable");
}
// 重写 pipe 方法以保持链式调用
pipe<NewOutput>(
other: Runnable<Output, NewOutput> | ((input: Output) => NewOutput)
): Runnable<Input, NewOutput> {
const next = typeof other === 'function'
? new RunnableLambda(other)
: other;
return new RunnableSequence({
steps: [this, next]
});
}
}
// 使用示例
const resilientChain = llm
.pipe(new RunnableWithRetry(llm, 3))
.pipe(parser);性能优化
pipe() 方法的实现还考虑了性能优化:
typescript
class RunnableSequence<Input, Output> extends Runnable<Input, Output> {
// 缓存优化 - 避免重复创建相同的序列
private static sequenceCache = new WeakMap();
static fromSteps<Input, Output>(steps: Runnable<any, any>[]): Runnable<Input, Output> {
// 检查缓存
const cacheKey = steps.join('|');
if (this.sequenceCache.has(cacheKey)) {
return this.sequenceCache.get(cacheKey);
}
const sequence = new RunnableSequence({ steps });
this.sequenceCache.set(cacheKey, sequence);
return sequence;
}
}流式处理支持
pipe() 方法还需要支持流式处理:
typescript
class RunnableSequence<Input, Output> extends Runnable<Input, Output> {
async *stream(input: Input, options?: RunnableConfig): AsyncGenerator<any, any, unknown> {
let currentStream: AsyncGenerator<any> = (async function*() {
yield input;
})();
// 依次流式处理每个步骤
for (const step of this.steps) {
const items: any[] = [];
for await (const item of currentStream) {
items.push(item);
}
// 如果是最后一个步骤,直接 yield 结果
if (step === this.steps[this.steps.length - 1]) {
const finalStream = await step.stream(items[0], options);
for await (const chunk of finalStream) {
yield chunk;
}
} else {
// 中间步骤,继续处理
currentStream = await step.stream(items[0], options);
}
}
}
}实际应用示例
让我们看一个完整的实际应用示例:
typescript
// 定义组件
const template = new PromptTemplate({
template: "将以下文本翻译为 {language}: {text}",
inputVariables: ["text", "language"]
});
const llm = new ChatOpenAI({
temperature: 0.7,
modelName: "gpt-3.5-turbo"
});
const parser = new StringOutputParser();
// 使用 pipe 构建链
const translationChain = template
.pipe(llm)
.pipe(parser)
.pipe((text: string) => text.trim())
.pipe(new RunnableLambda((text: string) => ({
original: "",
translated: text,
timestamp: new Date()
})));
// 使用链
const result = await translationChain.invoke({
text: "Hello, world!",
language: "中文"
});
console.log(result);
// {
// original: "Hello, world!",
// translated: "你好,世界!",
// timestamp: 2023-...
// }总结
pipe() 方法的实现是 LangChain V3 中一个精巧而强大的设计。通过方法重载、类型推导、函数包装和序列管理,它成功地模拟了 | 操作符的效果,为开发者提供了流畅而类型安全的链式调用体验。
关键实现要点包括:
- 使用
RunnableSequence管理多个连接的组件 - 通过 TypeScript 泛型实现类型安全和自动推导
- 支持函数和 Runnable 组件作为参数
- 保持所有 Runnable 接口的一致性(invoke, batch, stream)
- 支持错误处理和中间件模式
在下一章中,我们将探讨输入/输出 Schema 的自动推导,了解 LangChain V3 如何利用 TypeScript 泛型和 Zod 风格校验实现类型安全的链式调用。