Skip to content

使用 LCEL 声明式构建复杂工作流

ts
const ragChain = retriever
  .pipe(formatDocuments)
  .pipe(prompt)
  .pipe(llm)
  .pipe(parser)
  .withRetry({ maxAttempts: 3 });

在现代 LLM 应用开发中,构建复杂的工作流是一个常见需求。LangChain V3 通过 LCEL(LangChain Expression Language)提供了声明式构建复杂工作流的能力。这种声明式方法不仅使代码更加清晰易读,还提供了强大的组合性和可维护性。本章将深入探讨如何使用 LCEL 构建复杂的工作流。

LCEL 声明式构建的核心概念

LCEL 的核心在于其声明式特性,允许开发者以直观的方式描述工作流,而不需要关注具体的执行细节:

typescript
// 声明式构建 vs 命令式构建
// 命令式(传统方式)
async function commandStyleRAG(query: string) {
  const documents = await retriever.invoke(query);
  const formattedDocs = formatDocuments(documents);
  const promptText = await prompt.format({ context: formattedDocs, question: query });
  const llmResponse = await llm.invoke(promptText);
  const parsedResponse = await parser.parse(llmResponse);
  return parsedResponse;
}

// 声明式(LCEL 方式)
const declarativeRAG = retriever
  .pipe(formatDocuments)
  .pipe(prompt)
  .pipe(llm)
  .pipe(parser);

基础工作流构建

让我们从基础的 RAG 工作流开始:

typescript
// 创建基础 RAG 链
class RAGWorkflowBuilder {
  static createBasicRAG(
    retriever: Runnable<string, Document[]>,
    llm: BaseLanguageModel,
    parser: BaseOutputParser<any> = new StringOutputParser()
  ): Runnable<string, any> {
    // 创建提示模板
    const prompt = new PromptTemplate({
      template: `使用以下文档内容回答问题。如果文档中没有相关信息,请说明无法回答。

文档内容:
{context}

问题: {question}

答案:`,
      inputVariables: ["context", "question"]
    });
    
    // 构建 RAG 链
    return RunnableMap.from({
      question: (input: string) => input,
      context: retriever.pipe((docs: Document[]) => 
        docs.map(doc => doc.pageContent).join('\n\n')
      )
    })
    .pipe(prompt)
    .pipe(llm)
    .pipe(parser);
  }
  
  // 带重试机制的 RAG 链
  static createRobustRAG(
    retriever: Runnable<string, Document[]>,
    llm: BaseLanguageModel,
    parser: BaseOutputParser<any> = new StringOutputParser()
  ): Runnable<string, any> {
    const prompt = new PromptTemplate({
      template: `使用以下文档内容回答问题。如果文档中没有相关信息,请说明无法回答。

文档内容:
{context}

问题: {question}

答案:`,
      inputVariables: ["context", "question"]
    });
    
    return RunnableMap.from({
      question: (input: string) => input,
      context: retriever.pipe((docs: Document[]) => 
        docs.map(doc => doc.pageContent).join('\n\n')
      )
    })
    .pipe(prompt)
    .pipe(llm)
    .pipe(parser)
    .withRetry({ maxAttempts: 3 });
  }
}

复杂工作流的构建

构建更复杂的工作流,包含多个分支和条件:

