Skip to content

A/B 测试:RunnableParallel 并行执行多个版本,比较结果

在构建 LLM 应用时,A/B 测试是评估不同版本性能和效果的重要手段。LangChain V3 通过 RunnableParallel 机制支持并行执行多个版本,并比较它们的结果。这种机制使得开发者能够同时测试不同的提示、模型或其他参数,从而选择最优的配置。本章将深入探讨 A/B 测试的实现和应用。

RunnableParallel 的基本概念

RunnableParallel 允许并行执行多个 Runnable 组件,并将它们的结果组合在一起:

typescript
class RunnableParallel extends Runnable<any, Record<string, any>> {
  private steps: Record<string, Runnable>;
  
  constructor(steps: Record<string, Runnable>) {
    super();
    this.steps = steps;
  }
  
  async invoke(input: any, options?: RunnableConfig): Promise<Record<string, any>> {
    // 并行执行所有步骤
    const results = await Promise.all(
      Object.entries(this.steps).map(async ([key, runnable]) => {
        const result = await runnable.invoke(input, options);
        return [key, result];
      })
    );
    
    // 将结果组合成对象
    return Object.fromEntries(results);
  }
  
  async batch(inputs: any[], options?: RunnableConfig): Promise<Record<string, any>[]> {
    // 对每个输入并行执行所有步骤
    const resultArrays = await Promise.all(
      Object.entries(this.steps).map(async ([key, runnable]) => {
        const results = await runnable.batch(inputs, options);
        return [key, results];
      })
    );
    
    // 重新组织结果,使每个输入对应一个包含所有步骤结果的对象
    return inputs.map((_, index) => {
      const result: Record<string, any> = {};
      for (const [key, results] of resultArrays) {
        result[key] = (results as any[])[index];
      }
      return result;
    });
  }
}

A/B 测试框架实现

实现一个专门用于 A/B 测试的框架:

typescript
interface ABTestConfig {
  variants: Record<string, Runnable>;
  metrics: Array<{
    name: string;
    evaluator: (input: any, output: any) => Promise<number>;
  }>;
  selectionCriteria?: (results: Record<string, any>) => string;
}

class ABTester {
  private variants: Record<string, Runnable>;
  private metrics: Array<{
    name: string;
    evaluator: (input: any, output: any) => Promise<number>;
  }>;
  private selectionCriteria: (results: Record<string, any>) => string;
  
  constructor(config: ABTestConfig) {
    this.variants = config.variants;
    this.metrics = config.metrics;
    this.selectionCriteria = config.selectionCriteria || this.defaultSelectionCriteria;
  }
  
  async test(input: any, options?: RunnableConfig): Promise<{
    results: Record<string, any>;
    metrics: Record<string, Record<string, number>>;
    winner: string;
  }> {
    // 并行执行所有变体
    const parallelRunner = new RunnableParallel(this.variants);
    const variantResults = await parallelRunner.invoke(input, options);
    
    // 评估每个变体的指标
    const metricResults: Record<string, Record<string, number>> = {};
    
    for (const [variantName, variantOutput] of Object.entries(variantResults)) {
      metricResults[variantName] = {};
      
      for (const metric of this.metrics) {
        try {
          const score = await metric.evaluator(input, variantOutput);
          metricResults[variantName][metric.name] = score;
        } catch (error) {
          console.warn(`评估指标 ${metric.name} 失败 (${variantName}):`, error);
          metricResults[variantName][metric.name] = 0;
        }
      }
    }
    
    // 选择获胜者
    const winner = this.selectionCriteria(variantResults);
    
    return {
      results: variantResults,
      metrics: metricResults,
      winner
    };
  }
  
