Skip to content

循环与递归:RunnableLoop 与 Agent 的自我修正机制

ReAct 模式中的 Thought → Action → Observation 循环

在构建智能 Agent 应用时,循环和递归机制是实现自主决策和自我修正的关键。LangChain V3 通过 RunnableLoop 和相关的循环机制,支持实现 ReAct(Reasoning + Acting)模式,使得 Agent 能够在的循环中不断优化其行为。本章将深入探讨循环与递归在 LangChain V3 中的实现和应用。

ReAct 模式简介

ReAct(Reasoning + Acting)是一种结合推理和行动的 Agent 模式,其核心思想是让 Agent 在每一步都进行思考(Thought),然后采取行动(Action),再观察结果(Observation),并根据观察结果调整下一步的行为。

Thought → Action → Observation → Thought → Action → Observation → ...

这种循环模式使得 Agent 能够:

  1. 自主决策 - 根据当前状态决定下一步行动
  2. 自我修正 - 根据观察结果调整策略
  3. 逐步逼近 - 通过多次迭代接近目标

RunnableLoop 的基本实现

RunnableLoop 是 LangChain V3 中实现循环处理的核心组件:

typescript
class RunnableLoop<Input = any, Output = any> extends Runnable<Input, Output> {
  private condition: (input: Input) => boolean | Promise<boolean>;
  private body: Runnable<Input, Input>;
  private maxIterations: number;
  
  constructor(
    condition: (input: Input) => boolean | Promise<boolean>,
    body: Runnable<Input, Input>,
    maxIterations: number = 10
  ) {
    super();
    this.condition = condition;
    this.body = body;
    this.maxIterations = maxIterations;
  }
  
  async invoke(initialInput: Input, options?: RunnableConfig): Promise<Output> {
    let currentInput: any = initialInput;
    let iterations = 0;
    
    while (iterations < this.maxIterations) {
      // 检查循环条件
      const shouldContinue = await this.condition(currentInput);
      if (!shouldContinue) {
        break;
      }
      
      // 执行循环体
      currentInput = await this.body.invoke(currentInput, options);
      iterations++;
    }
    
    return currentInput as Output;
  }
}

Thought-Action-Observation 循环的实现

让我们实现一个完整的 ReAct 模式 Agent:

typescript
// 定义 Agent 的状态
interface AgentState {
  input: string;
  thought: string;
  action: string;
  actionInput: string;
  observation: string;
  finalAnswer?: string;
}

// Thought 步骤:生成思考
const thoughtStep = new PromptTemplate({
  template: `你是一个智能助手,需要解决用户的问题。请思考下一步应该做什么。

问题: {input}
已完成的步骤: {previousSteps}

请提供你的思考过程,以及下一步应该采取的行动。格式如下:
思考: <你的思考过程>
行动: <选择一个行动类型,如 Search, Calculator, Final Answer>
行动输入: <行动需要的参数>

如果问题已经解决,请直接给出最终答案:
思考: 问题已解决
行动: Final Answer
行动输入: <最终答案>`,
  inputVariables: ["input", "previousSteps"]
}).pipe(new ChatOpenAI()).pipe(new StringOutputParser());

// Action 步骤:执行行动
class ActionExecutor extends Runnable<string, string> {
  async invoke(actionDescription: string): Promise<string> {
    // 解析行动描述
    const { action, actionInput } = this.parseAction(actionDescription);
    
    // 根据行动类型执行不同的操作
    switch (action) {
      case 'Search':
        return await this.search(actionInput);
      case 'Calculator':
        return await this.calculate(actionInput);
      case 'Final Answer':
        return actionInput;
      default:
        return `未知行动: ${action}`;
    }
  }
  
  private parseAction(actionDescription: string): { action: string; actionInput: string } {
    // 解析行动描述的逻辑
    const actionMatch = actionDescription.match(/行动:\s*(.*)/);
    const inputMatch = actionDescription.match(/行动输入:\s*(.*)/);
    
    return {
      action: actionMatch ? actionMatch[1].trim() : '',
      actionInput: inputMatch ? inputMatch[1].trim() : ''
    };
  }
  
  private async search(query: string): Promise<string> {
    // 模拟搜索操作
    return `搜索结果: 关于"${query}"的相关信息...`;
  }
  
  private async calculate(expression: string): Promise<string> {
    // 模拟计算操作
    try {
      // 注意:实际应用中应使用安全的表达式求值器
      const result = eval(expression);
      return `计算结果: ${expression} = ${result}`;
    } catch (error) {
      return `计算错误: ${error.message}`;
    }
  }
}

// Observation 步骤:观察结果
const observationStep = new RunnableLambda((actionResult: string) => {
  return `观察结果: ${actionResult}`;
});