typescript
// 多步骤复杂工作流
class ComplexWorkflowBuilder {
  static createMultiStepWorkflow(
    retriever: Runnable<string, Document[]>,
    llm: BaseLanguageModel
  ): Runnable<{ question: string; userId: string }, string> {
    // 步骤1: 用户身份验证
    const userAuth = new RunnableLambda(async (input: { question: string; userId: string }) => {
      // 模拟用户身份验证
      console.log(`验证用户: ${input.userId}`);
      return input;
    });
    
    // 步骤2: 查询分类
    const queryClassifier = new PromptTemplate({
      template: `将以下问题分类到最合适的类别中:
类别选项:
1. 技术问题 (technical)
2. 账单问题 (billing)
3. 一般咨询 (general)

问题: {question}

只回答类别名称:`,
      inputVariables: ["question"]
    }).pipe(llm).pipe(new StringOutputParser());
    
    // 步骤3: 基于分类的处理
    const technicalHandler = retriever
      .pipe((docs: Document[]) => docs.map(d => d.pageContent).join('\n\n'))
      .pipe(new PromptTemplate({
        template: `你是一个技术专家。基于以下技术文档回答问题:
        
文档:
{context}

问题: {question}

详细技术解答:`,
        inputVariables: ["context", "question"]
      }))
      .pipe(llm)
      .pipe(new StringOutputParser());
    
    const billingHandler = new PromptTemplate({
      template: `你是一个财务专家。回答以下账单问题:
      
问题: {question}

财务解答:`,
      inputVariables: ["question"]
    }).pipe(llm).pipe(new StringOutputParser());
    
    const generalHandler = new PromptTemplate({
      template: `你是一个客服代表。回答以下一般咨询问题:
      
问题: {question}

解答:`,
      inputVariables: ["question"]
    }).pipe(llm).pipe(new StringOutputParser());
    
    // 步骤4: 分支路由
    const branchRouter = new RunnableBranch([
      [(category: string) => category.toLowerCase().includes('technical'), technicalHandler],
      [(category: string) => category.toLowerCase().includes('billing'), billingHandler],
      [(category: string) => category.toLowerCase().includes('general'), generalHandler]
    ]);
    
    // 组合完整工作流
    return userAuth
      .pipe(RunnableMap.from({
        question: (input: { question: string; userId: string }) => input.question,
        category: (input: { question: string; userId: string }) => 
          queryClassifier.invoke({ question: input.question })
      }))
      .pipe((input: { question: string; category: string }) => ({
        question: input.question,
        category: input.category
      }))
      .pipe(branchRouter);
  }
}

并行处理工作流

利用 LCEL 构建并行处理的工作流:

typescript
// 并行处理工作流
class ParallelWorkflowBuilder {
  static createParallelAnalysis(
    llm: BaseLanguageModel
  ): Runnable<string, { 
    sentiment: string; 
    entities: string[]; 
    summary: string 
  }> {
    // 情感分析
    const sentimentAnalyzer = new PromptTemplate({
      template: `分析以下文本的情感倾向:
文本: {text}

情感:`,
      inputVariables: ["text"]
    }).pipe(llm).pipe(new StringOutputParser());
    
    // 实体提取
    const entityExtractor = new PromptTemplate({
      template: `从以下文本中提取重要实体(人名、地名、组织等):
文本: {text}

实体列表(JSON格式):`,
      inputVariables: ["text"]
    }).pipe(llm).pipe(new JsonOutputParser<string[]>());
    
    // 文本摘要
    const summarizer = new PromptTemplate({
      template: `为以下文本生成简洁摘要:
文本: {text}

摘要:`,
      inputVariables: ["text"]
    }).pipe(llm).pipe(new StringOutputParser());
    
    // 并行执行多个分析任务
    return RunnableMap.from({
      sentiment: sentimentAnalyzer,
      entities: entityExtractor,
      summary: summarizer
    });
  }
  
