Skip to content

LangGraph:用有向图构建状态化 Agent

Runnable 连接成图,支持循环、条件跳转

LangGraph 是 LangChain 生态系统中的一个重要组件,它允许开发者使用有向图来构建复杂的 AI Agent。通过将不同的 组件连接成图结构,LangGraph 支持循环、条件跳转等高级功能,使得构建状态化 Agent 变得更加直观和强大。本章将深入探讨 LangGraph 的设计和实现。

LangGraph 的基本概念

LangGraph 使用有向图来表示 Agent 的执行流程:

typescript
// 节点接口
interface Node {
  id: string;
  runnable: Runnable<any, any>;
  metadata?: Record<string, any>;
}

// 边接口
interface Edge {
  source: string;
  target: string;
  condition?: (state: any) => boolean;
  metadata?: Record<string, any>;
}

// 图状态
interface GraphState {
  [key: string]: any;
}

// 消息接口
interface Message {
  role: string;
  content: string;
  timestamp?: number;
}

// LangGraph 基础类
class LangGraph {
  private nodes: Map<string, Node>;
  private edges: Edge[];
  private initialState: GraphState;
  private entryPoint: string;
  private finishPoints: Set<string>;
  
  constructor(initialState: GraphState = {}) {
    this.nodes = new Map();
    this.edges = [];
    this.initialState = initialState;
    this.entryPoint = '';
    this.finishPoints = new Set();
  }
  
  // 添加节点
  addNode(id: string, runnable: Runnable<any, any>): this {
    this.nodes.set(id, { id, runnable });
    return this;
  }
  
  // 添加边
  addEdge(source: string, target: string): this {
    this.edges.push({ source, target });
    return this;
  }
  
  // 添加条件边
  addConditionalEdges(
    source: string,
    conditionalEdges: Record<string, string>
  ): this {
    // 实现条件边逻辑
    for (const [condition, target] of Object.entries(conditionalEdges)) {
      this.edges.push({
        source,
        target,
        condition: (state: any) => {
          // 简化实现,实际应用中需要更复杂的条件判断
          return state.next === condition;
        }
      });
    }
    return this;
  }
  
  // 设置入口点
  setEntryPoint(nodeId: string): this {
    this.entryPoint = nodeId;
    return this;
  }
  
  // 设置结束点
  setFinishPoint(nodeId: string): this {
    this.finishPoints.add(nodeId);
    return this;
  }
  
  // 编译图
  compile(): CompiledGraph {
    if (!this.entryPoint) {
      throw new Error('必须设置入口点');
    }
    
    return new CompiledGraph(
      this.nodes,
      this.edges,
      this.initialState,
      this.entryPoint,
      this.finishPoints
    );
  }
}

编译后的图执行器

实现编译后的图执行器:

typescript
// 编译后的图
class CompiledGraph extends Runnable<GraphState, GraphState> {
  private nodes: Map<string, Node>;
  private edges: Edge[];
  private initialState: GraphState;
  private entryPoint: string;
  private finishPoints: Set<string>;
  
  constructor(
    nodes: Map<string, Node>,
    edges: Edge[],
    initialState: GraphState,
    entryPoint: string,
    finishPoints: Set<string>
  ) {
    super();
    this.nodes = nodes;
    this.edges = edges;
    this.initialState = initialState;
    this.entryPoint = entryPoint;
    this.finishPoints = finishPoints;
  }
  
  async invoke(input: GraphState): Promise<GraphState> {
    let currentState = { ...this.initialState, ...input };
    let currentNodeId = this.entryPoint;
    
    while (!this.finishPoints.has(currentNodeId)) {
      // 获取当前节点
      const node = this.nodes.get(currentNodeId);
      if (!node) {
        throw new Error(`节点 "${currentNodeId}" 不存在`);
      }
      
      // 执行节点
      console.log(`执行节点: ${currentNodeId}`);
      const nodeResult = await node.runnable.invoke(currentState);
      
      // 更新状态
      currentState = { ...currentState, ...nodeResult, lastNode: currentNodeId };
      
      // 确定下一个节点
      currentNodeId = this.getNextNodeId(currentNodeId, currentState);
      
      if (!currentNodeId) {
        throw new Error(`节点 "${currentNodeId}" 没有出边`);
      }
    }
    
    return currentState;
  }
  
  private getNextNodeId(currentNodeId: string, state: GraphState): string {
    // 查找所有从当前节点出发的边
    const outgoingEdges = this.edges.filter(edge => edge.source === currentNodeId);
    
    if (outgoingEdges.length === 0) {
      throw new Error(`节点 "${currentNodeId}" 没有出边`);
    }
    
    // 查找满足条件的边
    for (const edge of outgoingEdges) {
      if (!edge.condition || edge.condition(state)) {
        return edge.target;
      }
    }
    
    // 如果没有满足条件的边,返回第一条边的目标节点
    return outgoingEdges[0].target;
  }
  