// 完整的 ReAct 循环
class ReActAgent extends Runnable<string, string> {
  private thoughtChain: Runnable<any, string>;
  private actionExecutor: ActionExecutor;
  private observationChain: Runnable<string, string>;
  
  constructor() {
    super();
    this.thoughtChain = thoughtStep;
    this.actionExecutor = new ActionExecutor();
    this.observationChain = observationStep;
  }
  
  async invoke(input: string): Promise<string> {
    let state: AgentState = {
      input,
      thought: '',
      action: '',
      actionInput: '',
      observation: ''
    };
    
    let previousSteps = '';
    const maxIterations = 10;
    
    for (let i = 0; i < maxIterations; i++) {
      // Thought 步骤
      const thoughtInput = {
        input: state.input,
        previousSteps
      };
      
      state.thought = await this.thoughtChain.invoke(thoughtInput);
      
      // 检查是否是最终答案
      if (state.thought.includes('Final Answer')) {
        const finalAnswer = state.thought.split('行动输入:')[1]?.trim();
        return finalAnswer || '无法生成最终答案';
      }
      
      // Action 步骤
      const actionResult = await this.actionExecutor.invoke(state.thought);
      
      // Observation 步骤
      state.observation = await this.observationChain.invoke(actionResult);
      
      // 更新 previousSteps
      previousSteps += `
思考: ${state.thought}
观察: ${state.observation}
`;
    }
    
    return '达到最大迭代次数,未能找到解决方案';
  }
}

递归处理的实现

除了循环,LangChain V3 还支持递归处理模式:

typescript
class RecursiveRunnable<Input, Output> extends Runnable<Input, Output> {
  private baseCase: (input: Input) => boolean;
  private recursiveCase: Runnable<Input, Input>;
  private solutionCase: Runnable<Input, Output>;
  private maxDepth: number;
  
  constructor(
    baseCase: (input: Input) => boolean,
    recursiveCase: Runnable<Input, Input>,
    solutionCase: Runnable<Input, Output>,
    maxDepth: number = 5
  ) {
    super();
    this.baseCase = baseCase;
    this.recursiveCase = recursiveCase;
    this.solutionCase = solutionCase;
    this.maxDepth = maxDepth;
  }
  
  async invoke(input: Input, options?: RunnableConfig, depth: number = 0): Promise<Output> {
    // 检查递归深度
    if (depth > this.maxDepth) {
      throw new Error("超过最大递归深度");
    }
    
    // 检查基础情况
    if (await this.baseCase(input)) {
      return await this.solutionCase.invoke(input, options);
    }
    
    // 递归情况
    const nextInput = await this.recursiveCase.invoke(input, options);
    return await this.invoke(nextInput, options, depth + 1);
  }
}

// 示例:递归处理数学表达式
const mathSolver = new RecursiveRunnable(
  // 基础情况:简单表达式
  (input: string) => {
    // 检查是否为简单表达式(如 "2+3")
    return /^[\d+\-*/. ]+$/.test(input) && !input.includes('(');
  },
  
  // 递归情况:简化复杂表达式
  new RunnableLambda((input: string) => {
    // 简化括号表达式
    return input.replace(/\(([^()]+)\)/, (_, expr) => {
      // 这里应该实际计算 expr,简化示例
      return `simplified_${expr}`;
    });
  }),
  
  // 解决方案:计算简单表达式
  new RunnableLambda((input: string) => {
    try {
      return eval(input).toString();
    } catch {
      return "无法计算";
    }
  })
);

与工具调用的集成

循环机制与工具调用紧密结合:

typescript
// 定义工具
interface Tool {
  name: string;
  description: string;
  execute: (input: string) => Promise<string>;
}

class ToolUsingAgent extends Runnable<string, string> {
  private tools: Tool[];
  private maxSteps: number;
  
  constructor(tools: Tool[], maxSteps: number = 5) {
    super();
    this.tools = tools;
    this.maxSteps = maxSteps;
  }
  
  async invoke(input: string): Promise<string> {
    let intermediateSteps: string[] = [];
    let currentInput = input;
    
    for (let step = 0; step < this.maxSteps; step++) {
      // 决定是否使用工具
      const decision = await this.decideOnTool(currentInput, intermediateSteps);
      
      if (decision.toolName === 'Final Answer') {
        return decision.input;
      }
      
      // 执行工具
      const tool = this.tools.find(t => t.name === decision.toolName);
      if (!tool) {
        intermediateSteps.push(`未知工具: ${decision.toolName}`);
        continue;
      }
      
      try {
        const result = await tool.execute(decision.input);
        intermediateSteps.push(`使用工具 ${tool.name}: ${result}`);
        currentInput = result;
      } catch (error) {
        intermediateSteps.push(`工具执行失败 ${tool.name}: ${error.message}`);
      }
    }
    
    return "未能在最大步骤内找到答案";
  }
  