  async batchTest(inputs: any[], options?: RunnableConfig): Promise<Array<{
    input: any;
    results: Record<string, any>;
    metrics: Record<string, Record<string, number>>;
    winner: string;
  }>> {
    // 并行执行所有变体的批量处理
    const parallelRunner = new RunnableParallel(this.variants);
    const batchResults = await parallelRunner.batch(inputs, options);
    
    // 评估每个输入的每个变体
    const testResults = await Promise.all(
      batchResults.map(async (variantResults, index) => {
        // 评估指标
        const metricResults: Record<string, Record<string, number>> = {};
        
        for (const [variantName, variantOutput] of Object.entries(variantResults)) {
          metricResults[variantName] = {};
          
          for (const metric of this.metrics) {
            try {
              const score = await metric.evaluator(inputs[index], variantOutput);
              metricResults[variantName][metric.name] = score;
            } catch (error) {
              console.warn(`评估指标 ${metric.name} 失败 (${variantName}):`, error);
              metricResults[variantName][metric.name] = 0;
            }
          }
        }
        
        // 选择获胜者
        const winner = this.selectionCriteria(variantResults);
        
        return {
          input: inputs[index],
          results: variantResults,
          metrics: metricResults,
          winner
        };
      })
    );
    
    return testResults;
  }
  
  private defaultSelectionCriteria(results: Record<string, any>): string {
    // 默认选择第一个变体(实际应用中应根据具体指标选择)
    return Object.keys(results)[0];
  }
}

指标评估器实现

实现各种常用的指标评估器:

typescript
// 置信度评估器
class ConfidenceEvaluator {
  private llm: BaseLanguageModel;
  
  constructor(llm: BaseLanguageModel) {
    this.llm = llm;
  }
  
  async evaluate(input: any, output: any): Promise<number> {
    const prompt = new PromptTemplate({
      template: `评估以下问答对的置信度(0-1之间的数字):
问题: {question}
答案: {answer}

置信度:`,
      inputVariables: ["question", "answer"]
    });
    
    try {
      const result = await prompt
        .pipe(this.llm)
        .pipe(new StringOutputParser())
        .invoke({
          question: typeof input === 'string' ? input : JSON.stringify(input),
          answer: typeof output === 'string' ? output : JSON.stringify(output)
        });
      
      const confidence = parseFloat(result);
      return isNaN(confidence) ? 0.5 : Math.max(0, Math.min(1, confidence));
    } catch (error) {
      console.warn('置信度评估失败:', error);
      return 0.5;
    }
  }
}

// 相关性评估器
class RelevanceEvaluator {
  private llm: BaseLanguageModel;
  
  constructor(llm: BaseLanguageModel) {
    this.llm = llm;
  }
  
  async evaluate(input: any, output: any): Promise<number> {
    const prompt = new PromptTemplate({
      template: `评估答案与问题的相关性(0-1之间的数字):
问题: {question}
答案: {answer}

相关性评分:`,
      inputVariables: ["question", "answer"]
    });
    
    try {
      const result = await prompt
        .pipe(this.llm)
        .pipe(new StringOutputParser())
        .invoke({
          question: typeof input === 'string' ? input : JSON.stringify(input),
          answer: typeof output === 'string' ? output : JSON.stringify(output)
        });
      
      const relevance = parseFloat(result);
      return isNaN(relevance) ? 0.5 : Math.max(0, Math.min(1, relevance));
    } catch (error) {
      console.warn('相关性评估失败:', error);
      return 0.5;
    }
  }
}

// 长度评估器
class LengthEvaluator {
  async evaluate(input: any, output: any): Promise<number> {
    try {
      const outputString = typeof output === 'string' ? output : JSON.stringify(output);
      // 长度评分(0-1000字符为最佳)
      const lengthScore = Math.max(0, Math.min(1, outputString.length / 1000));
      return lengthScore;
    } catch (error) {
      console.warn('长度评估失败:', error);
      return 0;
    }
  }
}

// 多样性评估器
class DiversityEvaluator {
  async evaluate(input: any, output: any): Promise<number> {
    try {
      const outputString = typeof output === 'string' ? output : JSON.stringify(output);
      const words = outputString.toLowerCase().split(/\s+/);
      const uniqueWords = new Set(words);
      
      // 词汇多样性评分
      const diversityScore = uniqueWords.size / Math.max(1, words.length);
      return diversityScore;
    } catch (error) {
      console.warn('多样性评估失败:', error);
      return 0;
    }
  }
}

实际应用示例

