Skip to content

分支与条件:RunnableBranch 如何实现路由?

在构建复杂的 LLM 应用时,往往需要根据不同条件选择不同的处理路径。LangChain V3 通过 RunnableBranch 提供了强大的分支和条件路由功能,使得开发者能够根据输入或前序输出选择不同的子链。本章将深入探讨 RunnableBranch 的实现原理和应用方式。

分支路由的基本概念

RunnableBranch 是 LangChain V3 中用于实现条件路由的核心组件。它允许开发者定义多个条件和对应的处理链,根据输入或中间结果选择合适的处理路径:

typescript
const branch = new RunnableBranch([
  // 条件1和对应的处理链
  [(input: string) => input.includes("数学"), mathChain],
  // 条件2和对应的处理链
  [(input: string) => input.includes("历史"), historyChain],
  // 默认处理链
  [(_) => true, defaultChain]
]);

RunnableBranch 的实现原理

RunnableBranch 的核心实现包括条件判断和路由选择:

typescript
class RunnableBranch<Input = any, Output = any> extends Runnable<Input, Output> {
  private branches: Array<[(input: Input) => boolean, Runnable<Input, Output>]>;
  private default?: Runnable<Input, Output>;
  
  constructor(
    branches: Array<[(input: Input) => boolean, Runnable<Input, Output>]>,
    defaultBranch?: Runnable<Input, Output>
  ) {
    super();
    this.branches = branches;
    this.default = defaultBranch;
  }
  
  async invoke(input: Input, options?: RunnableConfig): Promise<Output> {
    // 依次检查每个条件
    for (const [condition, chain] of this.branches) {
      if (await condition(input)) {
        return await chain.invoke(input, options);
      }
    }
    
    // 如果没有条件匹配,使用默认分支
    if (this.default) {
      return await this.default.invoke(input, options);
    }
    
    throw new Error("没有匹配的分支且未设置默认分支");
  }
}

基于输入的路由

最常见的分支路由场景是基于输入内容选择不同的处理链:

typescript
// 定义不同的处理链
const mathChain = new PromptTemplate({
  template: "解决以下数学问题: {question}\n请给出详细的解题步骤。",
  inputVariables: ["question"]
}).pipe(new ChatOpenAI()).pipe(new StringOutputParser());

const historyChain = new PromptTemplate({
  template: "回答以下历史问题: {question}\n请提供相关的历史背景和事实。",
  inputVariables: ["question"]
}).pipe(new ChatOpenAI()).pipe(new StringOutputParser());

const scienceChain = new PromptTemplate({
  template: "解释以下科学问题: {question}\n请用通俗易懂的语言说明。",
  inputVariables: ["question"]
}).pipe(new ChatOpenAI()).pipe(new StringOutputParser());

// 创建分支路由
const questionRouter = new RunnableBranch([
  [(input: { question: string }) => input.question.toLowerCase().includes("计算") || 
                                   input.question.includes("+") || 
                                   input.question.includes("-") ||
                                   input.question.includes("×") ||
                                   input.question.includes("÷"), mathChain],
  
  [(input: { question: string }) => input.question.toLowerCase().includes("历史") ||
                                   input.question.includes("古代") ||
                                   input.question.includes("朝代"), historyChain],
  
  [(input: { question: string }) => input.question.toLowerCase().includes("科学") ||
                                   input.question.includes("物理") ||
                                   input.question.includes("化学") ||
                                   input.question.includes("生物"), scienceChain]
]);

// 使用分支路由
const result = await questionRouter.invoke({
  question: "唐朝是哪个世纪建立的?"
});

基于前序输出的路由

分支路由也可以基于前序处理的输出结果进行:

typescript
// 第一步:分类问题类型
const classifierChain = new PromptTemplate({
  template: `请将以下问题分类到最合适的类别中:
1. 数学
2. 历史
3. 科学
4. 其他

问题: {question}
类别:`,
  inputVariables: ["question"]
}).pipe(new ChatOpenAI()).pipe(new StringOutputParser());

// 第二步:基于分类结果选择处理链
const categorizedRouter = new RunnableBranch([
  [(category: string) => category.toLowerCase().includes("数学"), mathChain],
  [(category: string) => category.toLowerCase().includes("历史"), historyChain],
  [(category: string) => category.toLowerCase().includes("科学"), scienceChain]
]);

// 组合链:分类 -> 路由 -> 处理
const fullChain = classifierChain.pipe(categorizedRouter);

复杂条件的实现

RunnableBranch 支持复杂的条件判断逻辑:

typescript
// 复杂条件示例
const complexBranch = new RunnableBranch([
  // 多条件组合
  [async (input: UserRequest) => {
    const user = await getUserInfo(input.userId);
    return user.isPremium && input.priority === 'high';
  }, premiumChain],
  
  // 异步条件判断
  [async (input: UserRequest) => {
    const load = await getSystemLoad();
    return load < 0.8 && input.complexity === 'high';
  }, complexProcessingChain],
  
  // 基于时间的条件
  [(input: UserRequest) => {
    const hour = new Date().getHours();
    return hour >= 9 && hour <= 17 && input.urgent === true;
  }, businessHoursChain]
]);

嵌套分支路由

分支路由可以嵌套使用,构建更复杂的决策树:

typescript
// 第一层分支:按用户类型分类
const userBranch = new RunnableBranch([
  [(input: Request) => input.userType === 'admin', adminBranch],
  [(input: Request) => input.userType === 'premium', premiumBranch],
  [(input: Request) => input.userType === 'basic', basicBranch]
]);