  private async decideOnTool(
    input: string, 
    intermediateSteps: string[]
  ): Promise<{ toolName: string; input: string }> {
    // 使用 LLM 决定下一步使用哪个工具
    const prompt = new PromptTemplate({
      template: `你有以下工具可以使用:
{tools}

之前的步骤:
{intermediateSteps}

当前输入: {input}

请选择下一步使用的工具,或者给出最终答案。格式:
工具: <工具名>
输入: <工具输入>

或者:
工具: Final Answer
输入: <最终答案>`,
      inputVariables: ["tools", "intermediateSteps", "input"]
    });
    
    const toolsDescription = this.tools.map(t => 
      `${t.name}: ${t.description}`
    ).join('\n');
    
    const llm = new ChatOpenAI();
    const result = await prompt
      .pipe(llm)
      .pipe(new StringOutputParser())
      .invoke({
        tools: toolsDescription,
        intermediateSteps: intermediateSteps.join('\n'),
        input
      });
    
    // 解析结果
    const toolMatch = result.match(/工具:\s*(.*)/);
    const inputMatch = result.match(/输入:\s*(.*)/);
    
    return {
      toolName: toolMatch ? toolMatch[1].trim() : 'Final Answer',
      input: inputMatch ? inputMatch[1].trim() : result
    };
  }
}

实际应用示例:数学问题求解器

让我们实现一个完整的数学问题求解器:

typescript
// 定义计算器工具
const calculatorTool: Tool = {
  name: "Calculator",
  description: "用于执行数学计算",
  execute: async (input: string): Promise<string> => {
    try {
      // 安全的数学表达式求值
      const result = Function(`"use strict"; return (${input})`)();
      return result.toString();
    } catch (error) {
      return `计算错误: ${error.message}`;
    }
  }
};

// 定义搜索工具
const searchTool: Tool = {
  name: "Search",
  description: "用于搜索数学公式和概念",
  execute: async (input: string): Promise<string> => {
    // 模拟搜索
    return `搜索到关于"${input}"的信息: 这是一个数学概念...`;
  }
};

// 创建 Agent
const mathSolverAgent = new ToolUsingAgent([
  calculatorTool,
  searchTool
], 8);

// 使用示例
const problem = "计算 (2 + 3) * 4 - 5 的值";
const solution = await mathSolverAgent.invoke(problem);
console.log(solution);

错误处理和终止条件

在循环和递归中,适当的错误处理和终止条件非常重要:

typescript
class SafeLoopAgent extends Runnable<any, any> {
  async invoke(input: any): Promise<any> {
    let currentState = input;
    let iterations = 0;
    const maxIterations = 10;
    const errorCount = 0;
    const maxErrors = 3;
    
    while (iterations < maxIterations && errorCount < maxErrors) {
      try {
        // 执行一步操作
        currentState = await this.executeStep(currentState);
        iterations++;
        
        // 检查终止条件
        if (this.shouldTerminate(currentState)) {
          break;
        }
      } catch (error) {
        errorCount++;
        console.warn(`步骤 ${iterations} 执行出错: ${error.message}`);
        
        if (errorCount >= maxErrors) {
          throw new Error(`连续 ${maxErrors} 次错误,终止执行`);
        }
        
        // 尝试恢复
        currentState = this.recoverFromError(currentState, error);
      }
    }
    
    return currentState;
  }
  
  private async executeStep(state: any): Promise<any> {
    // 执行具体步骤
  }
  
  private shouldTerminate(state: any): boolean {
    // 检查是否应该终止循环
    return false;
  }
  
  private recoverFromError(state: any, error: Error): any {
    // 从错误中恢复
    return state;
  }
}

总结

循环与递归机制是构建智能 Agent 应用的核心技术。LangChain V3 通过 和相关的循环机制,支持实现 ReAct 模式,使得 Agent 能够在 Thought → Action → Observation 的循环中不断优化其行为。

关键要点包括:

  1. ReAct 模式 - 结合推理和行动的智能决策模式
  2. 循环实现 - 通过条件判断和迭代执行实现循环处理
  3. 递归处理 - 支持基于基础情况和递归情况的处理模式
  4. 工具集成 - 与工具调用紧密结合,扩展 Agent 能力
  5. 错误处理 - 适当的错误处理和终止条件确保系统稳定性

通过这些机制,开发者可以构建具有自主决策和自我修正能力的智能 Agent 应用。

在下一章中,我们将探讨 PromptTemplate 的现代化实现,了解它如何从字符串模板演变为 Runnable<string, string>