让我们看一个完整的实际应用示例,展示如何在问答系统中使用 A/B 测试:

typescript
// A/B 测试问答系统
class ABTestQASystem {
  private abTester: ABTester;
  
  constructor() {
    this.abTester = this.createABTester();
  }
  
  private createABTester(): ABTester {
    // 创建不同的变体
    const variants = {
      // 变体 A: 基础提示
      'variant-a': new PromptTemplate({
        template: `回答以下问题:
问题: {question}

答案:`,
        inputVariables: ["question"]
      }).pipe(new ChatOpenAI({ modelName: "gpt-3.5-turbo", temperature: 0.7 }))
        .pipe(new StringOutputParser()),
      
      // 变体 B: 详细提示
      'variant-b': new PromptTemplate({
        template: `你是一个专业的知识问答助手。请提供详细、准确的回答。

问题: {question}

请按照以下格式回答:
1. 直接回答问题
2. 提供相关的背景信息
3. 如果不确定,请说明

答案:`,
        inputVariables: ["question"]
      }).pipe(new ChatOpenAI({ modelName: "gpt-3.5-turbo", temperature: 0.7 }))
        .pipe(new StringOutputParser()),
      
      // 变体 C: 简洁提示
      'variant-c': new PromptTemplate({
        template: `简洁回答以下问题:
问题: {question}

简明答案:`,
        inputVariables: ["question"]
      }).pipe(new ChatOpenAI({ modelName: "gpt-3.5-turbo", temperature: 0.3 }))
        .pipe(new StringOutputParser())
    };
    
    // 定义评估指标
    const metrics = [
      {
        name: 'confidence',
        evaluator: new ConfidenceEvaluator(new ChatOpenAI()).evaluate.bind(new ConfidenceEvaluator(new ChatOpenAI()))
      },
      {
        name: 'relevance',
        evaluator: new RelevanceEvaluator(new ChatOpenAI()).evaluate.bind(new RelevanceEvaluator(new ChatOpenAI()))
      },
      {
        name: 'length',
        evaluator: new LengthEvaluator().evaluate.bind(new LengthEvaluator())
      },
      {
        name: 'diversity',
        evaluator: new DiversityEvaluator().evaluate.bind(new DiversityEvaluator())
      }
    ];
    
    // 定义选择标准(选择综合评分最高的变体)
    const selectionCriteria = (results: Record<string, any>) => {
      // 在实际应用中,这里应该基于评估指标选择最佳变体
      return Object.keys(results)[0]; // 简化实现
    };
    
    return new ABTester({
      variants,
      metrics,
      selectionCriteria
    });
  }
  
  async answerQuestion(question: string): Promise<{
    question: string;
    results: Record<string, any>;
    metrics: Record<string, Record<string, number>>;
    winner: string;
    bestAnswer: string;
  }> {
    console.log(`开始 A/B 测试: "${question}"`);
    
    try {
      const testResult = await this.abTester.test(question);
      
      return {
        question,
        results: testResult.results,
        metrics: testResult.metrics,
        winner: testResult.winner,
        bestAnswer: testResult.results[testResult.winner]
      };
    } catch (error) {
      console.error('A/B 测试失败:', error);
      return {
        question,
        results: {},
        metrics: {},
        winner: 'variant-a',
        bestAnswer: "抱歉,处理您的问题时出现了错误。"
      };
    }
  }
  
  async batchAnswerQuestions(questions: string[]): Promise<Array<{
    question: string;
    results: Record<string, any>;
    metrics: Record<string, Record<string, number>>;
    winner: string;
    bestAnswer: string;
  }>> {
    console.log(`开始批量 A/B 测试 (${questions.length} 个问题)`);
    
    try {
      const testResults = await this.abTester.batchTest(questions);
      
      return testResults.map(result => ({
        question: result.input,
        results: result.results,
        metrics: result.metrics,
        winner: result.winner,
        bestAnswer: result.results[result.winner]
      }));
    } catch (error) {
      console.error('批量 A/B 测试失败:', error);
      return questions.map(question => ({
        question,
        results: {},
        metrics: {},
        winner: 'variant-a',
        bestAnswer: "抱歉,处理您的问题时出现了错误。"
      }));
    }
  }
  