  // 获取图的可视化表示
  getGraphVisualization(): string {
    let graphViz = 'digraph G {\n';
    
    // 添加节点
    for (const [id, node] of this.nodes.entries()) {
      const label = node.metadata?.label || id;
      const shape = this.finishPoints.has(id) ? 'doublecircle' : 'circle';
      graphViz += `  "${id}" [label="${label}", shape=${shape}];\n`;
    }
    
    // 添加边
    for (const edge of this.edges) {
      const label = edge.condition ? 'condition' : '';
      graphViz += `  "${edge.source}" -> "${edge.target}" [label="${label}"];\n`;
    }
    
    graphViz += '}';
    return graphViz;
  }
}

状态管理器

实现状态管理功能:

typescript
// 状态管理器
class StateManager {
  private state: GraphState;
  private history: GraphState[];
  private maxSize: number;
  
  constructor(initialState: GraphState = {}, maxSize: number = 100) {
    this.state = initialState;
    this.history = [];
    this.maxSize = maxSize;
  }
  
  getState(): GraphState {
    return { ...this.state };
  }
  
  updateState(newState: GraphState): void {
    // 保存历史状态
    this.history.push({ ...this.state });
    
    // 限制历史记录大小
    if (this.history.length > this.maxSize) {
      this.history.shift();
    }
    
    // 更新当前状态
    this.state = { ...this.state, ...newState };
  }
  
  getHistory(): GraphState[] {
    return [...this.history];
  }
  
  rollback(steps: number = 1): void {
    if (steps > this.history.length) {
      throw new Error('无法回滚到指定步骤');
    }
    
    for (let i = 0; i < steps; i++) {
      this.state = this.history.pop()!;
    }
  }
  
  reset(): void {
    this.state = {};
    this.history = [];
  }
}

实际应用示例

让我们看一个完整的实际应用示例,展示如何使用 LangGraph 构建状态化 Agent:

typescript
// 节点实现示例
class InputNode extends Runnable<any, { input: string }> {
  async invoke(state: any): Promise<{ input: string }> {
    // 在实际应用中,这里可能从用户获取输入
    return { input: state.input || '默认输入' };
  }
}

class ReasoningNode extends Runnable<{ input: string }, { reasoning: string }> {
  private llm: BaseLanguageModel;
  
  constructor(llm: BaseLanguageModel) {
    super();
    this.llm = llm;
  }
  
  async invoke(state: { input: string }): Promise<{ reasoning: string }> {
    const prompt = new PromptTemplate({
      template: `分析以下输入并提供推理步骤:
      
输入: {input}

推理:`,
      inputVariables: ["input"]
    });
    
    const result = await prompt
      .pipe(this.llm)
      .pipe(new StringOutputParser())
      .invoke({ input: state.input });
    
    return { reasoning: result };
  }
}

class ActionNode extends Runnable<{ reasoning: string }, { action: string }> {
  private llm: BaseLanguageModel;
  
  constructor(llm: BaseLanguageModel) {
    super();
    this.llm = llm;
  }
  
  async invoke(state: { reasoning: string }): Promise<{ action: string }> {
    const prompt = new PromptTemplate({
      template: `基于以下推理,决定采取什么行动:
      
推理: {reasoning}

行动选项:
1. search - 搜索更多信息
2. calculate - 进行计算
3. respond - 直接回答

选择的行动:`,
      inputVariables: ["reasoning"]
    });
    
    const result = await prompt
      .pipe(this.llm)
      .pipe(new StringOutputParser())
      .invoke({ reasoning: state.reasoning });
    
    return { action: result.trim().toLowerCase() };
  }
}

class SearchNode extends Runnable<{ input: string }, { searchResult: string }> {
  async invoke(state: { input: string }): Promise<{ searchResult: string }> {
    // 模拟搜索
    return { 
      searchResult: `关于 "${state.input}" 的搜索结果...` 
    };
  }
}

class CalculateNode extends Runnable<{ input: string }, { calculation: string }> {
  async invoke(state: { input: string }): Promise<{ calculation: string }> {
    // 模拟计算
    return { 
      calculation: `计算 "${state.input}" 的结果是 42` 
    };
  }
}

class ResponseNode extends Runnable<
  { input: string; reasoning?: string; searchResult?: string; calculation?: string }, 
  { response: string }