// 管理员分支的子分支
const adminBranch = new RunnableBranch([
  [(input: Request) => input.action === 'delete', deleteChain],
  [(input: Request) => input.action === 'modify', modifyChain],
  [(input: Request) => input.action === 'view', viewChain]
]);

// 高级用户分支的子分支
const premiumBranch = new RunnableBranch([
  [(input: Request) => input.resourceType === 'advanced', advancedResourceChain],
  [(input: Request) => input.resourceType === 'standard', standardResourceChain]
]);

与 LCEL 的集成

RunnableBranch 可以与 LCEL 语法无缝集成:

typescript
// 使用 LCEL 语法创建分支
const lcelBranch = 
  inputParser
  .pipe(
    new RunnableBranch([
      [(data: ParsedInput) => data.type === 'math', mathChain],
      [(data: ParsedInput) => data.type === 'history', historyChain],
      [(data: ParsedInput) => data.type === 'science', scienceChain]
    ])
  )
  .pipe(outputFormatter);

错误处理和回退机制

分支路由中可以实现错误处理和回退机制:

typescript
class ResilientBranch extends RunnableBranch {
  async invoke(input: any, options?: RunnableConfig): Promise<any> {
    for (const [condition, chain] of this.branches) {
      if (await condition(input)) {
        try {
          return await chain.invoke(input, options);
        } catch (error) {
          // 如果当前分支失败,记录日志并尝试下一个分支
          console.warn(`分支执行失败: ${error.message}`);
          continue;
        }
      }
    }
    
    // 尝试默认分支
    if (this.default) {
      try {
        return await this.default.invoke(input, options);
      } catch (error) {
        console.error(`默认分支执行失败: ${error.message}`);
      }
    }
    
    throw new Error("所有分支都执行失败");
  }
}

实际应用示例:智能客服系统

让我们看一个完整的实际应用示例,展示如何使用 RunnableBranch 构建智能客服系统:

typescript
// 定义不同类型问题的处理链
const technicalSupportChain = new PromptTemplate({
  template: `您是一名技术支持专家。请帮助用户解决以下技术问题:
用户问题: {question}
解决方案:`,
  inputVariables: ["question"]
}).pipe(new ChatOpenAI({ modelName: "gpt-4" })).pipe(new StringOutputParser());

const billingChain = new PromptTemplate({
  template: `您是一名财务顾问。请帮助用户解决以下账单问题:
用户问题: {question}
解决方案:`,
  inputVariables: ["question"]
}).pipe(new ChatOpenAI()).pipe(new StringOutputParser());

const generalInfoChain = new PromptTemplate({
  template: `您是一名客服代表。请回答用户的常见问题:
用户问题: {question}
回答:`,
  inputVariables: ["question"]
}).pipe(new ChatOpenAI()).pipe(new StringOutputParser());

// 创建分类器
const classifierChain = new PromptTemplate({
  template: `请将以下用户问题分类到最合适的类别中:
类别选项:
1. 技术支持 (technical)
2. 账单问题 (billing)
3. 一般咨询 (general)

用户问题: {question}

请只回答类别名称:`,
  inputVariables: ["question"]
}).pipe(new ChatOpenAI()).pipe(new StringOutputParser());

// 创建分支路由
const supportRouter = new RunnableBranch([
  [(category: string) => category.toLowerCase().includes("技术") || 
                         category.toLowerCase().includes("technical"), technicalSupportChain],
  
  [(category: string) => category.toLowerCase().includes("账单") || 
                         category.toLowerCase().includes("billing"), billingChain],
  
  [(category: string) => category.toLowerCase().includes("一般") || 
                         category.toLowerCase().includes("general"), generalInfoChain]
]);

// 完整的客服链
const customerServiceChain = classifierChain.pipe(supportRouter);

// 使用示例
const response = await customerServiceChain.invoke({
  question: "我无法登录我的账户,总是提示密码错误"
});

console.log(response);

性能优化考虑

在实现分支路由时,需要考虑性能优化:

typescript
class OptimizedBranch extends RunnableBranch {
  private conditionCache = new Map();
  
  async invoke(input: any, options?: RunnableConfig): Promise<any> {
    // 缓存条件判断结果以提高性能
    const cacheKey = this.getCacheKey(input);
    if (this.conditionCache.has(cacheKey)) {
      const cachedResult = this.conditionCache.get(cacheKey);
      return await cachedResult.chain.invoke(input, options);
    }
    
    for (const [condition, chain] of this.branches) {
      if (await condition(input)) {
        // 缓存结果
        this.conditionCache.set(cacheKey, { chain, timestamp: Date.now() });
        return await chain.invoke(input, options);
      }
    }
    
    if (this.default) {
      return await this.default.invoke(input, options);
    }
    
    throw new Error("没有匹配的分支且未设置默认分支");
  }
  
  private getCacheKey(input: any): string {
    // 根据输入生成缓存键
    return JSON.stringify(input);
  }
}

总结

RunnableBranch 是 LangChain V3 中实现条件路由和分支处理的核心组件。它通过灵活的条件判断机制,使得开发者能够根据不同输入或中间结果选择不同的处理链。

关键特性包括:

  1. 灵活的条件定义 - 支持同步和异步条件判断
  2. 多层嵌套 - 可以构建复杂的决策树
  3. 与 LCEL 集成 - 可以与管道语法无缝结合
  4. 错误处理 - 支持错误处理和回退机制
  5. 性能优化 - 可以通过缓存等技术优化性能

通过 RunnableBranch,开发者可以构建更加智能化和自适应的 LLM 应用,根据不同的上下文选择最合适的处理方式。

在下一章中,我们将探讨循环与递归:RunnableLoop 与 Agent 的自我修正机制,了解如何实现 ReAct 模式中的 Thought → Action → Observation 循环。