Logging and Monitoring Intermediate Steps: Injection via RunnableConfig.callbacks
Integrating LangSmith and Custom Loggers
When building production-grade LLM applications, monitoring and logging are essential for system stability and observability. LangChain V3 provides a powerful injection point for both through RunnableConfig.callbacks, letting developers collect detailed execution information at every step of a workflow. This chapter takes a close look at how to implement comprehensive logging and monitoring through the callback mechanism.
Basic Concepts of the Callbacks Mechanism
RunnableConfig.callbacks is the core mechanism for monitoring and logging in LangChain V3:
```typescript
interface RunnableConfig {
callbacks?: Callbacks;
metadata?: Record<string, any>;
tags?: string[];
signal?: AbortSignal;
timeout?: number;
}
interface Callbacks {
onChainStart?: (chain: Runnable, inputs: any, runId: string) => Promise<void> | void;
onChainEnd?: (outputs: any, runId: string) => Promise<void> | void;
onChainError?: (error: Error, runId: string) => Promise<void> | void;
onLLMStart?: (llm: BaseLanguageModel, prompts: string[], runId: string) => Promise<void> | void;
onLLMEnd?: (output: LLMResult, runId: string) => Promise<void> | void;
onLLMError?: (error: Error, runId: string) => Promise<void> | void;
onToolStart?: (tool: Tool, input: string, runId: string) => Promise<void> | void;
onToolEnd?: (output: string, runId: string) => Promise<void> | void;
onToolError?: (error: Error, runId: string) => Promise<void> | void;
}
```
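Before building full handler classes, it helps to see how a callbacks object is injected at invocation time through the config parameter. The snippet below is a minimal sketch based on the simplified interfaces above and an assumed, previously built workflow; note that the production LangChain JS API expects callback handler instances whose hooks are named handleChainStart, handleChainEnd, and so on, so adjust the method names to the version you are actually using.
```typescript
// Minimal sketch: `workflow` stands in for any Runnable built earlier,
// e.g. prompt.pipe(llm).pipe(new StringOutputParser()).
declare const workflow: Runnable<string, string>;

const consoleCallbacks: Callbacks = {
  onChainStart: (chain, inputs, runId) =>
    console.log(`[${runId}] chain start: ${chain.constructor.name}`, inputs),
  onChainEnd: (outputs, runId) =>
    console.log(`[${runId}] chain end`, outputs),
  onChainError: (error, runId) =>
    console.error(`[${runId}] chain error:`, error.message),
};

// Callbacks are injected per invocation through the config argument,
// so the same workflow can run with or without monitoring attached.
const answer = await workflow.invoke("What is LCEL?", {
  callbacks: consoleCallbacks,
  tags: ["lcel-demo"],
});
```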
Basic Logging Callback Implementation
Let's implement a basic logging callback handler:
```typescript
// A minimal Logger contract; console satisfies it.
interface Logger {
info(message: string, meta?: Record<string, any>): void;
error(message: string, meta?: Record<string, any>): void;
}
class LoggingCallbackHandler {
private logger: Logger;
constructor(logger: Logger) {
this.logger = logger;
}
async onChainStart(chain: Runnable, inputs: any, runId: string): Promise<void> {
this.logger.info('Chain execution started', {
chainName: chain.constructor.name,
inputs,
runId
});
}
async onChainEnd(outputs: any, runId: string): Promise<void> {
this.logger.info('Chain execution finished', {
outputs,
runId
});
}
async onChainError(error: Error, runId: string): Promise<void> {
this.logger.error('Chain execution error', {
error: error.message,
stack: error.stack,
runId
});
}
async onLLMStart(llm: BaseLanguageModel, prompts: string[], runId: string): Promise<void> {
this.logger.info('LLM call started', {
modelName: llm.constructor.name,
promptCount: prompts.length,
runId
});
}
async onLLMEnd(output: LLMResult, runId: string): Promise<void> {
this.logger.info('LLM call finished', {
tokenUsage: output.llmOutput?.tokenUsage,
generationCount: output.generations.length,
runId
});
}
async onLLMError(error: Error, runId: string): Promise<void> {
this.logger.error('LLM call error', {
error: error.message,
runId
});
}
async onToolStart(tool: Tool, input: string, runId: string): Promise<void> {
this.logger.info('Tool execution started', {
toolName: tool.name,
input,
runId
});
}
async onToolEnd(output: string, runId: string): Promise<void> {
this.logger.info('Tool execution finished', {
output,
runId
});
}
async onToolError(error: Error, runId: string): Promise<void> {
this.logger.error('Tool execution error', {
error: error.message,
toolName: (error as any).toolName,
runId
});
}
}
```
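The handler above works with console, but any custom logger that satisfies the assumed Logger contract can be injected instead, which is how structured logs reach your log pipeline. Below is a minimal sketch of a JSON structured logger; the field names (time, service, level, msg) are illustrative, not a required schema.
```typescript
// Sketch of a structured JSON logger compatible with the assumed Logger interface.
class JsonLogger implements Logger {
  constructor(private service: string) {}

  private write(level: "info" | "error", message: string, meta?: Record<string, any>): void {
    console.log(JSON.stringify({
      time: new Date().toISOString(),
      service: this.service,
      level,
      msg: message,
      ...meta,
    }));
  }

  info(message: string, meta?: Record<string, any>): void {
    this.write("info", message, meta);
  }

  error(message: string, meta?: Record<string, any>): void {
    this.write("error", message, meta);
  }
}
```
Usage would then be `new LoggingCallbackHandler(new JsonLogger("qa-service"))` in place of the console-backed handler.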
Advanced Monitoring Callback Implementation
Let's implement more advanced monitoring callbacks that add performance metrics and distributed tracing:
```typescript
interface PerformanceMetrics {
executionTime: number;
tokenCount?: number;
retryCount?: number;
errorCount?: number;
}
class MonitoringCallbackHandler {
private metricsCollector: MetricsCollector;
private tracer: Tracer;
private activeRuns: Map<string, { startTime: number; type: string }>;
constructor(metricsCollector: MetricsCollector, tracer: Tracer) {
this.metricsCollector = metricsCollector;
this.tracer = tracer;
this.activeRuns = new Map();
}
async onChainStart(chain: Runnable, inputs: any, runId: string): Promise<void> {
const startTime = Date.now();
this.activeRuns.set(runId, { startTime, type: 'chain' });
// Start a trace span
this.tracer.startSpan(`chain.${chain.constructor.name}`, {
runId,
startTime
});
// Record metrics
this.metricsCollector.increment('chain.started', {
chainType: chain.constructor.name
});
}
async onChainEnd(outputs: any, runId: string): Promise<void> {
const runInfo = this.activeRuns.get(runId);
if (runInfo) {
const executionTime = Date.now() - runInfo.startTime;
// End the trace span
this.tracer.endSpan(runId, { executionTime });
// Record performance metrics
this.metricsCollector.timing('chain.execution_time', executionTime, {
chainType: runInfo.type
});
this.metricsCollector.increment('chain.completed', {
chainType: runInfo.type
});
this.activeRuns.delete(runId);
}
}
async onChainError(error: Error, runId: string): Promise<void> {
const runInfo = this.activeRuns.get(runId);
if (runInfo) {
const executionTime = Date.now() - runInfo.startTime;
// Record the error on the trace span
this.tracer.errorSpan(runId, error, { executionTime });
// Record error metrics
this.metricsCollector.increment('chain.failed', {
chainType: runInfo.type,
errorType: error.constructor.name
});
this.metricsCollector.increment('chain.error_count');
this.activeRuns.delete(runId);
}
}
async onLLMStart(llm: BaseLanguageModel, prompts: string[], runId: string): Promise<void> {
const startTime = Date.now();
this.activeRuns.set(runId, { startTime, type: 'llm' });
// Start a trace span
this.tracer.startSpan(`llm.${llm.constructor.name}`, {
runId,
startTime,
promptCount: prompts.length
});
// Record metrics
this.metricsCollector.increment('llm.started', {
modelType: llm.constructor.name
});
}
async onLLMEnd(output: LLMResult, runId: string): Promise<void> {
const runInfo = this.activeRuns.get(runId);
if (runInfo) {
const executionTime = Date.now() - runInfo.startTime;
const tokenUsage = output.llmOutput?.tokenUsage;
// End the trace span
this.tracer.endSpan(runId, {
executionTime,
tokenUsage
});
// Record performance metrics
this.metricsCollector.timing('llm.execution_time', executionTime, {
modelType: runInfo.type
});
if (tokenUsage) {
this.metricsCollector.increment('llm.total_tokens', tokenUsage.totalTokens || 0);
this.metricsCollector.increment('llm.prompt_tokens', tokenUsage.promptTokens || 0);
this.metricsCollector.increment('llm.completion_tokens', tokenUsage.completionTokens || 0);
}
this.metricsCollector.increment('llm.completed', {
modelType: runInfo.type
});
this.activeRuns.delete(runId);
}
}
async onLLMError(error: Error, runId: string): Promise<void> {
const runInfo = this.activeRuns.get(runId);
if (runInfo) {
const executionTime = Date.now() - runInfo.startTime;
// Record the error on the trace span
this.tracer.errorSpan(runId, error, { executionTime });
// Record error metrics
this.metricsCollector.increment('llm.failed', {
modelType: runInfo.type,
errorType: error.constructor.name
});
this.metricsCollector.increment('llm.error_count');
this.activeRuns.delete(runId);
}
}
}
```
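The MetricsCollector and Tracer used above are deliberately abstract (console-backed mocks appear later in this chapter). As one possible concrete backend, the sketch below adapts the Tracer interface to OpenTelemetry; it assumes @opentelemetry/api is installed and that an SDK/exporter is configured elsewhere in the application, and it forwards only primitive attribute values.
```typescript
import { trace, SpanStatusCode, type Span } from "@opentelemetry/api";

// Hedged sketch: adapts the Tracer interface used by MonitoringCallbackHandler
// to OpenTelemetry spans, keyed by runId.
class OtelTracerAdapter {
  private spans = new Map<string, Span>();
  private otelTracer = trace.getTracer("langchain-monitoring");

  private setPrimitiveAttributes(span: Span, attributes: Record<string, any>): void {
    for (const [key, value] of Object.entries(attributes)) {
      if (["string", "number", "boolean"].includes(typeof value)) {
        span.setAttribute(key, value);
      }
    }
  }

  startSpan(name: string, attributes: Record<string, any> = {}): void {
    const span = this.otelTracer.startSpan(name);
    this.setPrimitiveAttributes(span, attributes);
    // Key spans by runId so endSpan/errorSpan can find them later.
    if (typeof attributes.runId === "string") {
      this.spans.set(attributes.runId, span);
    }
  }

  endSpan(runId: string, attributes: Record<string, any> = {}): void {
    const span = this.spans.get(runId);
    if (!span) return;
    this.setPrimitiveAttributes(span, attributes);
    span.end();
    this.spans.delete(runId);
  }

  errorSpan(runId: string, error: Error, attributes: Record<string, any> = {}): void {
    const span = this.spans.get(runId);
    if (!span) return;
    span.recordException(error);
    span.setStatus({ code: SpanStatusCode.ERROR, message: error.message });
    this.setPrimitiveAttributes(span, attributes);
    span.end();
    this.spans.delete(runId);
  }
}
```
An instance of this adapter can be passed to MonitoringCallbackHandler in place of the console-based Tracer mock.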
LangSmith Integration
LangSmith is LangChain's official monitoring and evaluation platform, and it integrates seamlessly with the callback mechanism:
```typescript
class LangSmithCallbackHandler {
private client: LangSmithClient;
private projectId: string;
constructor(client: LangSmithClient, projectId: string) {
this.client = client;
this.projectId = projectId;
}
async onChainStart(chain: Runnable, inputs: any, runId: string): Promise<void> {
try {
await this.client.createRun({
id: runId,
name: chain.constructor.name,
start_time: new Date().toISOString(),
run_type: "chain",
inputs,
project_id: this.projectId
});
} catch (error) {
console.warn('Failed to record run in LangSmith:', error);
}
}
async onChainEnd(outputs: any, runId: string): Promise<void> {
try {
await this.client.updateRun(runId, {
end_time: new Date().toISOString(),
outputs,
status: "success"
});
} catch (error) {
console.warn('Failed to update run in LangSmith:', error);
}
}
async onChainError(error: Error, runId: string): Promise<void> {
try {
await this.client.updateRun(runId, {
end_time: new Date().toISOString(),
error: {
message: error.message,
stack: error.stack
},
status: "error"
});
} catch (reportingError) {
console.warn('Failed to report chain error to LangSmith:', reportingError);
}
}
async onLLMStart(llm: BaseLanguageModel, prompts: string[], runId: string): Promise<void> {
try {
await this.client.createRun({
id: runId,
name: llm.constructor.name,
start_time: new Date().toISOString(),
run_type: "llm",
inputs: { prompts },
project_id: this.projectId
});
} catch (error) {
console.warn('Failed to record LLM run in LangSmith:', error);
}
}
async onLLMEnd(output: LLMResult, runId: string): Promise<void> {
try {
await this.client.updateRun(runId, {
end_time: new Date().toISOString(),
outputs: {
generations: output.generations,
llm_output: output.llmOutput
},
status: "success"
});
} catch (error) {
console.warn('Failed to update LLM run in LangSmith:', error);
}
}
async onLLMError(error: Error, runId: string): Promise<void> {
try {
await this.client.updateRun(runId, {
end_time: new Date().toISOString(),
error: {
message: error.message,
stack: error.stack
},
status: "error"
});
} catch (reportingError) {
console.warn('Failed to report LLM error to LangSmith:', reportingError);
}
}
}
```
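As a side note, the handler above mainly illustrates the run lifecycle. In practice, LangChain's built-in LangSmith tracing can usually be enabled without any custom callback code, via environment variables (the variable names below reflect recent LangChain/LangSmith releases; verify them against the current documentation):
```typescript
// Hedged sketch: enabling built-in LangSmith tracing through environment variables.
// Set these before the application creates any chains.
process.env.LANGCHAIN_TRACING_V2 = "true";
process.env.LANGCHAIN_API_KEY = "<your-langsmith-api-key>";
process.env.LANGCHAIN_PROJECT = "qa-system-production";

// With these set, runs are traced to the named LangSmith project automatically;
// no RunnableConfig.callbacks entry is required for basic tracing.
```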
Custom Business Monitoring
Next, let's implement a monitoring callback tailored to specific business requirements:
```typescript
interface BusinessMetrics {
userSatisfaction?: number;
cost?: number;
accuracy?: number;
}
class BusinessMonitoringCallbackHandler {
private businessMetricsCollector: BusinessMetricsCollector;
private costCalculator: CostCalculator;
constructor(
businessMetricsCollector: BusinessMetricsCollector,
costCalculator: CostCalculator
) {
this.businessMetricsCollector = businessMetricsCollector;
this.costCalculator = costCalculator;
}
async onChainEnd(outputs: any, runId: string): Promise<void> {
// Compute business metrics
const metrics = this.calculateBusinessMetrics(outputs);
// Record business metrics
if (metrics.userSatisfaction !== undefined) {
this.businessMetricsCollector.record('user_satisfaction', metrics.userSatisfaction);
}
if (metrics.cost !== undefined) {
this.businessMetricsCollector.record('operation_cost', metrics.cost);
}
if (metrics.accuracy !== undefined) {
this.businessMetricsCollector.record('answer_accuracy', metrics.accuracy);
}
}
private calculateBusinessMetrics(outputs: any): BusinessMetrics {
const metrics: BusinessMetrics = {};
// Estimate user satisfaction (based on confidence)
if (outputs.confidence !== undefined) {
metrics.userSatisfaction = outputs.confidence;
}
// Compute cost (based on token usage)
if (outputs.llmOutput?.tokenUsage) {
metrics.cost = this.costCalculator.calculateCost(outputs.llmOutput.tokenUsage);
}
// Compute accuracy (when labeled data is available)
if (outputs.expectedAnswer && outputs.generatedAnswer) {
metrics.accuracy = this.calculateAccuracy(
outputs.expectedAnswer,
outputs.generatedAnswer
);
}
return metrics;
}
private calculateAccuracy(expected: string, generated: string): number {
// Simplified accuracy calculation based on word overlap;
// real applications can use more sophisticated similarity metrics.
const expectedWords = expected.toLowerCase().split(/\s+/);
const generatedWords = generated.toLowerCase().split(/\s+/);
const commonWords = expectedWords.filter(word => generatedWords.includes(word));
return commonWords.length / Math.max(expectedWords.length, generatedWords.length);
}
}
```
Multi-Callback Manager
Finally, let's implement a callback manager that coordinates multiple callback handlers:
```typescript
class CallbackManager {
private handlers: Callbacks[];
constructor(handlers: Callbacks[]) {
this.handlers = handlers;
}
async onChainStart(chain: Runnable, inputs: any, runId: string): Promise<void> {
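// Note: Promise.all rejects as soon as any single handler throws; if monitoring
// failures must never interrupt the main workflow, consider Promise.allSettled
// here and in the other fan-out methods below.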
await Promise.all(
this.handlers.map(handler =>
handler.onChainStart?.(chain, inputs, runId)
)
);
}
async onChainEnd(outputs: any, runId: string): Promise<void> {
await Promise.all(
this.handlers.map(handler =>
handler.onChainEnd?.(outputs, runId)
)
);
}
async onChainError(error: Error, runId: string): Promise<void> {
await Promise.all(
this.handlers.map(handler =>
handler.onChainError?.(error, runId)
)
);
}
// The remaining callback methods (onLLMStart/End/Error, onToolStart/End/Error) follow the same pattern...
static fromHandlers(...handlers: Callbacks[]): CallbackManager {
return new CallbackManager(handlers);
}
}
```
Practical Application Example
Let's look at a complete, end-to-end example that shows how to integrate several kinds of monitoring and logging into a question-answering system:
```typescript
// A complete, monitored question-answering system
class MonitoredQASystem {
private workflow: Runnable<string, string>;
private callbackManager: CallbackManager;
constructor(
retriever: Runnable<string, Document[]>,
llm: BaseLanguageModel
) {
this.workflow = this.buildWorkflow(retriever, llm);
this.callbackManager = this.buildCallbackManager();
}
private buildWorkflow(
retriever: Runnable<string, Document[]>,
llm: BaseLanguageModel
): Runnable<string, string> {
const prompt = new PromptTemplate({
template: `Answer the question using the documents below. If the documents do not contain the relevant information, say that you cannot answer.
Documents:
{context}
Question: {question}
Answer:`,
inputVariables: ["context", "question"]
});
return RunnableMap.from({
question: (input: string) => input,
context: retriever.pipe((docs: Document[]) =>
docs.map(d => d.pageContent).join('\n\n')
)
})
.pipe(prompt)
.pipe(llm)
.pipe(new StringOutputParser())
.withRetry({ maxAttempts: 3 });
}
private buildCallbackManager(): CallbackManager {
// Create the individual callback handlers
const loggingHandler = new LoggingCallbackHandler(console);
const monitoringHandler = new MonitoringCallbackHandler(
new MetricsCollector(),
new Tracer()
);
// Add the LangSmith handler if LangSmith is configured
let langSmithHandler: LangSmithCallbackHandler | null = null;
if (process.env.LANGSMITH_API_KEY) {
const langSmithClient = new LangSmithClient({
apiKey: process.env.LANGSMITH_API_KEY
});
langSmithHandler = new LangSmithCallbackHandler(
langSmithClient,
process.env.LANGSMITH_PROJECT_ID || 'default'
);
}
// Create the business monitoring handler
const businessHandler = new BusinessMonitoringCallbackHandler(
new BusinessMetricsCollector(),
new CostCalculator()
);
// Combine all handlers
const handlers = [loggingHandler, monitoringHandler, businessHandler];
if (langSmithHandler) {
handlers.push(langSmithHandler);
}
return CallbackManager.fromHandlers(...handlers);
}
async answerQuestion(question: string): Promise<string> {
try {
// Execute the workflow with the callback configuration
const config: RunnableConfig = {
callbacks: this.callbackManager,
metadata: {
userId: 'user123',
sessionId: this.generateSessionId(),
questionType: this.classifyQuestion(question)
},
tags: ['qa-system', 'production']
};
const answer = await this.workflow.invoke(question, config);
return answer;
} catch (error) {
console.error('QA system error:', error);
return "Sorry, an error occurred while processing your question.";
}
}
private generateSessionId(): string {
return Math.random().toString(36).substring(2, 15) +
Math.random().toString(36).substring(2, 15);
}
private classifyQuestion(question: string): string {
const lowerQuestion = question.toLowerCase();
if (lowerQuestion.includes('how')) {
return 'how-to';
} else if (lowerQuestion.includes('what') || lowerQuestion.includes('who')) {
return 'what';
} else if (lowerQuestion.includes('why') || lowerQuestion.includes('reason')) {
return 'why';
} else {
return 'other';
}
}
}
// Mock monitoring components
class MetricsCollector {
increment(metric: string, value: number = 1, tags: Record<string, string> = {}): void {
console.log(`Counter incremented: ${metric} += ${value}`, tags);
}
timing(metric: string, value: number, tags: Record<string, string> = {}): void {
console.log(`Timing metric: ${metric} = ${value}ms`, tags);
}
record(metric: string, value: number, tags: Record<string, string> = {}): void {
console.log(`Metric recorded: ${metric} = ${value}`, tags);
}
}
class Tracer {
startSpan(name: string, attributes: any = {}): void {
console.log(`Span started: ${name}`, attributes);
}
endSpan(spanId: string, attributes: any = {}): void {
console.log(`Span ended: ${spanId}`, attributes);
}
errorSpan(spanId: string, error: Error, attributes: any = {}): void {
console.log(`Span error: ${spanId}`, { ...attributes, error: error.message });
}
}
class BusinessMetricsCollector {
record(metric: string, value: number): void {
console.log(`Business metric: ${metric} = ${value}`);
}
}
class CostCalculator {
calculateCost(tokenUsage: any): number {
const promptTokens = tokenUsage.promptTokens || 0;
const completionTokens = tokenUsage.completionTokens || 0;
// Simplified cost calculation (based on GPT-3.5-turbo pricing)
const promptCost = (promptTokens / 1000) * 0.0015;
const completionCost = (completionTokens / 1000) * 0.002;
return promptCost + completionCost;
}
}
class LangSmithClient {
private apiKey: string;
constructor(config: { apiKey: string }) {
this.apiKey = config.apiKey;
}
async createRun(runData: any): Promise<void> {
console.log('LangSmith createRun:', runData);
}
async updateRun(runId: string, updateData: any): Promise<void> {
console.log('LangSmith updateRun:', runId, updateData);
}
}
// Usage example
async function demonstrateMonitoredQA() {
// Create the vector store and retriever
const vectorStore = new MemoryVectorStore();
const sampleDocuments: Document[] = [
{
pageContent: "LangChain 是一个用于开发由语言模型驱动的应用程序的框架。它可以帮助开发者将语言模型与其他数据源和计算逻辑结合起来。",
metadata: { source: "langchain-docs" }
},
{
pageContent: "LCEL (LangChain Expression Language) 是 LangChain V3 中引入的表达式语言。它允许开发者使用管道操作符 (|) 来组合不同的组件。",
metadata: { source: "lcel-docs" }
}
];
await vectorStore.addDocuments(sampleDocuments);
const retriever = new VectorStoreRetriever({ vectorStore, k: 2 });
// Create the monitored QA system
const qaSystem = new MonitoredQASystem(
retriever,
new ChatOpenAI({ modelName: "gpt-3.5-turbo" })
);
console.log('=== Monitored QA System Demo ===\n');
const answer = await qaSystem.answerQuestion("What is LangChain?");
console.log('Question: What is LangChain?');
console.log('Answer:', answer);
console.log();
}
```
Summary
Through the RunnableConfig.callbacks injection mechanism, LangChain V3 provides powerful logging and monitoring capabilities:
- Multi-level monitoring - covers chains, LLMs, tools, and other levels of the workflow
- Performance metrics - collects execution time, token usage, and other performance data
- Error tracking - records complete error information with tracing
- Business metrics - lets you integrate business-specific monitoring indicators
- Platform integration - supports monitoring platforms such as LangSmith
- Flexible configuration - lets you enable only the monitoring components you need
This callback mechanism gives developers full visibility into how the application behaves at runtime, making it possible to detect and resolve problems early, and provides a solid foundation for building production-grade LLM applications.
In the next chapter, we will explore batch optimization: how .batch() parallelizes requests and how batching can improve system performance.