使用 LCEL 声明式构建复杂工作流
ts
const ragChain = retriever
.pipe(formatDocuments)
.pipe(prompt)
.pipe(llm)
.pipe(parser)
.withRetry({ maxAttempts: 3 });在现代 LLM 应用开发中,构建复杂的工作流是一个常见需求。LangChain V3 通过 LCEL(LangChain Expression Language)提供了声明式构建复杂工作流的能力。这种声明式方法不仅使代码更加清晰易读,还提供了强大的组合性和可维护性。本章将深入探讨如何使用 LCEL 构建复杂的工作流。
LCEL 声明式构建的核心概念
LCEL 的核心在于其声明式特性,允许开发者以直观的方式描述工作流,而不需要关注具体的执行细节:
typescript
// 声明式构建 vs 命令式构建
// 命令式(传统方式)
async function commandStyleRAG(query: string) {
const documents = await retriever.invoke(query);
const formattedDocs = formatDocuments(documents);
const promptText = await prompt.format({ context: formattedDocs, question: query });
const llmResponse = await llm.invoke(promptText);
const parsedResponse = await parser.parse(llmResponse);
return parsedResponse;
}
// 声明式(LCEL 方式)
const declarativeRAG = retriever
.pipe(formatDocuments)
.pipe(prompt)
.pipe(llm)
.pipe(parser);基础工作流构建
让我们从基础的 RAG 工作流开始:
typescript
// 创建基础 RAG 链
class RAGWorkflowBuilder {
static createBasicRAG(
retriever: Runnable<string, Document[]>,
llm: BaseLanguageModel,
parser: BaseOutputParser<any> = new StringOutputParser()
): Runnable<string, any> {
// 创建提示模板
const prompt = new PromptTemplate({
template: `使用以下文档内容回答问题。如果文档中没有相关信息,请说明无法回答。
文档内容:
{context}
问题: {question}
答案:`,
inputVariables: ["context", "question"]
});
// 构建 RAG 链
return RunnableMap.from({
question: (input: string) => input,
context: retriever.pipe((docs: Document[]) =>
docs.map(doc => doc.pageContent).join('\n\n')
)
})
.pipe(prompt)
.pipe(llm)
.pipe(parser);
}
// 带重试机制的 RAG 链
static createRobustRAG(
retriever: Runnable<string, Document[]>,
llm: BaseLanguageModel,
parser: BaseOutputParser<any> = new StringOutputParser()
): Runnable<string, any> {
const prompt = new PromptTemplate({
template: `使用以下文档内容回答问题。如果文档中没有相关信息,请说明无法回答。
文档内容:
{context}
问题: {question}
答案:`,
inputVariables: ["context", "question"]
});
return RunnableMap.from({
question: (input: string) => input,
context: retriever.pipe((docs: Document[]) =>
docs.map(doc => doc.pageContent).join('\n\n')
)
})
.pipe(prompt)
.pipe(llm)
.pipe(parser)
.withRetry({ maxAttempts: 3 });
}
}复杂工作流的构建
构建更复杂的工作流,包含多个分支和条件:
typescript
// 多步骤复杂工作流
class ComplexWorkflowBuilder {
static createMultiStepWorkflow(
retriever: Runnable<string, Document[]>,
llm: BaseLanguageModel
): Runnable<{ question: string; userId: string }, string> {
// 步骤1: 用户身份验证
const userAuth = new RunnableLambda(async (input: { question: string; userId: string }) => {
// 模拟用户身份验证
console.log(`验证用户: ${input.userId}`);
return input;
});
// 步骤2: 查询分类
const queryClassifier = new PromptTemplate({
template: `将以下问题分类到最合适的类别中:
类别选项:
1. 技术问题 (technical)
2. 账单问题 (billing)
3. 一般咨询 (general)
问题: {question}
只回答类别名称:`,
inputVariables: ["question"]
}).pipe(llm).pipe(new StringOutputParser());
// 步骤3: 基于分类的处理
const technicalHandler = retriever
.pipe((docs: Document[]) => docs.map(d => d.pageContent).join('\n\n'))
.pipe(new PromptTemplate({
template: `你是一个技术专家。基于以下技术文档回答问题:
文档:
{context}
问题: {question}
详细技术解答:`,
inputVariables: ["context", "question"]
}))
.pipe(llm)
.pipe(new StringOutputParser());
const billingHandler = new PromptTemplate({
template: `你是一个财务专家。回答以下账单问题:
问题: {question}
财务解答:`,
inputVariables: ["question"]
}).pipe(llm).pipe(new StringOutputParser());
const generalHandler = new PromptTemplate({
template: `你是一个客服代表。回答以下一般咨询问题:
问题: {question}
解答:`,
inputVariables: ["question"]
}).pipe(llm).pipe(new StringOutputParser());
// 步骤4: 分支路由
const branchRouter = new RunnableBranch([
[(category: string) => category.toLowerCase().includes('technical'), technicalHandler],
[(category: string) => category.toLowerCase().includes('billing'), billingHandler],
[(category: string) => category.toLowerCase().includes('general'), generalHandler]
]);
// 组合完整工作流
return userAuth
.pipe(RunnableMap.from({
question: (input: { question: string; userId: string }) => input.question,
category: (input: { question: string; userId: string }) =>
queryClassifier.invoke({ question: input.question })
}))
.pipe((input: { question: string; category: string }) => ({
question: input.question,
category: input.category
}))
.pipe(branchRouter);
}
}并行处理工作流
利用 LCEL 构建并行处理的工作流:
typescript
// 并行处理工作流
class ParallelWorkflowBuilder {
static createParallelAnalysis(
llm: BaseLanguageModel
): Runnable<string, {
sentiment: string;
entities: string[];
summary: string
}> {
// 情感分析
const sentimentAnalyzer = new PromptTemplate({
template: `分析以下文本的情感倾向:
文本: {text}
情感:`,
inputVariables: ["text"]
}).pipe(llm).pipe(new StringOutputParser());
// 实体提取
const entityExtractor = new PromptTemplate({
template: `从以下文本中提取重要实体(人名、地名、组织等):
文本: {text}
实体列表(JSON格式):`,
inputVariables: ["text"]
}).pipe(llm).pipe(new JsonOutputParser<string[]>());
// 文本摘要
const summarizer = new PromptTemplate({
template: `为以下文本生成简洁摘要:
文本: {text}
摘要:`,
inputVariables: ["text"]
}).pipe(llm).pipe(new StringOutputParser());
// 并行执行多个分析任务
return RunnableMap.from({
sentiment: sentimentAnalyzer,
entities: entityExtractor,
summary: summarizer
});
}
// 带有依赖关系的并行处理
static createDependentParallelWorkflow(
retriever: Runnable<string, Document[]>,
llm: BaseLanguageModel
): Runnable<string, {
answer: string;
relatedQuestions: string[];
confidence: number
}> {
// 主要问答链
const qaChain = RunnableMap.from({
question: (input: string) => input,
context: retriever.pipe((docs: Document[]) =>
docs.map(d => d.pageContent).join('\n\n')
)
})
.pipe(new PromptTemplate({
template: `基于以下文档回答问题:
文档:
{context}
问题: {question}
答案:`,
inputVariables: ["context", "question"]
}))
.pipe(llm)
.pipe(new StringOutputParser());
// 基于答案生成相关问题
const relatedQuestionsChain = new PromptTemplate({
template: `基于以下答案生成3个相关的后续问题:
答案: {answer}
相关问题(JSON数组格式):`,
inputVariables: ["answer"]
}).pipe(llm).pipe(new JsonOutputParser<string[]>());
// 置信度评估
const confidenceChain = new PromptTemplate({
template: `评估以下答案的置信度(0-1之间的数字):
答案: {answer}
置信度:`,
inputVariables: ["answer"]
}).pipe(llm).pipe(new StringOutputParser())
.pipe((confidence: string) => {
const num = parseFloat(confidence);
return isNaN(num) ? 0.5 : Math.max(0, Math.min(1, num));
});
// 组合并行处理链
return qaChain.pipe(
RunnableMap.from({
answer: (input: string) => input,
relatedQuestions: relatedQuestionsChain,
confidence: confidenceChain
})
);
}
}带有条件逻辑的工作流
构建包含条件逻辑的复杂工作流:
typescript
// 条件逻辑工作流
class ConditionalWorkflowBuilder {
static createAdaptiveWorkflow(
retriever: Runnable<string, Document[]>,
llm: BaseLanguageModel
): Runnable<{ question: string; complexity: 'simple' | 'complex' }, string> {
// 复杂度评估
const complexityEvaluator = new PromptTemplate({
template: `评估以下问题的复杂度:
问题: {question}
复杂度(简单/复杂):`,
inputVariables: ["question"]
}).pipe(llm).pipe(new StringOutputParser())
.pipe((response: string) =>
response.toLowerCase().includes('复杂') ? 'complex' : 'simple'
);
// 简单问题处理
const simpleHandler = new PromptTemplate({
template: `简洁回答以下问题:
问题: {question}
答案:`,
inputVariables: ["question"]
}).pipe(llm).pipe(new StringOutputParser());
// 复杂问题处理(使用 RAG)
const complexHandler = RunnableMap.from({
question: (input: { question: string; complexity: string }) => input.question,
context: retriever.pipe((docs: Document[]) =>
docs.map(d => d.pageContent).join('\n\n')
)
})
.pipe(new PromptTemplate({
template: `详细回答以下问题,参考提供的文档:
文档:
{context}
问题: {question}
详细答案:`,
inputVariables: ["context", "question"]
}))
.pipe(llm)
.pipe(new StringOutputParser());
// 条件路由
const conditionalRouter = new RunnableBranch([
[(input: { question: string; complexity: string }) => input.complexity === 'simple',
simpleHandler.pipe((result: string) => `简单回答: ${result}`)],
[(input: { question: string; complexity: string }) => input.complexity === 'complex',
complexHandler.pipe((result: string) => `详细回答: ${result}`)]
]);
// 完整工作流
return RunnableMap.from({
question: (input: { question: string; complexity: 'simple' | 'complex' }) => input.question,
complexity: complexityEvaluator
})
.pipe(conditionalRouter);
}
}实际应用示例
让我们看一个完整的实际应用示例,展示如何构建企业级文档问答系统:
typescript
// 企业级文档问答系统
class EnterpriseDocumentQA {
private workflow: Runnable<any, any>;
constructor(
private retriever: Runnable<string, Document[]>,
private llm: BaseLanguageModel
) {
this.workflow = this.buildWorkflow();
}
private buildWorkflow(): Runnable<{
question: string;
userId: string;
department?: string
}, {
answer: string;
sources: Document[];
confidence: number;
followUpQuestions: string[]
}> {
// 用户权限检查
const permissionCheck = new RunnableLambda(async (input: {
question: string;
userId: string;
department?: string
}) => {
console.log(`检查用户 ${input.userId} 的权限`);
// 模拟权限检查逻辑
return {
...input,
hasPermission: true
};
});
// 文档检索
const documentRetrieval = this.retriever.pipe(
// 过滤敏感文档
(documents: Document[]) => documents.filter(doc =>
!doc.metadata.sensitive || doc.metadata.department === department
)
);
// 答案生成
const answerGeneration = RunnableMap.from({
question: (input: any) => input.question,
context: documentRetrieval.pipe((docs: Document[]) =>
docs.map(d => d.pageContent).join('\n\n')
)
})
.pipe(new PromptTemplate({
template: `基于以下文档回答问题。如果文档中没有相关信息,请说明无法回答。
文档:
{context}
问题: {question}
答案:`,
inputVariables: ["context", "question"]
}))
.pipe(this.llm)
.pipe(new StringOutputParser());
// 置信度评估
const confidenceAssessment = new PromptTemplate({
template: `评估以下答案的置信度(0-1之间的数字):
答案: {answer}
置信度:`,
inputVariables: ["answer"]
})
.pipe(this.llm)
.pipe(new StringOutputParser())
.pipe((confidence: string) => {
const num = parseFloat(confidence);
return isNaN(num) ? 0.5 : Math.max(0, Math.min(1, num));
});
// 相关问题生成
const followUpGeneration = new PromptTemplate({
template: `基于以下问题和答案生成3个相关的后续问题:
问题: {question}
答案: {answer}
后续问题(JSON数组):`,
inputVariables: ["question", "answer"]
})
.pipe(this.llm)
.pipe(new JsonOutputParser<string[]>());
// 源文档收集
const sourceCollection = documentRetrieval;
// 组合完整工作流
return permissionCheck
.pipe(
RunnableMap.from({
answer: answerGeneration,
sources: sourceCollection,
confidence: answerGeneration.pipe(confidenceAssessment),
followUpQuestions: answerGeneration.pipe(followUpGeneration)
})
)
.withRetry({ maxAttempts: 3 })
.withConfig({
metadata: { workflow: "enterprise-document-qa" }
});
}
async askQuestion(input: {
question: string;
userId: string;
department?: string
}): Promise<{
answer: string;
sources: Document[];
confidence: number;
followUpQuestions: string[]
}> {
try {
const result = await this.workflow.invoke(input);
return result;
} catch (error) {
console.error('工作流执行错误:', error);
return {
answer: "抱歉,处理您的问题时出现了错误。",
sources: [],
confidence: 0,
followUpQuestions: []
};
}
}
async *streamAnswer(input: {
question: string;
userId: string;
department?: string
}): AsyncGenerator<{
type: 'answer' | 'confidence' | 'followUp' | 'sources' | 'error';
data: any
}> {
try {
const stream = await this.workflow.stream(input);
for await (const chunk of stream) {
yield { type: 'answer', data: chunk.answer };
yield { type: 'confidence', data: chunk.confidence };
yield { type: 'followUp', data: chunk.followUpQuestions };
yield { type: 'sources', data: chunk.sources };
}
} catch (error) {
yield { type: 'error', data: error.message };
}
}
}
// 创建和使用企业级问答系统
async function createEnterpriseQA() {
// 创建向量存储和检索器
const vectorStore = new MemoryVectorStore();
// 添加企业文档
const enterpriseDocuments: Document[] = [
{
pageContent: `公司休假政策:
员工每年享有20天带薪年假。
病假每年10天。
婚假3天,产假98天,陪产假15天。
请假需提前3天申请,紧急情况除外。`,
metadata: {
department: "hr",
category: "policy",
sensitive: false,
documentId: "policy-001"
}
},
{
pageContent: `IT系统使用规范:
员工应定期更改密码,密码长度不少于8位。
禁止在公司设备上安装未经批准的软件。
重要数据必须存储在公司服务器上,不得私自保存。
违反规定将受到纪律处分。`,
metadata: {
department: "it",
category: "policy",
sensitive: false,
documentId: "policy-002"
}
},
{
pageContent: `财务报销流程:
员工需在费用发生后一周内提交报销申请。
单笔费用超过1000元需部门经理审批。
报销需提供正规发票和费用说明。
审批流程:申请人 → 部门经理 → 财务部 → 出纳。`,
metadata: {
department: "finance",
category: "process",
sensitive: false,
documentId: "process-001"
}
}
];
await vectorStore.addDocuments(enterpriseDocuments);
const retriever = new VectorStoreRetriever({
vectorStore,
k: 3
});
const qaSystem = new EnterpriseDocumentQA(
retriever,
new ChatOpenAI({ modelName: "gpt-3.5-turbo" })
);
return qaSystem;
}
// 使用示例
async function demonstrateEnterpriseQA() {
const qaSystem = await createEnterpriseQA();
console.log('=== 企业级文档问答系统演示 ===\n');
const result = await qaSystem.askQuestion({
question: "年假有多少天?",
userId: "emp001",
department: "hr"
});
console.log('问题: 年假有多少天?');
console.log('答案:', result.answer);
console.log('置信度:', result.confidence.toFixed(2));
console.log('相关问题:', result.followUpQuestions);
console.log('使用的文档数量:', result.sources.length);
console.log();
}工作流监控和调试
为工作流添加监控和调试能力:
typescript
// 带监控的工作流构建器
class MonitoredWorkflowBuilder {
static createMonitoredWorkflow(
baseWorkflow: Runnable<any, any>,
logger: Logger
): Runnable<any, any> {
// 添加输入日志
const loggedInput = new RunnableLambda(async (input: any) => {
logger.info('工作流输入:', { input });
return input;
});
// 添加输出日志
const loggedOutput = new RunnableLambda(async (output: any) => {
logger.info('工作流输出:', { output });
return output;
});
// 添加性能监控
const performanceMonitored = baseWorkflow.withListeners({
onChainStart: (chain: Runnable, inputs: any) => {
logger.info('链开始执行', {
chain: chain.getName?.() || 'unnamed',
inputs
});
},
onChainEnd: (outputs: any) => {
logger.info('链执行完成', { outputs });
},
onChainError: (error: Error) => {
logger.error('链执行错误', { error: error.message });
}
});
return loggedInput
.pipe(performanceMonitored)
.pipe(loggedOutput);
}
}总结
使用 LCEL 声明式构建复杂工作流为 LangChain V3 提供了强大的能力:
- 声明式语法 - 使用管道操作符直观地描述工作流
- 组合性 - 轻松组合不同的组件和链
- 并行处理 - 通过 RunnableMap 支持并行任务执行
- 条件逻辑 - 使用 RunnableBranch 实现条件路由
- 错误处理 - 通过 withRetry 等方法处理错误和重试
- 监控调试 - 内置监听器支持监控和调试
这种声明式方法使得构建和维护复杂的 LLM 应用工作流变得更加简单和直观,大大提高了开发效率和代码可维护性。
在下一章中,我们将探讨中间步骤的日志与监控:通过 RunnableConfig.callbacks 注入,了解如何在工作流中集成监控和日志功能。