> {
  private llm: BaseLanguageModel;
  
  constructor(llm: BaseLanguageModel) {
    super();
    this.llm = llm;
  }
  
  async invoke(state: any): Promise<{ response: string }> {
    let context = `原始输入: ${state.input}\n`;
    
    if (state.reasoning) {
      context += `推理: ${state.reasoning}\n`;
    }
    
    if (state.searchResult) {
      context += `搜索结果: ${state.searchResult}\n`;
    }
    
    if (state.calculation) {
      context += `计算结果: ${state.calculation}\n`;
    }
    
    const prompt = new PromptTemplate({
      template: `基于以下上下文提供最终回答:
      
{context}

最终回答:`,
      inputVariables: ["context"]
    });
    
    const result = await prompt
      .pipe(this.llm)
      .pipe(new StringOutputParser())
      .invoke({ context });
    
    return { response: result };
  }
}

// LangGraph 应用示例
class LangGraphDemo {
  private graph: CompiledGraph;
  
  constructor() {
    this.graph = this.createGraph();
  }
  
  private createGraph(): CompiledGraph {
    const llm = new ChatOpenAI({ modelName: 'gpt-3.5-turbo' });
    
    const graph = new LangGraph({
      messages: [],
      next: 'input'
    });
    
    // 添加节点
    graph
      .addNode('input', new InputNode())
      .addNode('reasoning', new ReasoningNode(llm))
      .addNode('action', new ActionNode(llm))
      .addNode('search', new SearchNode())
      .addNode('calculate', new CalculateNode())
      .addNode('respond', new ResponseNode(llm))
      .addNode('end', new RunnableLambda((state: any) => state));
    
    // 添加边
    graph
      .addEdge('input', 'reasoning')
      .addEdge('reasoning', 'action')
      .addConditionalEdges('action', {
        'search': 'search',
        'calculate': 'calculate',
        'respond': 'respond'
      })
      .addEdge('search', 'respond')
      .addEdge('calculate', 'respond')
      .addEdge('respond', 'end');
    
    // 设置入口和出口点
    graph
      .setEntryPoint('input')
      .setFinishPoint('end');
    
    return graph.compile();
  }
  
  async runDemo(input: string): Promise<void> {
    console.log('=== LangGraph 演示 ===\n');
    console.log(`输入: ${input}\n`);
    
    try {
      const result = await this.graph.invoke({ input });
      console.log('最终结果:');
      console.log(result);
      
      // 显示图的可视化
      console.log('\n图结构:');
      console.log(this.graph.getGraphVisualization());
    } catch (error) {
      console.error('执行错误:', error.message);
    }
  }
}

// 更复杂的示例:客户服务 Agent
class CustomerServiceAgent {
  private graph: CompiledGraph;
  
  constructor() {
    this.graph = this.createCustomerServiceGraph();
  }
  
  private createCustomerServiceGraph(): CompiledGraph {
    const llm = new ChatOpenAI({ modelName: 'gpt-3.5-turbo' });
    
    const graph = new LangGraph({
      customerQuery: '',
      customerInfo: {},
      conversationHistory: [],
      intent: '',
      nextAction: ''
    });
    
    // 定义节点
    graph
      .addNode('input_handler', new RunnableLambda(async (state: any) => {
        console.log('处理客户输入...');
        return {
          customerQuery: state.input,
          conversationHistory: [...state.conversationHistory, {
            role: 'customer',
            content: state.input,
            timestamp: Date.now()
          }]
        };
      }))
      .addNode('intent_classifier', new RunnableLambda(async (state: any) => {
        console.log('分类客户意图...');
        // 简化实现,实际应用中会使用 LLM 进行意图分类
        const intents = ['billing', 'technical', 'general'];
        const intent = intents[Math.floor(Math.random() * intents.length)];
        return { intent };
      }))
      .addNode('billing_handler', new RunnableLambda(async (state: any) => {
        console.log('处理账单问题...');
        return {
          response: '您的账单问题已记录,我们的财务团队将在24小时内回复您。',
          nextAction: 'follow_up'
        };
      }))
      .addNode('technical_handler', new RunnableLambda(async (state: any) => {
        console.log('处理技术问题...');
        return {
          response: '技术团队已收到您的问题,我们将尽快为您提供解决方案。',
          nextAction: 'follow_up'
        };
      }))
      .addNode('general_handler', new RunnableLambda(async (state: any) => {
        console.log('处理一般咨询...');
        return {
          response: '感谢您的咨询,我们已记录您的问题并将尽快回复。',
          nextAction: 'follow_up'
        };
      }))
      .addNode('follow_up', new RunnableLambda(async (state: any) => {
        console.log('发送后续跟进...');
        return {
          followUpMessage: '请问还有其他我可以帮助您的吗?',
          status: 'completed'
        };
      }))
      .addNode('end', new RunnableLambda((state: any) => state));
    
    // 定义边
    graph
      .addEdge('input_handler', 'intent_classifier')
      .addConditionalEdges('intent_classifier', {
        'billing': 'billing_handler',
        'technical': 'technical_handler',
        'general': 'general_handler'
      })
      .addEdge('billing_handler', 'follow_up')
      .addEdge('technical_handler', 'follow_up')
      .addEdge('general_handler', 'follow_up')
      .addEdge('follow_up', 'end');
    
    // 设置入口和出口
    graph
      .setEntryPoint('input_handler')
      .setFinishPoint('end');
    
    return graph.compile();
  }
  