  // 带有依赖关系的并行处理
  static createDependentParallelWorkflow(
    retriever: Runnable<string, Document[]>,
    llm: BaseLanguageModel
  ): Runnable<string, { 
    answer: string; 
    relatedQuestions: string[]; 
    confidence: number 
  }> {
    // 主要问答链
    const qaChain = RunnableMap.from({
      question: (input: string) => input,
      context: retriever.pipe((docs: Document[]) => 
        docs.map(d => d.pageContent).join('\n\n')
      )
    })
    .pipe(new PromptTemplate({
      template: `基于以下文档回答问题:
      
文档:
{context}

问题: {question}

答案:`,
      inputVariables: ["context", "question"]
    }))
    .pipe(llm)
    .pipe(new StringOutputParser());
    
    // 基于答案生成相关问题
    const relatedQuestionsChain = new PromptTemplate({
      template: `基于以下答案生成3个相关的后续问题:
答案: {answer}

相关问题(JSON数组格式):`,
      inputVariables: ["answer"]
    }).pipe(llm).pipe(new JsonOutputParser<string[]>());
    
    // 置信度评估
    const confidenceChain = new PromptTemplate({
      template: `评估以下答案的置信度(0-1之间的数字):
答案: {answer}

置信度:`,
      inputVariables: ["answer"]
    }).pipe(llm).pipe(new StringOutputParser())
    .pipe((confidence: string) => {
      const num = parseFloat(confidence);
      return isNaN(num) ? 0.5 : Math.max(0, Math.min(1, num));
    });
    
    // 组合并行处理链
    return qaChain.pipe(
      RunnableMap.from({
        answer: (input: string) => input,
        relatedQuestions: relatedQuestionsChain,
        confidence: confidenceChain
      })
    );
  }
}

带有条件逻辑的工作流

构建包含条件逻辑的复杂工作流:

typescript
// 条件逻辑工作流
class ConditionalWorkflowBuilder {
  static createAdaptiveWorkflow(
    retriever: Runnable<string, Document[]>,
    llm: BaseLanguageModel
  ): Runnable<{ question: string; complexity: 'simple' | 'complex' }, string> {
    // 复杂度评估
    const complexityEvaluator = new PromptTemplate({
      template: `评估以下问题的复杂度:
问题: {question}

复杂度(简单/复杂):`,
      inputVariables: ["question"]
    }).pipe(llm).pipe(new StringOutputParser())
    .pipe((response: string) => 
      response.toLowerCase().includes('复杂') ? 'complex' : 'simple'
    );
    
    // 简单问题处理
    const simpleHandler = new PromptTemplate({
      template: `简洁回答以下问题:
问题: {question}

答案:`,
      inputVariables: ["question"]
    }).pipe(llm).pipe(new StringOutputParser());
    
    // 复杂问题处理(使用 RAG)
    const complexHandler = RunnableMap.from({
      question: (input: { question: string; complexity: string }) => input.question,
      context: retriever.pipe((docs: Document[]) => 
        docs.map(d => d.pageContent).join('\n\n')
      )
    })
    .pipe(new PromptTemplate({
      template: `详细回答以下问题,参考提供的文档:
      
文档:
{context}

问题: {question}

详细答案:`,
      inputVariables: ["context", "question"]
    }))
    .pipe(llm)
    .pipe(new StringOutputParser());
    
    // 条件路由
    const conditionalRouter = new RunnableBranch([
      [(input: { question: string; complexity: string }) => input.complexity === 'simple', 
       simpleHandler.pipe((result: string) => `简单回答: ${result}`)],
      [(input: { question: string; complexity: string }) => input.complexity === 'complex', 
       complexHandler.pipe((result: string) => `详细回答: ${result}`)]
    ]);
    
    // 完整工作流
    return RunnableMap.from({
      question: (input: { question: string; complexity: 'simple' | 'complex' }) => input.question,
      complexity: complexityEvaluator
    })
    .pipe(conditionalRouter);
  }
}

实际应用示例

让我们看一个完整的实际应用示例,展示如何构建企业级文档问答系统:

typescript
// 企业级文档问答系统
class EnterpriseDocumentQA {
  private workflow: Runnable<any, any>;
  
  constructor(
    private retriever: Runnable<string, Document[]>,
    private llm: BaseLanguageModel
  ) {
    this.workflow = this.buildWorkflow();
  }
  