  // 分析测试结果
  analyzeResults(results: Array<{
    metrics: Record<string, Record<string, number>>;
    winner: string;
  }>): Record<string, any> {
    const analysis: Record<string, any> = {
      totalTests: results.length,
      winnerDistribution: {} as Record<string, number>,
      averageMetrics: {} as Record<string, number>
    };
    
    // 统计获胜者分布
    for (const result of results) {
      analysis.winnerDistribution[result.winner] = 
        (analysis.winnerDistribution[result.winner] || 0) + 1;
    }
    
    // 计算平均指标
    const metricSums: Record<string, number> = {};
    const metricCounts: Record<string, number> = {};
    
    for (const result of results) {
      for (const [variant, metrics] of Object.entries(result.metrics)) {
        for (const [metricName, value] of Object.entries(metrics)) {
          const key = `${variant}.${metricName}`;
          metricSums[key] = (metricSums[key] || 0) + value;
          metricCounts[key] = (metricCounts[key] || 0) + 1;
        }
      }
    }
    
    for (const [key, sum] of Object.entries(metricSums)) {
      analysis.averageMetrics[key] = sum / metricCounts[key];
    }
    
    return analysis;
  }
}

// 创建 A/B 测试问答系统
async function createABTestQASystem() {
  return new ABTestQASystem();
}

// 使用示例
async function demonstrateABTesting() {
  console.log('=== A/B 测试问答系统演示 ===\n');
  
  const abTestSystem = await createABTestQASystem();
  
  // 单个问题测试
  console.log('单个问题 A/B 测试:');
  const singleResult = await abTestSystem.answerQuestion("什么是人工智能?");
  
  console.log(`问题: ${singleResult.question}`);
  console.log(`获胜变体: ${singleResult.winner}`);
  console.log(`最佳答案: ${singleResult.bestAnswer.substring(0, 100)}...`);
  console.log('\n各变体结果:');
  
  for (const [variant, result] of Object.entries(singleResult.results)) {
    console.log(`  ${variant}: ${result.substring(0, 80)}...`);
  }
  
  console.log('\n各变体指标:');
  for (const [variant, metrics] of Object.entries(singleResult.metrics)) {
    console.log(`  ${variant}:`, metrics);
  }
  
  console.log('\n' + '='.repeat(50) + '\n');
  
  // 批量测试
  console.log('批量 A/B 测试:');
  const questions = [
    "机器学习和深度学习有什么区别?",
    "如何开始学习编程?",
    "什么是 LangChain?",
    "解释一下区块链技术"
  ];
  
  const batchResults = await abTestSystem.batchAnswerQuestions(questions);
  
  console.log(`测试了 ${batchResults.length} 个问题\n`);
  
  // 显示前两个结果的详细信息
  for (let i = 0; i < Math.min(2, batchResults.length); i++) {
    const result = batchResults[i];
    console.log(`问题 ${i + 1}: ${result.question}`);
    console.log(`获胜变体: ${result.winner}`);
    console.log(`最佳答案: ${result.bestAnswer.substring(0, 100)}...`);
    console.log();
  }
  
  // 分析结果
  console.log('测试结果分析:');
  const analysis = abTestSystem.analyzeResults(batchResults);
  console.log(`总测试数: ${analysis.totalTests}`);
  console.log('获胜者分布:', analysis.winnerDistribution);
  console.log('平均指标:');
  for (const [key, value] of Object.entries(analysis.averageMetrics)) {
    console.log(`  ${key}: ${value.toFixed(3)}`);
  }
}

// 高级 A/B 测试示例:提示模板优化
class PromptOptimizationTester {
  private basePrompt: string;
  private llm: BaseLanguageModel;
  
  constructor(basePrompt: string, llm: BaseLanguageModel) {
    this.basePrompt = basePrompt;
    this.llm = llm;
  }
  