  async handleCustomerQuery(query: string): Promise<any> {
    console.log('=== 客户服务 Agent 演示 ===\n');
    console.log(`客户查询: ${query}\n`);
    
    try {
      const result = await this.graph.invoke({ input: query });
      console.log('处理结果:');
      console.log(JSON.stringify(result, null, 2));
      return result;
    } catch (error) {
      console.error('处理错误:', error.message);
      return { error: error.message };
    }
  }
}

// 状态机 Agent
class StateMachineAgent {
  private graph: CompiledGraph;
  
  constructor() {
    this.graph = this.createStateMachineGraph();
  }
  
  private createStateMachineGraph(): CompiledGraph {
    const graph = new LangGraph({
      state: 'idle',
      task: '',
      progress: 0
    });
    
    graph
      .addNode('idle', new RunnableLambda(async (state: any) => {
        console.log('空闲状态,等待任务...');
        return { state: 'waiting_for_task' };
      }))
      .addNode('waiting_for_task', new RunnableLambda(async (state: any) => {
        console.log('等待任务分配...');
        // 模拟任务分配
        return { 
          state: 'task_assigned',
          task: '数据处理任务',
          progress: 0
        };
      }))
      .addNode('task_assigned', new RunnableLambda(async (state: any) => {
        console.log(`任务已分配: ${state.task}`);
        return { state: 'processing' };
      }))
      .addNode('processing', new RunnableLambda(async (state: any) => {
        console.log('处理任务中...');
        // 模拟处理进度
        const progress = Math.min(100, state.progress + 25);
        return { 
          progress,
          state: progress >= 100 ? 'completed' : 'processing'
        };
      }))
      .addNode('completed', new RunnableLambda(async (state: any) => {
        console.log('任务完成!');
        return { state: 'idle' };
      }));
    
    graph
      .addEdge('idle', 'waiting_for_task')
      .addEdge('waiting_for_task', 'task_assigned')
      .addEdge('task_assigned', 'processing')
      .addConditionalEdges('processing', {
        'processing': 'processing',
        'completed': 'completed'
      })
      .addEdge('completed', 'idle');
    
    graph
      .setEntryPoint('idle')
      .setFinishPoint('idle'); // 循环图,没有真正的终点
    
    return graph.compile();
  }
  
  async runStateMachine(): Promise<void> {
    console.log('=== 状态机 Agent 演示 ===\n');
    
    // 运行几步来演示状态转换
    let state = {};
    for (let i = 0; i < 8; i++) {
      state = await this.graph.invoke(state);
      console.log(`步骤 ${i + 1} 状态:`, state);
      // 添加延迟以便观察
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
  }
}

// 运行演示
async function runLangGraphDemos() {
  console.log('🚀 LangGraph 演示\n');
  
  // 基础演示
  const basicDemo = new LangGraphDemo();
  await basicDemo.runDemo('什么是 LangGraph?');
  
  console.log('\n' + '='.repeat(50) + '\n');
  
  // 客户服务演示
  const customerService = new CustomerServiceAgent();
  await customerService.handleCustomerQuery('我的账单有问题');
  
  console.log('\n' + '='.repeat(50) + '\n');
  
  // 状态机演示
  const stateMachine = new StateMachineAgent();
  await stateMachine.runStateMachine();
}

// 图形可视化工具
class GraphVisualizer {
  static toMermaid(graph: CompiledGraph): string {
    let mermaid = 'graph TD\n';
    
    // 添加节点
    // 这里需要访问 CompiledGraph 的内部结构
    // 简化实现,实际应用中需要更完整的访问方法
    
    return mermaid;
  }
}

// 如果直接运行此文件,则执行演示
if (require.main === module) {
  runLangGraphDemos().catch(console.error);
}

总结

LangGraph 通过有向图结构为构建复杂的 AI Agent 提供了强大的支持:

  1. 图形化设计 - 使用节点和边直观地表示 Agent 的执行流程
  2. 状态管理 - 内置状态管理机制,支持复杂的状态转换
  3. 条件跳转 - 支持基于条件的分支和跳转
  4. 循环支持 - 可以构建循环执行的 Agent
  5. 模块化设计 - 每个节点都是独立的 组件,便于重用和测试
  6. 可视化 - 支持图结构的可视化表示

LangGraph 使得构建复杂的、状态化的 AI Agent 变得更加直观和可维护,为开发高级 AI 应用提供了坚实的基础。

在下一章中,我们将探讨与 DevOps 集成:CI/CD 流程中测试 Prompt 变更的影响,了解如何在软件开发生命周期中管理 AI 应用。