  private buildWorkflow(): Runnable<{ 
    question: string; 
    userId: string; 
    department?: string 
  }, { 
    answer: string; 
    sources: Document[]; 
    confidence: number; 
    followUpQuestions: string[] 
  }> {
    // 用户权限检查
    const permissionCheck = new RunnableLambda(async (input: { 
      question: string; 
      userId: string; 
      department?: string 
    }) => {
      console.log(`检查用户 ${input.userId} 的权限`);
      // 模拟权限检查逻辑
      return {
        ...input,
        hasPermission: true
      };
    });
    
    // 文档检索
    const documentRetrieval = this.retriever.pipe(
      // 过滤敏感文档
      (documents: Document[]) => documents.filter(doc => 
        !doc.metadata.sensitive || doc.metadata.department === department
      )
    );
    
    // 答案生成
    const answerGeneration = RunnableMap.from({
      question: (input: any) => input.question,
      context: documentRetrieval.pipe((docs: Document[]) => 
        docs.map(d => d.pageContent).join('\n\n')
      )
    })
    .pipe(new PromptTemplate({
      template: `基于以下文档回答问题。如果文档中没有相关信息,请说明无法回答。
      
文档:
{context}

问题: {question}

答案:`,
      inputVariables: ["context", "question"]
    }))
    .pipe(this.llm)
    .pipe(new StringOutputParser());
    
    // 置信度评估
    const confidenceAssessment = new PromptTemplate({
      template: `评估以下答案的置信度(0-1之间的数字):
答案: {answer}

置信度:`,
      inputVariables: ["answer"]
    })
    .pipe(this.llm)
    .pipe(new StringOutputParser())
    .pipe((confidence: string) => {
      const num = parseFloat(confidence);
      return isNaN(num) ? 0.5 : Math.max(0, Math.min(1, num));
    });
    
    // 相关问题生成
    const followUpGeneration = new PromptTemplate({
      template: `基于以下问题和答案生成3个相关的后续问题:
问题: {question}
答案: {answer}

后续问题(JSON数组):`,
      inputVariables: ["question", "answer"]
    })
    .pipe(this.llm)
    .pipe(new JsonOutputParser<string[]>());
    
    // 源文档收集
    const sourceCollection = documentRetrieval;
    
    // 组合完整工作流
    return permissionCheck
      .pipe(
        RunnableMap.from({
          answer: answerGeneration,
          sources: sourceCollection,
          confidence: answerGeneration.pipe(confidenceAssessment),
          followUpQuestions: answerGeneration.pipe(followUpGeneration)
        })
      )
      .withRetry({ maxAttempts: 3 })
      .withConfig({
        metadata: { workflow: "enterprise-document-qa" }
      });
  }
  
  async askQuestion(input: { 
    question: string; 
    userId: string; 
    department?: string 
  }): Promise<{ 
    answer: string; 
    sources: Document[]; 
    confidence: number; 
    followUpQuestions: string[] 
  }> {
    try {
      const result = await this.workflow.invoke(input);
      return result;
    } catch (error) {
      console.error('工作流执行错误:', error);
      return {
        answer: "抱歉,处理您的问题时出现了错误。",
        sources: [],
        confidence: 0,
        followUpQuestions: []
      };
    }
  }
  
  async *streamAnswer(input: { 
    question: string; 
    userId: string; 
    department?: string 
  }): AsyncGenerator<{ 
    type: 'answer' | 'confidence' | 'followUp' | 'sources' | 'error'; 
    data: any 
  }> {
    try {
      const stream = await this.workflow.stream(input);
      
      for await (const chunk of stream) {
        yield { type: 'answer', data: chunk.answer };
        yield { type: 'confidence', data: chunk.confidence };
        yield { type: 'followUp', data: chunk.followUpQuestions };
        yield { type: 'sources', data: chunk.sources };
      }
    } catch (error) {
      yield { type: 'error', data: error.message };
    }
  }
}

