LangGraph:用有向图构建状态化 Agent
将 Runnable 连接成图,支持循环、条件跳转
LangGraph 是 LangChain 生态系统中的一个重要组件,它允许开发者使用有向图来构建复杂的 AI Agent。通过将不同的 组件连接成图结构,LangGraph 支持循环、条件跳转等高级功能,使得构建状态化 Agent 变得更加直观和强大。本章将深入探讨 LangGraph 的设计和实现。
LangGraph 的基本概念
LangGraph 使用有向图来表示 Agent 的执行流程:
typescript
// 节点接口
interface Node {
id: string;
runnable: Runnable<any, any>;
metadata?: Record<string, any>;
}
// 边接口
interface Edge {
source: string;
target: string;
condition?: (state: any) => boolean;
metadata?: Record<string, any>;
}
// 图状态
interface GraphState {
[key: string]: any;
}
// 消息接口
interface Message {
role: string;
content: string;
timestamp?: number;
}
// LangGraph 基础类
class LangGraph {
private nodes: Map<string, Node>;
private edges: Edge[];
private initialState: GraphState;
private entryPoint: string;
private finishPoints: Set<string>;
constructor(initialState: GraphState = {}) {
this.nodes = new Map();
this.edges = [];
this.initialState = initialState;
this.entryPoint = '';
this.finishPoints = new Set();
}
// 添加节点
addNode(id: string, runnable: Runnable<any, any>): this {
this.nodes.set(id, { id, runnable });
return this;
}
// 添加边
addEdge(source: string, target: string): this {
this.edges.push({ source, target });
return this;
}
// 添加条件边
addConditionalEdges(
source: string,
conditionalEdges: Record<string, string>
): this {
// 实现条件边逻辑
for (const [condition, target] of Object.entries(conditionalEdges)) {
this.edges.push({
source,
target,
condition: (state: any) => {
// 简化实现,实际应用中需要更复杂的条件判断
return state.next === condition;
}
});
}
return this;
}
// 设置入口点
setEntryPoint(nodeId: string): this {
this.entryPoint = nodeId;
return this;
}
// 设置结束点
setFinishPoint(nodeId: string): this {
this.finishPoints.add(nodeId);
return this;
}
// 编译图
compile(): CompiledGraph {
if (!this.entryPoint) {
throw new Error('必须设置入口点');
}
return new CompiledGraph(
this.nodes,
this.edges,
this.initialState,
this.entryPoint,
this.finishPoints
);
}
}编译后的图执行器
实现编译后的图执行器:
typescript
// 编译后的图
class CompiledGraph extends Runnable<GraphState, GraphState> {
private nodes: Map<string, Node>;
private edges: Edge[];
private initialState: GraphState;
private entryPoint: string;
private finishPoints: Set<string>;
constructor(
nodes: Map<string, Node>,
edges: Edge[],
initialState: GraphState,
entryPoint: string,
finishPoints: Set<string>
) {
super();
this.nodes = nodes;
this.edges = edges;
this.initialState = initialState;
this.entryPoint = entryPoint;
this.finishPoints = finishPoints;
}
async invoke(input: GraphState): Promise<GraphState> {
let currentState = { ...this.initialState, ...input };
let currentNodeId = this.entryPoint;
while (!this.finishPoints.has(currentNodeId)) {
// 获取当前节点
const node = this.nodes.get(currentNodeId);
if (!node) {
throw new Error(`节点 "${currentNodeId}" 不存在`);
}
// 执行节点
console.log(`执行节点: ${currentNodeId}`);
const nodeResult = await node.runnable.invoke(currentState);
// 更新状态
currentState = { ...currentState, ...nodeResult, lastNode: currentNodeId };
// 确定下一个节点
currentNodeId = this.getNextNodeId(currentNodeId, currentState);
if (!currentNodeId) {
throw new Error(`节点 "${currentNodeId}" 没有出边`);
}
}
return currentState;
}
private getNextNodeId(currentNodeId: string, state: GraphState): string {
// 查找所有从当前节点出发的边
const outgoingEdges = this.edges.filter(edge => edge.source === currentNodeId);
if (outgoingEdges.length === 0) {
throw new Error(`节点 "${currentNodeId}" 没有出边`);
}
// 查找满足条件的边
for (const edge of outgoingEdges) {
if (!edge.condition || edge.condition(state)) {
return edge.target;
}
}
// 如果没有满足条件的边,返回第一条边的目标节点
return outgoingEdges[0].target;
}
// 获取图的可视化表示
getGraphVisualization(): string {
let graphViz = 'digraph G {\n';
// 添加节点
for (const [id, node] of this.nodes.entries()) {
const label = node.metadata?.label || id;
const shape = this.finishPoints.has(id) ? 'doublecircle' : 'circle';
graphViz += ` "${id}" [label="${label}", shape=${shape}];\n`;
}
// 添加边
for (const edge of this.edges) {
const label = edge.condition ? 'condition' : '';
graphViz += ` "${edge.source}" -> "${edge.target}" [label="${label}"];\n`;
}
graphViz += '}';
return graphViz;
}
}状态管理器
实现状态管理功能:
typescript
// 状态管理器
class StateManager {
private state: GraphState;
private history: GraphState[];
private maxSize: number;
constructor(initialState: GraphState = {}, maxSize: number = 100) {
this.state = initialState;
this.history = [];
this.maxSize = maxSize;
}
getState(): GraphState {
return { ...this.state };
}
updateState(newState: GraphState): void {
// 保存历史状态
this.history.push({ ...this.state });
// 限制历史记录大小
if (this.history.length > this.maxSize) {
this.history.shift();
}
// 更新当前状态
this.state = { ...this.state, ...newState };
}
getHistory(): GraphState[] {
return [...this.history];
}
rollback(steps: number = 1): void {
if (steps > this.history.length) {
throw new Error('无法回滚到指定步骤');
}
for (let i = 0; i < steps; i++) {
this.state = this.history.pop()!;
}
}
reset(): void {
this.state = {};
this.history = [];
}
}实际应用示例
让我们看一个完整的实际应用示例,展示如何使用 LangGraph 构建状态化 Agent:
typescript
// 节点实现示例
class InputNode extends Runnable<any, { input: string }> {
async invoke(state: any): Promise<{ input: string }> {
// 在实际应用中,这里可能从用户获取输入
return { input: state.input || '默认输入' };
}
}
class ReasoningNode extends Runnable<{ input: string }, { reasoning: string }> {
private llm: BaseLanguageModel;
constructor(llm: BaseLanguageModel) {
super();
this.llm = llm;
}
async invoke(state: { input: string }): Promise<{ reasoning: string }> {
const prompt = new PromptTemplate({
template: `分析以下输入并提供推理步骤:
输入: {input}
推理:`,
inputVariables: ["input"]
});
const result = await prompt
.pipe(this.llm)
.pipe(new StringOutputParser())
.invoke({ input: state.input });
return { reasoning: result };
}
}
class ActionNode extends Runnable<{ reasoning: string }, { action: string }> {
private llm: BaseLanguageModel;
constructor(llm: BaseLanguageModel) {
super();
this.llm = llm;
}
async invoke(state: { reasoning: string }): Promise<{ action: string }> {
const prompt = new PromptTemplate({
template: `基于以下推理,决定采取什么行动:
推理: {reasoning}
行动选项:
1. search - 搜索更多信息
2. calculate - 进行计算
3. respond - 直接回答
选择的行动:`,
inputVariables: ["reasoning"]
});
const result = await prompt
.pipe(this.llm)
.pipe(new StringOutputParser())
.invoke({ reasoning: state.reasoning });
return { action: result.trim().toLowerCase() };
}
}
class SearchNode extends Runnable<{ input: string }, { searchResult: string }> {
async invoke(state: { input: string }): Promise<{ searchResult: string }> {
// 模拟搜索
return {
searchResult: `关于 "${state.input}" 的搜索结果...`
};
}
}
class CalculateNode extends Runnable<{ input: string }, { calculation: string }> {
async invoke(state: { input: string }): Promise<{ calculation: string }> {
// 模拟计算
return {
calculation: `计算 "${state.input}" 的结果是 42`
};
}
}
class ResponseNode extends Runnable<
{ input: string; reasoning?: string; searchResult?: string; calculation?: string },
{ response: string }
> {
private llm: BaseLanguageModel;
constructor(llm: BaseLanguageModel) {
super();
this.llm = llm;
}
async invoke(state: any): Promise<{ response: string }> {
let context = `原始输入: ${state.input}\n`;
if (state.reasoning) {
context += `推理: ${state.reasoning}\n`;
}
if (state.searchResult) {
context += `搜索结果: ${state.searchResult}\n`;
}
if (state.calculation) {
context += `计算结果: ${state.calculation}\n`;
}
const prompt = new PromptTemplate({
template: `基于以下上下文提供最终回答:
{context}
最终回答:`,
inputVariables: ["context"]
});
const result = await prompt
.pipe(this.llm)
.pipe(new StringOutputParser())
.invoke({ context });
return { response: result };
}
}
// LangGraph 应用示例
class LangGraphDemo {
private graph: CompiledGraph;
constructor() {
this.graph = this.createGraph();
}
private createGraph(): CompiledGraph {
const llm = new ChatOpenAI({ modelName: 'gpt-3.5-turbo' });
const graph = new LangGraph({
messages: [],
next: 'input'
});
// 添加节点
graph
.addNode('input', new InputNode())
.addNode('reasoning', new ReasoningNode(llm))
.addNode('action', new ActionNode(llm))
.addNode('search', new SearchNode())
.addNode('calculate', new CalculateNode())
.addNode('respond', new ResponseNode(llm))
.addNode('end', new RunnableLambda((state: any) => state));
// 添加边
graph
.addEdge('input', 'reasoning')
.addEdge('reasoning', 'action')
.addConditionalEdges('action', {
'search': 'search',
'calculate': 'calculate',
'respond': 'respond'
})
.addEdge('search', 'respond')
.addEdge('calculate', 'respond')
.addEdge('respond', 'end');
// 设置入口和出口点
graph
.setEntryPoint('input')
.setFinishPoint('end');
return graph.compile();
}
async runDemo(input: string): Promise<void> {
console.log('=== LangGraph 演示 ===\n');
console.log(`输入: ${input}\n`);
try {
const result = await this.graph.invoke({ input });
console.log('最终结果:');
console.log(result);
// 显示图的可视化
console.log('\n图结构:');
console.log(this.graph.getGraphVisualization());
} catch (error) {
console.error('执行错误:', error.message);
}
}
}
// 更复杂的示例:客户服务 Agent
class CustomerServiceAgent {
private graph: CompiledGraph;
constructor() {
this.graph = this.createCustomerServiceGraph();
}
private createCustomerServiceGraph(): CompiledGraph {
const llm = new ChatOpenAI({ modelName: 'gpt-3.5-turbo' });
const graph = new LangGraph({
customerQuery: '',
customerInfo: {},
conversationHistory: [],
intent: '',
nextAction: ''
});
// 定义节点
graph
.addNode('input_handler', new RunnableLambda(async (state: any) => {
console.log('处理客户输入...');
return {
customerQuery: state.input,
conversationHistory: [...state.conversationHistory, {
role: 'customer',
content: state.input,
timestamp: Date.now()
}]
};
}))
.addNode('intent_classifier', new RunnableLambda(async (state: any) => {
console.log('分类客户意图...');
// 简化实现,实际应用中会使用 LLM 进行意图分类
const intents = ['billing', 'technical', 'general'];
const intent = intents[Math.floor(Math.random() * intents.length)];
return { intent };
}))
.addNode('billing_handler', new RunnableLambda(async (state: any) => {
console.log('处理账单问题...');
return {
response: '您的账单问题已记录,我们的财务团队将在24小时内回复您。',
nextAction: 'follow_up'
};
}))
.addNode('technical_handler', new RunnableLambda(async (state: any) => {
console.log('处理技术问题...');
return {
response: '技术团队已收到您的问题,我们将尽快为您提供解决方案。',
nextAction: 'follow_up'
};
}))
.addNode('general_handler', new RunnableLambda(async (state: any) => {
console.log('处理一般咨询...');
return {
response: '感谢您的咨询,我们已记录您的问题并将尽快回复。',
nextAction: 'follow_up'
};
}))
.addNode('follow_up', new RunnableLambda(async (state: any) => {
console.log('发送后续跟进...');
return {
followUpMessage: '请问还有其他我可以帮助您的吗?',
status: 'completed'
};
}))
.addNode('end', new RunnableLambda((state: any) => state));
// 定义边
graph
.addEdge('input_handler', 'intent_classifier')
.addConditionalEdges('intent_classifier', {
'billing': 'billing_handler',
'technical': 'technical_handler',
'general': 'general_handler'
})
.addEdge('billing_handler', 'follow_up')
.addEdge('technical_handler', 'follow_up')
.addEdge('general_handler', 'follow_up')
.addEdge('follow_up', 'end');
// 设置入口和出口
graph
.setEntryPoint('input_handler')
.setFinishPoint('end');
return graph.compile();
}
async handleCustomerQuery(query: string): Promise<any> {
console.log('=== 客户服务 Agent 演示 ===\n');
console.log(`客户查询: ${query}\n`);
try {
const result = await this.graph.invoke({ input: query });
console.log('处理结果:');
console.log(JSON.stringify(result, null, 2));
return result;
} catch (error) {
console.error('处理错误:', error.message);
return { error: error.message };
}
}
}
// 状态机 Agent
class StateMachineAgent {
private graph: CompiledGraph;
constructor() {
this.graph = this.createStateMachineGraph();
}
private createStateMachineGraph(): CompiledGraph {
const graph = new LangGraph({
state: 'idle',
task: '',
progress: 0
});
graph
.addNode('idle', new RunnableLambda(async (state: any) => {
console.log('空闲状态,等待任务...');
return { state: 'waiting_for_task' };
}))
.addNode('waiting_for_task', new RunnableLambda(async (state: any) => {
console.log('等待任务分配...');
// 模拟任务分配
return {
state: 'task_assigned',
task: '数据处理任务',
progress: 0
};
}))
.addNode('task_assigned', new RunnableLambda(async (state: any) => {
console.log(`任务已分配: ${state.task}`);
return { state: 'processing' };
}))
.addNode('processing', new RunnableLambda(async (state: any) => {
console.log('处理任务中...');
// 模拟处理进度
const progress = Math.min(100, state.progress + 25);
return {
progress,
state: progress >= 100 ? 'completed' : 'processing'
};
}))
.addNode('completed', new RunnableLambda(async (state: any) => {
console.log('任务完成!');
return { state: 'idle' };
}));
graph
.addEdge('idle', 'waiting_for_task')
.addEdge('waiting_for_task', 'task_assigned')
.addEdge('task_assigned', 'processing')
.addConditionalEdges('processing', {
'processing': 'processing',
'completed': 'completed'
})
.addEdge('completed', 'idle');
graph
.setEntryPoint('idle')
.setFinishPoint('idle'); // 循环图,没有真正的终点
return graph.compile();
}
async runStateMachine(): Promise<void> {
console.log('=== 状态机 Agent 演示 ===\n');
// 运行几步来演示状态转换
let state = {};
for (let i = 0; i < 8; i++) {
state = await this.graph.invoke(state);
console.log(`步骤 ${i + 1} 状态:`, state);
// 添加延迟以便观察
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
}
// 运行演示
async function runLangGraphDemos() {
console.log('🚀 LangGraph 演示\n');
// 基础演示
const basicDemo = new LangGraphDemo();
await basicDemo.runDemo('什么是 LangGraph?');
console.log('\n' + '='.repeat(50) + '\n');
// 客户服务演示
const customerService = new CustomerServiceAgent();
await customerService.handleCustomerQuery('我的账单有问题');
console.log('\n' + '='.repeat(50) + '\n');
// 状态机演示
const stateMachine = new StateMachineAgent();
await stateMachine.runStateMachine();
}
// 图形可视化工具
class GraphVisualizer {
static toMermaid(graph: CompiledGraph): string {
let mermaid = 'graph TD\n';
// 添加节点
// 这里需要访问 CompiledGraph 的内部结构
// 简化实现,实际应用中需要更完整的访问方法
return mermaid;
}
}
// 如果直接运行此文件,则执行演示
if (require.main === module) {
runLangGraphDemos().catch(console.error);
}总结
LangGraph 通过有向图结构为构建复杂的 AI Agent 提供了强大的支持:
- 图形化设计 - 使用节点和边直观地表示 Agent 的执行流程
- 状态管理 - 内置状态管理机制,支持复杂的状态转换
- 条件跳转 - 支持基于条件的分支和跳转
- 循环支持 - 可以构建循环执行的 Agent
- 模块化设计 - 每个节点都是独立的 组件,便于重用和测试
- 可视化 - 支持图结构的可视化表示
LangGraph 使得构建复杂的、状态化的 AI Agent 变得更加直观和可维护,为开发高级 AI 应用提供了坚实的基础。
在下一章中,我们将探讨与 DevOps 集成:CI/CD 流程中测试 Prompt 变更的影响,了解如何在软件开发生命周期中管理 AI 应用。