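A/B Testing: Running Multiple Versions in Parallel with RunnableParallel and Comparing the Results
When building LLM applications, A/B testing is an important way to evaluate how different versions perform. LangChain V3 supports this through RunnableParallel, which runs multiple versions in parallel so their results can be compared. Developers can test different prompts, models, or other parameters side by side and then choose the best configuration. This chapter looks at how to implement and apply A/B testing.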
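RunnableParallel basics
RunnableParallel runs multiple Runnable components in parallel on the same input and combines their results into a single object keyed by step name. Conceptually, its core logic can be sketched as follows: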
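```typescript
import { Runnable } from "@langchain/core/runnables";
import type { RunnableConfig } from "@langchain/core/runnables";

// Simplified illustration of the idea. The class that ships with LangChain
// does more (for example, callback handling).
class RunnableParallel extends Runnable<any, Record<string, any>> {
  // Custom Runnable subclasses must declare a namespace; this value is arbitrary.
  lc_namespace = ["custom", "ab_testing"];

  private steps: Record<string, Runnable>;

  constructor(steps: Record<string, Runnable>) {
    super();
    this.steps = steps;
  }

  async invoke(input: any, options?: RunnableConfig): Promise<Record<string, any>> {
    // Run all steps concurrently on the same input
    const results = await Promise.all(
      Object.entries(this.steps).map(
        async ([key, runnable]): Promise<[string, any]> => {
          const result = await runnable.invoke(input, options);
          return [key, result];
        }
      )
    );
    // Combine the results into one object keyed by step name
    return Object.fromEntries(results);
  }

  async batch(inputs: any[], options?: RunnableConfig): Promise<Record<string, any>[]> {
    // Each step processes the whole batch; the steps themselves run concurrently
    const resultArrays = await Promise.all(
      Object.entries(this.steps).map(
        async ([key, runnable]): Promise<[string, any[]]> => {
          const results = await runnable.batch(inputs, options);
          return [key, results];
        }
      )
    );
    // Regroup so that each input maps to an object holding every step's result
    return inputs.map((_, index) => {
      const result: Record<string, any> = {};
      for (const [key, results] of resultArrays) {
        result[key] = results[index];
      }
      return result;
    });
  }
}
```

A/B testing framework implementation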
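Before the full framework, it may help to see the fan-out pattern it relies on in isolation. The sketch below uses no LangChain imports: `MiniRunnable`, `runInParallel`, and `toyVariants` are hypothetical names introduced only for illustration, and the two toy variants stand in for real prompt and model chains.

```typescript
// Minimal stand-in for a Runnable: anything with an async invoke().
// This interface is an assumption for the sketch, not the LangChain type.
interface MiniRunnable<I, O> {
  invoke(input: I): Promise<O>;
}

// Run every named step on the same input and collect the results by key.
async function runInParallel<I>(
  steps: Record<string, MiniRunnable<I, unknown>>,
  input: I
): Promise<Record<string, unknown>> {
  const entries = await Promise.all(
    Object.entries(steps).map(
      async ([key, step]): Promise<[string, unknown]> => [key, await step.invoke(input)]
    )
  );
  return Object.fromEntries(entries);
}

// Two toy "variants" that receive the same input.
const toyVariants: Record<string, MiniRunnable<string, unknown>> = {
  shout: { invoke: async (text) => text.toUpperCase() },
  count: { invoke: async (text) => text.length },
};

runInParallel(toyVariants, "hello").then((out) => console.log(out));
// logs { shout: 'HELLO', count: 5 }
```

Every variant sees the same input and the combined object is keyed by variant name, which is the shape an A/B comparison needs.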
The following framework is dedicated to A/B testing:
```typescript
import type { Runnable, RunnableConfig } from "@langchain/core/runnables";

interface ABTestMetric {
  name: string;
  evaluator: (input: any, output: any) => Promise<number>;
}

// Scores indexed as variant name -> metric name -> score
type MetricScores = Record<string, Record<string, number>>;

interface ABTestResult {
  results: Record<string, any>;
  metrics: MetricScores;
  winner: string;
}

interface ABTestConfig {
  variants: Record<string, Runnable>;
  metrics: ABTestMetric[];
  // Receives the variant outputs and, as a second argument, their metric scores
  selectionCriteria?: (results: Record<string, any>, metrics: MetricScores) => string;
}

class ABTester {
  private variants: Record<string, Runnable>;
  private metrics: ABTestMetric[];
  private selectionCriteria: (results: Record<string, any>, metrics: MetricScores) => string;

  constructor(config: ABTestConfig) {
    this.variants = config.variants;
    this.metrics = config.metrics;
    this.selectionCriteria = config.selectionCriteria ?? this.defaultSelectionCriteria;
  }

  async test(input: any, options?: RunnableConfig): Promise<ABTestResult> {
    // Run all variants in parallel
    const parallelRunner = new RunnableParallel(this.variants);
    const variantResults = await parallelRunner.invoke(input, options);
    // Score every variant on every metric
    const metricResults = await this.evaluateVariants(input, variantResults);
    // Pick the winner
    const winner = this.selectionCriteria(variantResults, metricResults);
    return { results: variantResults, metrics: metricResults, winner };
  }

  async batchTest(
    inputs: any[],
    options?: RunnableConfig
  ): Promise<Array<ABTestResult & { input: any }>> {
    // Run the whole batch through all variants in parallel
    const parallelRunner = new RunnableParallel(this.variants);
    const batchResults = await parallelRunner.batch(inputs, options);
    // Score every variant for every input
    return Promise.all(
      batchResults.map(async (variantResults, index) => {
        const metricResults = await this.evaluateVariants(inputs[index], variantResults);
        const winner = this.selectionCriteria(variantResults, metricResults);
        return {
          input: inputs[index],
          results: variantResults,
          metrics: metricResults,
          winner
        };
      })
    );
  }

  // Shared by test() and batchTest(): apply each metric to each variant output
  private async evaluateVariants(
    input: any,
    variantResults: Record<string, any>
  ): Promise<MetricScores> {
    const metricResults: MetricScores = {};
    for (const [variantName, variantOutput] of Object.entries(variantResults)) {
      metricResults[variantName] = {};
      for (const metric of this.metrics) {
        try {
          metricResults[variantName][metric.name] = await metric.evaluator(input, variantOutput);
        } catch (error) {
          console.warn(`Metric ${metric.name} failed (${variantName}):`, error);
          // A failed evaluation is recorded as 0
          metricResults[variantName][metric.name] = 0;
        }
      }
    }
    return metricResults;
  }

  private defaultSelectionCriteria(results: Record<string, any>): string {
    // Defaults to the first variant; real applications should choose based on the metrics
    return Object.keys(results)[0];
  }
}
```

Metric evaluator implementation
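The LLM-based evaluators in this section ask a model for a score and then have to turn free-form text into a number. Calling `parseFloat` directly yields `NaN` for replies such as "Score: 0.8", so a more tolerant parser is one option. The helper below, `parseScore`, is a hypothetical name and only a sketch: it takes the first number it finds, so a reply like "Question 2: 0.9" would still be misread.

```typescript
// Hypothetical helper: extract the first number from model text and clamp it to [0, 1].
// Returns `fallback` when the text contains no number at all.
function parseScore(text: string, fallback = 0.5): number {
  // Matches forms such as "0.8", "1", "1." and ".5", optionally with a leading minus
  const match = text.match(/-?(?:\d+\.?\d*|\.\d+)/);
  if (!match) {
    return fallback;
  }
  const value = Number(match[0]);
  return Math.max(0, Math.min(1, value));
}

console.log(parseScore("Score: 0.85")); // 0.85
console.log(parseScore("I am not sure")); // 0.5
```

Clamping keeps an out-of-range reply such as "1.7" from distorting averages, at the cost of hiding that the model ignored the requested scale.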
The following are several commonly used metric evaluators:
```typescript
import type { BaseLanguageModel } from "@langchain/core/language_models/base";
import { PromptTemplate } from "@langchain/core/prompts";
import { StringOutputParser } from "@langchain/core/output_parsers";

// Confidence evaluator: asks an LLM to rate how confident the answer is
class ConfidenceEvaluator {
  private llm: BaseLanguageModel;

  constructor(llm: BaseLanguageModel) {
    this.llm = llm;
  }

  async evaluate(input: any, output: any): Promise<number> {
    const prompt = new PromptTemplate({
      template: `Rate the confidence of the following question-answer pair (a number between 0 and 1):
Question: {question}
Answer: {answer}
Confidence:`,
      inputVariables: ["question", "answer"]
    });
    try {
      const result = await prompt
        .pipe(this.llm)
        .pipe(new StringOutputParser())
        .invoke({
          question: typeof input === "string" ? input : JSON.stringify(input),
          answer: typeof output === "string" ? output : JSON.stringify(output)
        });
      const confidence = parseFloat(result);
      // Fall back to a neutral 0.5 when the reply is not a number; clamp to [0, 1]
      return Number.isNaN(confidence) ? 0.5 : Math.max(0, Math.min(1, confidence));
    } catch (error) {
      console.warn("Confidence evaluation failed:", error);
      return 0.5;
    }
  }
}

// Relevance evaluator: asks an LLM how relevant the answer is to the question
class RelevanceEvaluator {
  private llm: BaseLanguageModel;

  constructor(llm: BaseLanguageModel) {
    this.llm = llm;
  }

  async evaluate(input: any, output: any): Promise<number> {
    const prompt = new PromptTemplate({
      template: `Rate how relevant the answer is to the question (a number between 0 and 1):
Question: {question}
Answer: {answer}
Relevance score:`,
      inputVariables: ["question", "answer"]
    });
    try {
      const result = await prompt
        .pipe(this.llm)
        .pipe(new StringOutputParser())
        .invoke({
          question: typeof input === "string" ? input : JSON.stringify(input),
          answer: typeof output === "string" ? output : JSON.stringify(output)
        });
      const relevance = parseFloat(result);
      return Number.isNaN(relevance) ? 0.5 : Math.max(0, Math.min(1, relevance));
    } catch (error) {
      console.warn("Relevance evaluation failed:", error);
      return 0.5;
    }
  }
}

// Length evaluator
class LengthEvaluator {
  async evaluate(_input: any, output: any): Promise<number> {
    try {
      const outputString = typeof output === "string" ? output : JSON.stringify(output);
      // The score grows linearly with length and saturates at 1000 characters
      return Math.max(0, Math.min(1, outputString.length / 1000));
    } catch (error) {
      console.warn("Length evaluation failed:", error);
      return 0;
    }
  }
}

// Diversity evaluator
class DiversityEvaluator {
  async evaluate(_input: any, output: any): Promise<number> {
    try {
      const outputString = typeof output === "string" ? output : JSON.stringify(output);
      // Whitespace tokenization: suitable for space-delimited languages only
      const words = outputString.toLowerCase().split(/\s+/).filter((word) => word.length > 0);
      const uniqueWords = new Set(words);
      // Lexical diversity: share of distinct words among all words
      return uniqueWords.size / Math.max(1, words.length);
    } catch (error) {
      console.warn("Diversity evaluation failed:", error);
      return 0;
    }
  }
}
```

Practical example
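One piece the example in this section keeps simple is winner selection. Since the tester already produces a table of scores per variant and metric, a selector can rank variants by a weighted mean of those scores. The sketch below is one possible approach under that assumption; `MetricTable`, `selectByMeanScore`, and the `weights` parameter are hypothetical names, not part of LangChain.

```typescript
// variant name -> metric name -> score
type MetricTable = Record<string, Record<string, number>>;

// Hypothetical selector: pick the variant with the highest weighted mean score.
// Metrics without an explicit weight count as 1; a weight of 0 ignores a metric.
function selectByMeanScore(
  metrics: MetricTable,
  weights: Record<string, number> = {}
): string | undefined {
  let best: string | undefined;
  let bestScore = -Infinity;
  for (const [variant, scores] of Object.entries(metrics)) {
    let total = 0;
    let weightSum = 0;
    for (const [name, score] of Object.entries(scores)) {
      const weight = weights[name] ?? 1;
      total += weight * score;
      weightSum += weight;
    }
    const mean = weightSum > 0 ? total / weightSum : 0;
    if (mean > bestScore) {
      best = variant;
      bestScore = mean;
    }
  }
  return best; // undefined when the table is empty
}

const sampleScores: MetricTable = {
  "variant-a": { relevance: 0.6, confidence: 0.7 },
  "variant-b": { relevance: 0.9, confidence: 0.8 },
};
console.log(selectByMeanScore(sampleScores)); // variant-b
```

Weights matter when metrics pull in different directions: a length score rewards verbose answers, so giving it a weight of 0 or a small value keeps it from overriding relevance.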
Let's look at a complete example that shows how to use A/B testing in a question-answering system:
```typescript
import { ChatOpenAI } from "@langchain/openai";
import { PromptTemplate } from "@langchain/core/prompts";
import { StringOutputParser } from "@langchain/core/output_parsers";

interface QAResult {
  question: string;
  results: Record<string, any>;
  metrics: Record<string, Record<string, number>>;
  winner: string;
  bestAnswer: string;
}

// A/B-tested question-answering system
class ABTestQASystem {
  private abTester: ABTester;

  constructor() {
    this.abTester = this.createABTester();
  }

  private createABTester(): ABTester {
    // Create the variants
    const variants = {
      // Variant A: basic prompt
      "variant-a": new PromptTemplate({
        template: `Answer the following question:
Question: {question}
Answer:`,
        inputVariables: ["question"]
      }).pipe(new ChatOpenAI({ modelName: "gpt-3.5-turbo", temperature: 0.7 }))
        .pipe(new StringOutputParser()),
      // Variant B: detailed prompt
      "variant-b": new PromptTemplate({
        template: `You are a professional knowledge Q&A assistant. Please give a detailed, accurate answer.
Question: {question}
Please answer in the following format:
1. Answer the question directly
2. Provide relevant background information
3. If you are unsure, say so
Answer:`,
        inputVariables: ["question"]
      }).pipe(new ChatOpenAI({ modelName: "gpt-3.5-turbo", temperature: 0.7 }))
        .pipe(new StringOutputParser()),
      // Variant C: concise prompt
      "variant-c": new PromptTemplate({
        template: `Answer the following question concisely:
Question: {question}
Short answer:`,
        inputVariables: ["question"]
      }).pipe(new ChatOpenAI({ modelName: "gpt-3.5-turbo", temperature: 0.3 }))
        .pipe(new StringOutputParser())
    };

    // Define the metrics; each evaluator is created once and reused
    const judge = new ChatOpenAI();
    const confidence = new ConfidenceEvaluator(judge);
    const relevance = new RelevanceEvaluator(judge);
    const length = new LengthEvaluator();
    const diversity = new DiversityEvaluator();
    const metrics = [
      { name: "confidence", evaluator: (input: any, output: any) => confidence.evaluate(input, output) },
      { name: "relevance", evaluator: (input: any, output: any) => relevance.evaluate(input, output) },
      { name: "length", evaluator: (input: any, output: any) => length.evaluate(input, output) },
      { name: "diversity", evaluator: (input: any, output: any) => diversity.evaluate(input, output) }
    ];

    // Selection criteria: pick the variant with the highest mean metric score.
    // Falls back to the first variant when no scores are passed in.
    const selectionCriteria = (
      results: Record<string, any>,
      metricScores?: Record<string, Record<string, number>>
    ): string => {
      let winner = Object.keys(results)[0];
      let bestScore = -Infinity;
      for (const [variant, scores] of Object.entries(metricScores ?? {})) {
        const values = Object.values(scores);
        if (values.length === 0) continue;
        const mean = values.reduce((sum, value) => sum + value, 0) / values.length;
        if (mean > bestScore) {
          winner = variant;
          bestScore = mean;
        }
      }
      return winner;
    };

    return new ABTester({ variants, metrics, selectionCriteria });
  }

  async answerQuestion(question: string): Promise<QAResult> {
    console.log(`Starting A/B test: "${question}"`);
    try {
      // The prompt templates expect an object with a `question` field
      const testResult = await this.abTester.test({ question });
      return {
        question,
        results: testResult.results,
        metrics: testResult.metrics,
        winner: testResult.winner,
        bestAnswer: testResult.results[testResult.winner]
      };
    } catch (error) {
      console.error("A/B test failed:", error);
      return {
        question,
        results: {},
        metrics: {},
        winner: "variant-a",
        bestAnswer: "Sorry, something went wrong while processing your question."
      };
    }
  }

  async batchAnswerQuestions(questions: string[]): Promise<QAResult[]> {
    console.log(`Starting batch A/B test (${questions.length} questions)`);
    try {
      const testResults = await this.abTester.batchTest(
        questions.map((question) => ({ question }))
      );
      return testResults.map((result) => ({
        question: result.input.question,
        results: result.results,
        metrics: result.metrics,
        winner: result.winner,
        bestAnswer: result.results[result.winner]
      }));
    } catch (error) {
      console.error("Batch A/B test failed:", error);
      return questions.map((question) => ({
        question,
        results: {},
        metrics: {},
        winner: "variant-a",
        bestAnswer: "Sorry, something went wrong while processing your question."
      }));
    }
  }

  // Analyze the test results
  analyzeResults(results: Array<{
    metrics: Record<string, Record<string, number>>;
    winner: string;
  }>): {
    totalTests: number;
    winnerDistribution: Record<string, number>;
    averageMetrics: Record<string, number>;
  } {
    const winnerDistribution: Record<string, number> = {};
    const averageMetrics: Record<string, number> = {};

    // Count how often each variant won
    for (const result of results) {
      winnerDistribution[result.winner] = (winnerDistribution[result.winner] || 0) + 1;
    }

    // Average every metric per variant, keyed as "variant.metric"
    const metricSums: Record<string, number> = {};
    const metricCounts: Record<string, number> = {};
    for (const result of results) {
      for (const [variant, metrics] of Object.entries(result.metrics)) {
        for (const [metricName, value] of Object.entries(metrics)) {
          const key = `${variant}.${metricName}`;
          metricSums[key] = (metricSums[key] || 0) + value;
          metricCounts[key] = (metricCounts[key] || 0) + 1;
        }
      }
    }
    for (const [key, sum] of Object.entries(metricSums)) {
      averageMetrics[key] = sum / metricCounts[key];
    }

    return { totalTests: results.length, winnerDistribution, averageMetrics };
  }
}
```
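Because the variants are awaited together with `Promise.all`, a single failing variant rejects the whole test, and `answerQuestion` then falls back to its generic error answer. One way to keep the remaining variants usable is `Promise.allSettled` (ES2020). The sketch below is self-contained and uses plain async functions in place of real chains; `Variant`, `SettledOutcome`, and `runVariantsIsolated` are hypothetical names.

```typescript
// Minimal stand-in for a variant chain (an assumption for this sketch).
type Variant = (input: string) => Promise<string>;

interface SettledOutcome {
  outputs: Record<string, string>;  // variants that produced an answer
  failures: Record<string, string>; // variants that threw, with the error text
}

// Run all variants; a rejected variant is recorded instead of aborting the test.
async function runVariantsIsolated(
  variants: Record<string, Variant>,
  input: string
): Promise<SettledOutcome> {
  const entries = Object.entries(variants);
  // The async wrapper also turns synchronous throws into rejections
  const settled = await Promise.allSettled(
    entries.map(async ([, run]) => run(input))
  );
  const outcome: SettledOutcome = { outputs: {}, failures: {} };
  settled.forEach((result, index) => {
    const name = entries[index][0];
    if (result.status === "fulfilled") {
      outcome.outputs[name] = result.value;
    } else {
      outcome.failures[name] = String(result.reason);
    }
  });
  return outcome;
}
```

Metrics would then be computed only for the entries in `outputs`, while `failures` can be logged or counted against a variant's reliability.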
```typescript
// Create the A/B-tested question-answering system
async function createABTestQASystem() {
  return new ABTestQASystem();
}

// Usage example
async function demonstrateABTesting() {
  console.log("=== A/B Testing QA System Demo ===\n");
  const abTestSystem = await createABTestQASystem();

  // Single-question test
  console.log("Single-question A/B test:");
  const singleResult = await abTestSystem.answerQuestion("What is artificial intelligence?");
  console.log(`Question: ${singleResult.question}`);
  console.log(`Winning variant: ${singleResult.winner}`);
  console.log(`Best answer: ${singleResult.bestAnswer.substring(0, 100)}...`);
  console.log("\nResults per variant:");
  for (const [variant, result] of Object.entries(singleResult.results)) {
    console.log(`  ${variant}: ${result.substring(0, 80)}...`);
  }
  console.log("\nMetrics per variant:");
  for (const [variant, metrics] of Object.entries(singleResult.metrics)) {
    console.log(`  ${variant}:`, metrics);
  }
  console.log("\n" + "=".repeat(50) + "\n");

  // Batch test
  console.log("Batch A/B test:");
  const questions = [
    "What is the difference between machine learning and deep learning?",
    "How do I start learning to program?",
    "What is LangChain?",
    "Explain blockchain technology"
  ];
  const batchResults = await abTestSystem.batchAnswerQuestions(questions);
  console.log(`Tested ${batchResults.length} questions\n`);

  // Show details for the first two results
  for (let i = 0; i < Math.min(2, batchResults.length); i++) {
    const result = batchResults[i];
    console.log(`Question ${i + 1}: ${result.question}`);
    console.log(`Winning variant: ${result.winner}`);
    console.log(`Best answer: ${result.bestAnswer.substring(0, 100)}...`);
    console.log();
  }

  // Analyze the results
  console.log("Test result analysis:");
  const analysis = abTestSystem.analyzeResults(batchResults);
  console.log(`Total tests: ${analysis.totalTests}`);
  console.log("Winner distribution:", analysis.winnerDistribution);
  console.log("Average metrics:");
  for (const [key, value] of Object.entries(analysis.averageMetrics)) {
    console.log(`  ${key}: ${value.toFixed(3)}`);
  }
}
```
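```typescript
import type { BaseLanguageModel } from "@langchain/core/language_models/base";
import type { Runnable } from "@langchain/core/runnables";
import { PromptTemplate } from "@langchain/core/prompts";
import { StringOutputParser } from "@langchain/core/output_parsers";

// Advanced A/B testing example: prompt template optimization
class PromptOptimizationTester {
  private basePrompt: string;
  private llm: BaseLanguageModel;

  constructor(basePrompt: string, llm: BaseLanguageModel) {
    this.basePrompt = basePrompt;
    this.llm = llm;
  }

  async optimizePrompt(testQuestions: string[]): Promise<{
    bestPrompt: string;
    performance: number;
    variantsTested: number;
  }> {
    // Create the prompt variants
    const promptVariants = this.generatePromptVariants();

    // Build a Runnable for each variant
    const variants: Record<string, Runnable> = {};
    for (const [name, prompt] of Object.entries(promptVariants)) {
      variants[name] = new PromptTemplate({
        template: prompt,
        inputVariables: ["question"]
      }).pipe(this.llm).pipe(new StringOutputParser());
    }

    // Create the A/B tester; each evaluator is created once and reused
    const relevance = new RelevanceEvaluator(this.llm);
    const confidence = new ConfidenceEvaluator(this.llm);
    const tester = new ABTester({
      variants,
      metrics: [
        { name: "relevance", evaluator: (input: any, output: any) => relevance.evaluate(input, output) },
        { name: "confidence", evaluator: (input: any, output: any) => confidence.evaluate(input, output) }
      ],
      selectionCriteria: this.selectBestVariant.bind(this)
    });

    // Batch test; the prompt templates expect an object with a `question` field
    const testResults = await tester.batchTest(
      testQuestions.map((question) => ({ question }))
    );

    // Analyze the results to find the best variant
    const analysis = this.analyzeTestResults(testResults);
    return {
      bestPrompt: promptVariants[analysis.bestVariant],
      performance: analysis.bestScore,
      variantsTested: Object.keys(promptVariants).length
    };
  }

  private generatePromptVariants(): Record<string, string> {
    return {
      direct: `${this.basePrompt}
Question: {question}
Answer:`,
      structured: `${this.basePrompt}
Question: {question}
Please structure your answer as follows:
1. Brief answer
2. Detailed explanation
3. Relevant examples
Answer:`,
      concise: `${this.basePrompt}
Please answer the following question concisely:
Question: {question}
Short answer:`,
      comprehensive: `${this.basePrompt}
Please give a comprehensive, detailed answer:
Question: {question}
The answer should cover:
- Core concepts
- Practical applications
- Related technologies
Detailed answer:`
    };
  }

  // Mean of one variant's metric scores (0 when there are none)
  private meanScore(scores: Record<string, number>): number {
    const values = Object.values(scores);
    return values.length === 0 ? 0 : values.reduce((sum, v) => sum + v, 0) / values.length;
  }

  // Per-input winner: highest mean score, or the first variant when no scores are given
  private selectBestVariant(
    results: Record<string, any>,
    metrics?: Record<string, Record<string, number>>
  ): string {
    let winner = Object.keys(results)[0];
    let bestScore = -Infinity;
    for (const [variant, scores] of Object.entries(metrics ?? {})) {
      const score = this.meanScore(scores);
      if (score > bestScore) {
        winner = variant;
        bestScore = score;
      }
    }
    return winner;
  }

  // Overall winner: average each variant's mean score across all test questions
  private analyzeTestResults(
    results: Array<{ metrics: Record<string, Record<string, number>> }>
  ): { bestVariant: string; bestScore: number } {
    const totals: Record<string, { sum: number; count: number }> = {};
    for (const { metrics } of results) {
      for (const [variant, scores] of Object.entries(metrics)) {
        if (!totals[variant]) totals[variant] = { sum: 0, count: 0 };
        totals[variant].sum += this.meanScore(scores);
        totals[variant].count += 1;
      }
    }
    let bestVariant = "";
    let bestScore = 0;
    for (const [variant, { sum, count }] of Object.entries(totals)) {
      const average = sum / count;
      if (bestVariant === "" || average > bestScore) {
        bestVariant = variant;
        bestScore = average;
      }
    }
    return { bestVariant, bestScore };
  }
}
```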
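Prompt optimization of this kind multiplies model calls: every variant answers every question once, and every LLM-based metric adds one more call per answer. A small helper makes the budget explicit before running a batch. `estimateLlmCalls` is a hypothetical name, and the count ignores retries and any calls a provider SDK makes internally.

```typescript
// Rough count of model calls for one batch A/B run (hypothetical helper).
// Each variant answers every question once, and each LLM-based metric
// makes one extra call per variant answer.
function estimateLlmCalls(questions: number, variants: number, llmMetrics: number): number {
  const generationCalls = questions * variants;
  const evaluationCalls = generationCalls * llmMetrics;
  return generationCalls + evaluationCalls;
}

// 3 test questions, 4 prompt variants, 2 LLM-based metrics (relevance, confidence)
console.log(estimateLlmCalls(3, 4, 2)); // 36
```

Rule-based metrics such as length or diversity cost no model calls, so only the LLM-judged metrics enter the third argument.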
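```typescript
import { ChatOpenAI } from "@langchain/openai";

// Using the prompt optimization tester
async function demonstratePromptOptimization() {
  console.log("\n=== Prompt Template Optimization Demo ===\n");
  const optimizer = new PromptOptimizationTester(
    "You are a professional technical consultant.",
    new ChatOpenAI({ modelName: "gpt-3.5-turbo" })
  );
  const testQuestions = [
    "What is machine learning?",
    "Explain how neural networks work",
    "What is the difference between deep learning and traditional machine learning?"
  ];
  const optimizationResult = await optimizer.optimizePrompt(testQuestions);
  console.log("Prompt optimization result:");
  console.log(`Variants tested: ${optimizationResult.variantsTested}`);
  console.log(`Best performance score: ${optimizationResult.performance.toFixed(3)}`);
  console.log("Best prompt template:");
  console.log(optimizationResult.bestPrompt);
}
```

Summary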
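With RunnableParallel and the A/B testing framework built on it, LangChain V3 supports parallel execution and side-by-side comparison of results:
- Parallel execution: run multiple variants at the same time to make testing more efficient
- Multi-dimensional evaluation: score outputs on confidence, relevance, length, diversity, and other metrics
- Batch testing: run a test over many inputs in one batch
- Result analysis: summarize winner distribution and average metrics across tests
- Flexible configuration: customize the variants, metrics, and selection criteria
- Practical use: apply it to prompt optimization, model selection, and similar scenarios
A/B testing lets developers evaluate and optimize LLM applications systematically and choose the configuration and implementation that performs best.
The next chapter turns to integration with front-end and full-stack architectures: exposing a LangChain streaming API in NestJS and fitting LangChain into a modern web application architecture.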