// 创建和使用企业级问答系统
async function createEnterpriseQA() {
  // 创建向量存储和检索器
  const vectorStore = new MemoryVectorStore();
  
  // 添加企业文档
  const enterpriseDocuments: Document[] = [
    {
      pageContent: `公司休假政策:
员工每年享有20天带薪年假。
病假每年10天。
婚假3天,产假98天,陪产假15天。
请假需提前3天申请,紧急情况除外。`,
      metadata: { 
        department: "hr", 
        category: "policy", 
        sensitive: false,
        documentId: "policy-001"
      }
    },
    {
      pageContent: `IT系统使用规范:
员工应定期更改密码,密码长度不少于8位。
禁止在公司设备上安装未经批准的软件。
重要数据必须存储在公司服务器上,不得私自保存。
违反规定将受到纪律处分。`,
      metadata: { 
        department: "it", 
        category: "policy", 
        sensitive: false,
        documentId: "policy-002"
      }
    },
    {
      pageContent: `财务报销流程:
员工需在费用发生后一周内提交报销申请。
单笔费用超过1000元需部门经理审批。
报销需提供正规发票和费用说明。
审批流程:申请人 → 部门经理 → 财务部 → 出纳。`,
      metadata: { 
        department: "finance", 
        category: "process", 
        sensitive: false,
        documentId: "process-001"
      }
    }
  ];
  
  await vectorStore.addDocuments(enterpriseDocuments);
  
  const retriever = new VectorStoreRetriever({
    vectorStore,
    k: 3
  });
  
  const qaSystem = new EnterpriseDocumentQA(
    retriever,
    new ChatOpenAI({ modelName: "gpt-3.5-turbo" })
  );
  
  return qaSystem;
}

// 使用示例
async function demonstrateEnterpriseQA() {
  const qaSystem = await createEnterpriseQA();
  
  console.log('=== 企业级文档问答系统演示 ===\n');
  
  const result = await qaSystem.askQuestion({
    question: "年假有多少天?",
    userId: "emp001",
    department: "hr"
  });
  
  console.log('问题: 年假有多少天?');
  console.log('答案:', result.answer);
  console.log('置信度:', result.confidence.toFixed(2));
  console.log('相关问题:', result.followUpQuestions);
  console.log('使用的文档数量:', result.sources.length);
  console.log();
}

工作流监控和调试

为工作流添加监控和调试能力:

typescript
// 带监控的工作流构建器
class MonitoredWorkflowBuilder {
  static createMonitoredWorkflow(
    baseWorkflow: Runnable<any, any>,
    logger: Logger
  ): Runnable<any, any> {
    // 添加输入日志
    const loggedInput = new RunnableLambda(async (input: any) => {
      logger.info('工作流输入:', { input });
      return input;
    });
    
    // 添加输出日志
    const loggedOutput = new RunnableLambda(async (output: any) => {
      logger.info('工作流输出:', { output });
      return output;
    });
    
    // 添加性能监控
    const performanceMonitored = baseWorkflow.withListeners({
      onChainStart: (chain: Runnable, inputs: any) => {
        logger.info('链开始执行', { 
          chain: chain.getName?.() || 'unnamed',
          inputs 
        });
      },
      onChainEnd: (outputs: any) => {
        logger.info('链执行完成', { outputs });
      },
      onChainError: (error: Error) => {
        logger.error('链执行错误', { error: error.message });
      }
    });
    
    return loggedInput
      .pipe(performanceMonitored)
      .pipe(loggedOutput);
  }
}

总结

使用 LCEL 声明式构建复杂工作流为 LangChain V3 提供了强大的能力:

  1. 声明式语法 - 使用管道操作符直观地描述工作流
  2. 组合性 - 轻松组合不同的组件和链
  3. 并行处理 - 通过 RunnableMap 支持并行任务执行
  4. 条件逻辑 - 使用 RunnableBranch 实现条件路由
  5. 错误处理 - 通过 withRetry 等方法处理错误和重试
  6. 监控调试 - 内置监听器支持监控和调试

这种声明式方法使得构建和维护复杂的 LLM 应用工作流变得更加简单和直观,大大提高了开发效率和代码可维护性。

在下一章中,我们将探讨中间步骤的日志与监控:通过 RunnableConfig.callbacks 注入,了解如何在工作流中集成监控和日志功能。