分支与条件:RunnableBranch 如何实现路由?
在构建复杂的 LLM 应用时,往往需要根据不同条件选择不同的处理路径。LangChain V3 通过 RunnableBranch 提供了强大的分支和条件路由功能,使得开发者能够根据输入或前序输出选择不同的子链。本章将深入探讨 RunnableBranch 的实现原理和应用方式。
分支路由的基本概念
RunnableBranch 是 LangChain V3 中用于实现条件路由的核心组件。它允许开发者定义多个条件和对应的处理链,根据输入或中间结果选择合适的处理路径:
const branch = new RunnableBranch([
// 条件1和对应的处理链
[(input: string) => input.includes("数学"), mathChain],
// 条件2和对应的处理链
[(input: string) => input.includes("历史"), historyChain],
// 默认处理链
[(_) => true, defaultChain]
]);RunnableBranch 的实现原理
RunnableBranch 的核心实现包括条件判断和路由选择:
class RunnableBranch<Input = any, Output = any> extends Runnable<Input, Output> {
private branches: Array<[(input: Input) => boolean, Runnable<Input, Output>]>;
private default?: Runnable<Input, Output>;
constructor(
branches: Array<[(input: Input) => boolean, Runnable<Input, Output>]>,
defaultBranch?: Runnable<Input, Output>
) {
super();
this.branches = branches;
this.default = defaultBranch;
}
async invoke(input: Input, options?: RunnableConfig): Promise<Output> {
// 依次检查每个条件
for (const [condition, chain] of this.branches) {
if (await condition(input)) {
return await chain.invoke(input, options);
}
}
// 如果没有条件匹配,使用默认分支
if (this.default) {
return await this.default.invoke(input, options);
}
throw new Error("没有匹配的分支且未设置默认分支");
}
}基于输入的路由
最常见的分支路由场景是基于输入内容选择不同的处理链:
// 定义不同的处理链
const mathChain = new PromptTemplate({
template: "解决以下数学问题: {question}\n请给出详细的解题步骤。",
inputVariables: ["question"]
}).pipe(new ChatOpenAI()).pipe(new StringOutputParser());
const historyChain = new PromptTemplate({
template: "回答以下历史问题: {question}\n请提供相关的历史背景和事实。",
inputVariables: ["question"]
}).pipe(new ChatOpenAI()).pipe(new StringOutputParser());
const scienceChain = new PromptTemplate({
template: "解释以下科学问题: {question}\n请用通俗易懂的语言说明。",
inputVariables: ["question"]
}).pipe(new ChatOpenAI()).pipe(new StringOutputParser());
// 创建分支路由
const questionRouter = new RunnableBranch([
[(input: { question: string }) => input.question.toLowerCase().includes("计算") ||
input.question.includes("+") ||
input.question.includes("-") ||
input.question.includes("×") ||
input.question.includes("÷"), mathChain],
[(input: { question: string }) => input.question.toLowerCase().includes("历史") ||
input.question.includes("古代") ||
input.question.includes("朝代"), historyChain],
[(input: { question: string }) => input.question.toLowerCase().includes("科学") ||
input.question.includes("物理") ||
input.question.includes("化学") ||
input.question.includes("生物"), scienceChain]
]);
// 使用分支路由
const result = await questionRouter.invoke({
question: "唐朝是哪个世纪建立的?"
});基于前序输出的路由
分支路由也可以基于前序处理的输出结果进行:
// 第一步:分类问题类型
const classifierChain = new PromptTemplate({
template: `请将以下问题分类到最合适的类别中:
1. 数学
2. 历史
3. 科学
4. 其他
问题: {question}
类别:`,
inputVariables: ["question"]
}).pipe(new ChatOpenAI()).pipe(new StringOutputParser());
// 第二步:基于分类结果选择处理链
const categorizedRouter = new RunnableBranch([
[(category: string) => category.toLowerCase().includes("数学"), mathChain],
[(category: string) => category.toLowerCase().includes("历史"), historyChain],
[(category: string) => category.toLowerCase().includes("科学"), scienceChain]
]);
// 组合链:分类 -> 路由 -> 处理
const fullChain = classifierChain.pipe(categorizedRouter);复杂条件的实现
RunnableBranch 支持复杂的条件判断逻辑:
// 复杂条件示例
const complexBranch = new RunnableBranch([
// 多条件组合
[async (input: UserRequest) => {
const user = await getUserInfo(input.userId);
return user.isPremium && input.priority === 'high';
}, premiumChain],
// 异步条件判断
[async (input: UserRequest) => {
const load = await getSystemLoad();
return load < 0.8 && input.complexity === 'high';
}, complexProcessingChain],
// 基于时间的条件
[(input: UserRequest) => {
const hour = new Date().getHours();
return hour >= 9 && hour <= 17 && input.urgent === true;
}, businessHoursChain]
]);嵌套分支路由
分支路由可以嵌套使用,构建更复杂的决策树:
// 第一层分支:按用户类型分类
const userBranch = new RunnableBranch([
[(input: Request) => input.userType === 'admin', adminBranch],
[(input: Request) => input.userType === 'premium', premiumBranch],
[(input: Request) => input.userType === 'basic', basicBranch]
]);
// 管理员分支的子分支
const adminBranch = new RunnableBranch([
[(input: Request) => input.action === 'delete', deleteChain],
[(input: Request) => input.action === 'modify', modifyChain],
[(input: Request) => input.action === 'view', viewChain]
]);
// 高级用户分支的子分支
const premiumBranch = new RunnableBranch([
[(input: Request) => input.resourceType === 'advanced', advancedResourceChain],
[(input: Request) => input.resourceType === 'standard', standardResourceChain]
]);与 LCEL 的集成
RunnableBranch 可以与 LCEL 语法无缝集成:
// 使用 LCEL 语法创建分支
const lcelBranch =
inputParser
.pipe(
new RunnableBranch([
[(data: ParsedInput) => data.type === 'math', mathChain],
[(data: ParsedInput) => data.type === 'history', historyChain],
[(data: ParsedInput) => data.type === 'science', scienceChain]
])
)
.pipe(outputFormatter);错误处理和回退机制
分支路由中可以实现错误处理和回退机制:
class ResilientBranch extends RunnableBranch {
async invoke(input: any, options?: RunnableConfig): Promise<any> {
for (const [condition, chain] of this.branches) {
if (await condition(input)) {
try {
return await chain.invoke(input, options);
} catch (error) {
// 如果当前分支失败,记录日志并尝试下一个分支
console.warn(`分支执行失败: ${error.message}`);
continue;
}
}
}
// 尝试默认分支
if (this.default) {
try {
return await this.default.invoke(input, options);
} catch (error) {
console.error(`默认分支执行失败: ${error.message}`);
}
}
throw new Error("所有分支都执行失败");
}
}实际应用示例:智能客服系统
让我们看一个完整的实际应用示例,展示如何使用 RunnableBranch 构建智能客服系统:
// 定义不同类型问题的处理链
const technicalSupportChain = new PromptTemplate({
template: `您是一名技术支持专家。请帮助用户解决以下技术问题:
用户问题: {question}
解决方案:`,
inputVariables: ["question"]
}).pipe(new ChatOpenAI({ modelName: "gpt-4" })).pipe(new StringOutputParser());
const billingChain = new PromptTemplate({
template: `您是一名财务顾问。请帮助用户解决以下账单问题:
用户问题: {question}
解决方案:`,
inputVariables: ["question"]
}).pipe(new ChatOpenAI()).pipe(new StringOutputParser());
const generalInfoChain = new PromptTemplate({
template: `您是一名客服代表。请回答用户的常见问题:
用户问题: {question}
回答:`,
inputVariables: ["question"]
}).pipe(new ChatOpenAI()).pipe(new StringOutputParser());
// 创建分类器
const classifierChain = new PromptTemplate({
template: `请将以下用户问题分类到最合适的类别中:
类别选项:
1. 技术支持 (technical)
2. 账单问题 (billing)
3. 一般咨询 (general)
用户问题: {question}
请只回答类别名称:`,
inputVariables: ["question"]
}).pipe(new ChatOpenAI()).pipe(new StringOutputParser());
// 创建分支路由
const supportRouter = new RunnableBranch([
[(category: string) => category.toLowerCase().includes("技术") ||
category.toLowerCase().includes("technical"), technicalSupportChain],
[(category: string) => category.toLowerCase().includes("账单") ||
category.toLowerCase().includes("billing"), billingChain],
[(category: string) => category.toLowerCase().includes("一般") ||
category.toLowerCase().includes("general"), generalInfoChain]
]);
// 完整的客服链
const customerServiceChain = classifierChain.pipe(supportRouter);
// 使用示例
const response = await customerServiceChain.invoke({
question: "我无法登录我的账户,总是提示密码错误"
});
console.log(response);性能优化考虑
在实现分支路由时,需要考虑性能优化:
class OptimizedBranch extends RunnableBranch {
private conditionCache = new Map();
async invoke(input: any, options?: RunnableConfig): Promise<any> {
// 缓存条件判断结果以提高性能
const cacheKey = this.getCacheKey(input);
if (this.conditionCache.has(cacheKey)) {
const cachedResult = this.conditionCache.get(cacheKey);
return await cachedResult.chain.invoke(input, options);
}
for (const [condition, chain] of this.branches) {
if (await condition(input)) {
// 缓存结果
this.conditionCache.set(cacheKey, { chain, timestamp: Date.now() });
return await chain.invoke(input, options);
}
}
if (this.default) {
return await this.default.invoke(input, options);
}
throw new Error("没有匹配的分支且未设置默认分支");
}
private getCacheKey(input: any): string {
// 根据输入生成缓存键
return JSON.stringify(input);
}
}总结
RunnableBranch 是 LangChain V3 中实现条件路由和分支处理的核心组件。它通过灵活的条件判断机制,使得开发者能够根据不同输入或中间结果选择不同的处理链。
关键特性包括:
- 灵活的条件定义 - 支持同步和异步条件判断
- 多层嵌套 - 可以构建复杂的决策树
- 与 LCEL 集成 - 可以与管道语法无缝结合
- 错误处理 - 支持错误处理和回退机制
- 性能优化 - 可以通过缓存等技术优化性能
通过 RunnableBranch,开发者可以构建更加智能化和自适应的 LLM 应用,根据不同的上下文选择最合适的处理方式。
在下一章中,我们将探讨循环与递归:RunnableLoop 与 Agent 的自我修正机制,了解如何实现 ReAct 模式中的 Thought → Action → Observation 循环。