  async optimizePrompt(testQuestions: string[]): Promise<{
    bestPrompt: string;
    performance: number;
    variantsTested: number;
  }> {
    // 创建提示变体
    const promptVariants = this.generatePromptVariants();
    
    // 为每个变体创建 Runnable
    const variants: Record<string, Runnable> = {};
    for (const [name, prompt] of Object.entries(promptVariants)) {
      variants[name] = new PromptTemplate({
        template: prompt,
        inputVariables: ["question"]
      }).pipe(this.llm).pipe(new StringOutputParser());
    }
    
    // 创建 A/B 测试器
    const tester = new ABTester({
      variants,
      metrics: [
        {
          name: 'relevance',
          evaluator: new RelevanceEvaluator(this.llm).evaluate.bind(new RelevanceEvaluator(this.llm))
        },
        {
          name: 'confidence',
          evaluator: new ConfidenceEvaluator(this.llm).evaluate.bind(new ConfidenceEvaluator(this.llm))
        }
      ],
      selectionCriteria: this.selectBestVariant.bind(this)
    });
    
    // 批量测试
    const testResults = await tester.batchTest(testQuestions);
    
    // 分析结果找出最佳变体
    const analysis = this.analyzeTestResults(testResults);
    const bestVariant = analysis.bestVariant;
    
    return {
      bestPrompt: promptVariants[bestVariant],
      performance: analysis.bestScore,
      variantsTested: Object.keys(promptVariants).length
    };
  }
  
  private generatePromptVariants(): Record<string, string> {
    return {
      'direct': `${this.basePrompt}
问题: {question}
答案:`,
      
      'structured': `${this.basePrompt}
问题: {question}

请按照以下结构回答:
1. 简要回答
2. 详细解释
3. 相关例子

答案:`,
      
      'concise': `${this.basePrompt}
请简洁回答以下问题:
问题: {question}
简答:`,
      
      'comprehensive': `${this.basePrompt}
请提供全面详细的回答:

问题: {question}

回答应包括:
- 核心概念
- 实际应用
- 相关技术

详细回答:`
    };
  }
  
  private selectBestVariant(results: Record<string, any>): string {
    // 简化实现,实际应用中应该基于具体指标
    return Object.keys(results)[0];
  }
  
  private analyzeTestResults(results: Array<any>): { 
    bestVariant: string; 
    bestScore: number 
  } {
    // 简化实现
    return {
      bestVariant: Object.keys(results[0]?.results || {})[0],
      bestScore: 0.8
    };
  }
}

// 使用提示优化测试器
async function demonstratePromptOptimization() {
  console.log('\n=== 提示模板优化演示 ===\n');
  
  const optimizer = new PromptOptimizationTester(
    "你是一个专业的技术顾问。",
    new ChatOpenAI({ modelName: "gpt-3.5-turbo" })
  );
  
  const testQuestions = [
    "什么是机器学习?",
    "解释神经网络的工作原理",
    "深度学习和传统机器学习有什么区别?"
  ];
  
  const optimizationResult = await optimizer.optimizePrompt(testQuestions);
  
  console.log('提示优化结果:');
  console.log(`测试变体数量: ${optimizationResult.variantsTested}`);
  console.log(`最佳性能得分: ${optimizationResult.performance.toFixed(3)}`);
  console.log('最佳提示模板:');
  console.log(optimizationResult.bestPrompt);
}

总结

通过 RunnableParallel 和 A/B 测试框架,LangChain V3 提供了强大的并行执行和效果对比能力:

  1. 并行执行 - 同时运行多个变体,提高测试效率
  2. 多维度评估 - 支持置信度、相关性、长度、多样性等多种指标
  3. 批量测试 - 支持批量处理多个输入进行测试
  4. 结果分析 - 提供详细的测试结果分析功能
  5. 灵活配置 - 可以自定义变体、指标和选择标准
  6. 实际应用 - 可用于提示优化、模型选择等场景

A/B 测试机制使得开发者能够科学地评估和优化 LLM 应用,确保选择最佳的配置和实现方案。

在下一章中,我们将探讨与前端和全栈架构的衔接:在 NestJS 中暴露 LangChain 流式 API,了解如何将 LangChain 集成到现代 Web 应用架构中。