
Batch Optimization: How Does .batch() Parallelize Requests?

Controlling maxConcurrency to Avoid Overwhelming the LLM API

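When building high-performance LLM applications, batching is a key optimization technique. The .batch() method lets LangChain V3 run multiple requests in parallel, which can significantly increase throughput. Batching still needs careful concurrency control, however, so that it does not overwhelm the LLM API or exceed its rate limits. This chapter examines how batch optimization works and the practices that make it safe.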

Basic Concepts of the Batch Method

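.batch() accepts multiple inputs at once and improves overall throughput by processing them in parallel. A simplified version of the default behavior fans out one invoke() call per input: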

typescript
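// Simplified sketch for illustration; not the actual LangChain source.
// RunnableConfig is reduced to a plain record here.
type RunnableConfig = Record<string, unknown>;

abstract class BaseRunnable<Input, Output> {
  abstract invoke(input: Input, config?: RunnableConfig): Promise<Output>;

  async batch(
    inputs: Input[],
    options?: RunnableConfig | RunnableConfig[]
  ): Promise<Output[]> {
    // Naive default: start one invoke() per input immediately, with no concurrency cap
    return await Promise.all(
      inputs.map((input, index) => {
        // An array of configs is matched to inputs by position; a single config is shared
        const config = Array.isArray(options) ? options[index] : options;
        return this.invoke(input, config);
      })
    );
  }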
}
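The naive fan-out above starts every request at the same time. In @langchain/core, `batch()` also reads a `maxConcurrency` field from its config (for example `chain.batch(inputs, { maxConcurrency: 5 })`) and caps the number of in-flight calls accordingly. The sketch below shows one way such a cap can be enforced, using a fixed pool of workers that pull inputs from a shared index. `batchWithLimit` is a hypothetical helper for illustration, not LangChain's implementation.

```typescript
// Hypothetical helper: run fn over inputs with at most maxConcurrency calls in flight.
// Results keep the input order, like Promise.all.
async function batchWithLimit<I, O>(
  inputs: I[],
  fn: (input: I) => Promise<O>,
  maxConcurrency: number
): Promise<O[]> {
  const results: O[] = new Array(inputs.length);
  let next = 0; // index of the next input to hand out

  // Each worker repeatedly claims the next unprocessed index until none remain.
  // Claiming happens synchronously, so no two workers ever get the same index.
  async function worker(): Promise<void> {
    while (next < inputs.length) {
      const index = next++;
      results[index] = await fn(inputs[index]);
    }
  }

  // Start min(maxConcurrency, inputs.length) workers (at least one) and wait for all.
  // The first failure rejects the whole call; other workers keep draining in the background.
  const workerCount = Math.max(1, Math.min(maxConcurrency, inputs.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

A pool of N workers gives the same cap as an N-permit semaphore without needing a wait queue; the semaphore approach used next is easier to share across several independent batches.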

Concurrency Control

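To avoid overwhelming the backend service, the number of in-flight requests must be capped. The processor below does this with a counting semaphore: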

typescript
class ConcurrencyControlledBatchProcessor {
  private maxConcurrency: number;
  private semaphore: Semaphore;
  
  constructor(maxConcurrency: number = 10) {
    this.maxConcurrency = maxConcurrency;
    this.semaphore = new Semaphore(maxConcurrency);
  }
  
  async batchWithConcurrencyControl<T, R>(
    inputs: T[],
    processor: (input: T) => Promise<R>
  ): Promise<R[]> {
    const results: R[] = new Array(inputs.length);
    const errors: (Error | null)[] = new Array(inputs.length).fill(null);
    
    // Create one task per input; each waits for a semaphore slot before running
    const tasks = inputs.map(async (input, index) => {
      // Wait for a free slot
      await this.semaphore.acquire();
      
      try {
        const result = await processor(input);
        results[index] = result;
      } catch (error) {
        errors[index] = error as Error;
      } finally {
        // Always release the slot, even if the processor threw
        this.semaphore.release();
      }
    });
    
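    // Wait for every task to settle (tasks never reject, because errors are caught above)
    await Promise.all(tasks);

    // Fail the whole batch if any input failed, reporting the first failure
    const firstErrorIndex = errors.findIndex(error => error !== null);
    if (firstErrorIndex !== -1) {
      throw new Error(`Error in batch at input ${firstErrorIndex}: ${errors[firstErrorIndex]!.message}`);
    }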
    
    return results;
  }
}

// Minimal counting semaphore with a FIFO wait queue
class Semaphore {
  private capacity: number;
  private current: number;
  private queue: Array<() => void>;
  
  constructor(capacity: number) {
    this.capacity = capacity;
    this.current = 0;
    this.queue = [];
  }
  
  async acquire(): Promise<void> {
    return new Promise((resolve) => {
      if (this.current < this.capacity) {
        this.current++;
        resolve();
      } else {
        this.queue.push(resolve);
      }
    });
  }
  
  release(): void {
    this.current--;
    // Hand the freed slot straight to the longest-waiting caller, if there is one
    if (this.queue.length > 0) {
      this.current++;
      const resolve = this.queue.shift()!;
      resolve();
    }
  }
}
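A quick way to confirm that the semaphore really caps concurrency is to count in-flight tasks. The sketch below is self-contained: it repeats a compact equivalent of the Semaphore above and adds a hypothetical `withPermit` helper that pairs `acquire()` with `release()` in a try/finally, so a task that throws cannot leak a slot.

```typescript
// Compact counting semaphore with a FIFO wait queue, equivalent to the Semaphore class above
class Semaphore {
  private readonly capacity: number;
  private current = 0;
  private queue: Array<() => void> = [];

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  acquire(): Promise<void> {
    if (this.current < this.capacity) {
      this.current++;
      return Promise.resolve();
    }
    // No free slot: wait until release() hands one over
    return new Promise<void>((resolve) => {
      this.queue.push(() => resolve());
    });
  }

  release(): void {
    const nextWaiter = this.queue.shift();
    if (nextWaiter) {
      nextWaiter(); // pass the slot straight to the waiter; current is unchanged
    } else {
      this.current--;
    }
  }
}

// Hypothetical helper: run task while holding one permit, releasing it even on error
async function withPermit<T>(sem: Semaphore, task: () => Promise<T>): Promise<T> {
  await sem.acquire();
  try {
    return await task();
  } finally {
    sem.release();
  }
}
```

Because the release lives in `finally`, a rejected task frees its slot exactly like a successful one, which is the same guarantee the `finally` block in `batchWithConcurrencyControl` provides.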

Smart Batch Optimization

A smarter batch processor adds chunking, per-request timeouts, retries, and dynamic adjustment of the concurrency level:

typescript
interface BatchProcessingOptions {
  maxConcurrency?: number;
  timeout?: number;
  retryAttempts?: number;
  batchSize?: number;
}

class SmartBatchProcessor {
  private maxConcurrency: number;
  private currentConcurrency: number;
  private timeout: number;
  private retryAttempts: number;
  private batchSize: number;
  private successRate: number = 1.0;
  private avgResponseTime: number = 0;
  
  constructor(options: BatchProcessingOptions = {}) {
    // Use ?? rather than || so that explicit values such as retryAttempts: 0 are respected
    this.maxConcurrency = options.maxConcurrency ?? 10;
    this.currentConcurrency = this.maxConcurrency;
    this.timeout = options.timeout ?? 30000;
    this.retryAttempts = options.retryAttempts ?? 3;
    this.batchSize = options.batchSize ?? 50;
  }
  
  async batchProcess<T, R>(
    inputs: T[],
    processor: (input: T) => Promise<R>
  ): Promise<R[]> {
    // Large inputs: split into chunks of batchSize and process the chunks one after another
    if (inputs.length > this.batchSize) {
      return await this.processInBatches(inputs, processor);
    }
    
    // Small inputs: process as a single batch
    return await this.processBatch(inputs, processor);
  }
  
  private async processInBatches<T, R>(
    inputs: T[],
    processor: (input: T) => Promise<R>
  ): Promise<R[]> {
    const results: R[] = [];
    
    for (let i = 0; i < inputs.length; i += this.batchSize) {
      const batch = inputs.slice(i, i + this.batchSize);
      const batchResults = await this.processBatch(batch, processor);
      results.push(...batchResults);
      
      // Brief pause between chunks to avoid sending requests too frequently
      if (i + this.batchSize < inputs.length) {
        await this.delay(100);
      }
    }
    
    return results;
  }
  
  private async processBatch<T, R>(
    inputs: T[],
    processor: (input: T) => Promise<R>
  ): Promise<R[]> {
    const startTime = Date.now();
    const results: (R | null)[] = new Array(inputs.length).fill(null);
    const errors: (Error | null)[] = new Array(inputs.length).fill(null);
    
    // Concurrency-limited processing, using the current (possibly adjusted) limit
    const semaphore = new Semaphore(this.currentConcurrency);
    
    const tasks = inputs.map(async (input, index) => {
      await semaphore.acquire();
      
      try {
        // Enforce a per-request timeout
        const result = await this.withTimeout(
          processor(input),
          this.timeout
        );
        results[index] = result;
      } catch (error) {
        errors[index] = error as Error;
      } finally {
        semaphore.release();
      }
    });
    
    // Run all tasks; they never reject because errors are caught above
    await Promise.all(tasks);
    
    // Record performance metrics for this batch (based on first attempts only)
    const endTime = Date.now();
    const batchTime = endTime - startTime;
    this.updatePerformanceMetrics(batchTime, errors);
    
    // Retry failed items; throws if an item still fails after all retries
    const finalResults = await this.handleErrorsAndRetry(
      inputs,
      results,
      errors,
      processor
    );
    
    // Adjust concurrency for the next batch
    this.adjustConcurrency();
    
    return finalResults;
  }
  
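  // Reject if the promise has not settled within ms milliseconds.
  // Note: this only stops waiting; the underlying request is not cancelled.
  private async withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
    return new Promise((resolve, reject) => {
      const timeout = setTimeout(() => {
        reject(new Error(`Operation timed out (${ms}ms)`));
      }, ms);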
      
      promise.then(
        (result) => {
          clearTimeout(timeout);
          resolve(result);
        },
        (error) => {
          clearTimeout(timeout);
          reject(error);
        }
      );
    });
  }
  
  private async handleErrorsAndRetry<T, R>(
    inputs: T[],
    results: (R | null)[],
    errors: (Error | null)[],
    processor: (input: T) => Promise<R>
  ): Promise<R[]> {
    const finalResults: R[] = [];

    for (let i = 0; i < inputs.length; i++) {
      if (errors[i] === null) {
        // First attempt succeeded
        finalResults.push(results[i] as R);
        continue;
      }

      // First attempt failed: retry this item sequentially, up to retryAttempts times
      let lastError: unknown = errors[i];
      let succeeded = false;
      for (let attempt = 1; attempt <= this.retryAttempts; attempt++) {
        try {
          finalResults.push(await this.withTimeout(processor(inputs[i]), this.timeout));
          succeeded = true;
          break;
        } catch (retryError) {
          lastError = retryError;
          // Exponential backoff before the next retry: 2s, 4s, 8s, ...
          if (attempt < this.retryAttempts) {
            await this.delay(1000 * Math.pow(2, attempt));
          }
        }
      }

      if (!succeeded) {
        throw new Error(
          `Still failing after ${this.retryAttempts} retries: ${(lastError as Error).message}`
        );
      }
    }

    return finalResults;
  }
  
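  private updatePerformanceMetrics(batchTime: number, errors: (Error | null)[]): void {
    if (errors.length === 0) return; // nothing was processed

    // Fraction of items whose first attempt succeeded
    const errorCount = errors.filter(error => error !== null).length;
    const successCount = errors.length - errorCount;
    this.successRate = successCount / errors.length;

    // Wall-clock batch time divided by item count. With concurrency > 1 this is
    // lower than the true per-request latency, so treat it as a rough signal.
    this.avgResponseTime = batchTime / errors.length;
  }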
  
  private adjustConcurrency(): void {
    // Adjust concurrency based on the success rate and the time per item
    if (this.successRate < 0.8) {
      // Low success rate: reduce concurrency by 20% (never below 1)
      this.currentConcurrency = Math.max(1, Math.floor(this.currentConcurrency * 0.8));
    } else if (this.successRate > 0.95 && this.avgResponseTime < 1000) {
      // High success rate and fast responses: increase concurrency by 10%, up to maxConcurrency
      this.currentConcurrency = Math.min(this.maxConcurrency, Math.ceil(this.currentConcurrency * 1.1));
    }
  }
  
  private async delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
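The rule in `adjustConcurrency()` is a simple multiplicative scheme: shrink by 20% when fewer than 80% of items succeed, grow by 10% when more than 95% succeed and items average under one second. Pulling it out as a pure function (hypothetically named `nextConcurrency` here) makes the thresholds easy to test in isolation. The thresholds and factors below are copied from the class above; the function itself is an illustrative refactoring, not part of the original code.

```typescript
interface ConcurrencyStats {
  successRate: number;     // fraction of items that succeeded, 0..1
  avgResponseTime: number; // milliseconds per item
}

// Same thresholds and factors as SmartBatchProcessor.adjustConcurrency()
function nextConcurrency(current: number, max: number, stats: ConcurrencyStats): number {
  if (stats.successRate < 0.8) {
    // Too many failures: back off by 20%, never below 1
    return Math.max(1, Math.floor(current * 0.8));
  }
  if (stats.successRate > 0.95 && stats.avgResponseTime < 1000) {
    // Healthy and fast: grow by 10% (rounded up so small values still grow), capped at max
    return Math.min(max, Math.ceil(current * 1.1));
  }
  // Otherwise keep the current level
  return current;
}
```

For example, starting from 10, a batch with a 50% success rate drops the limit to 8; starting from 5, a healthy fast batch raises it to 6; at the cap of 10 it stays at 10. Rounding up on increase matters: with `Math.floor`, any level below 10 would never grow.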

Batch Optimization for LLMs

An LLM-specific batch processor that adds requests-per-minute rate limiting on top of the smart processor:

typescript
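import type { BaseLanguageModel } from "@langchain/core/language_models/base";
import type { BaseMessage } from "@langchain/core/messages";
import { StringOutputParser } from "@langchain/core/output_parsers";

class LLMBatchProcessor extends SmartBatchProcessor {
  private llm: BaseLanguageModel;
  private rateLimiter: RateLimiter;

  constructor(
    llm: BaseLanguageModel,
    options: BatchProcessingOptions = {},
    rateLimitOptions: { requestsPerMinute?: number } = {}
  ) {
    super(options);
    this.llm = llm;
    this.rateLimiter = new RateLimiter(
      rateLimitOptions.requestsPerMinute ?? 60
    );
  }

  async batchProcessPrompts(prompts: string[]): Promise<string[]> {
    // Chat models return message objects; the parser extracts the text content
    const toText = this.llm.pipe(new StringOutputParser());
    return await this.batchProcess(prompts, async (prompt: string) => {
      // Wait until the rate limiter allows another request
      await this.rateLimiter.acquire();

      // Send a single prompt and return its text
      return await toText.invoke(prompt);
    });
  }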
  
  async batchProcessMessages(
    messageSets: BaseMessage[][]
  ): Promise<BaseMessage[]> {
    return await this.batchProcess(messageSets, async (messages: BaseMessage[]) => {
      // Wait until the rate limiter allows another request
      await this.rateLimiter.acquire();

      // Send one conversation and return the model's reply message
      return await this.llm.invoke(messages);
    });
  }
}

// Rate limiter: fixed 60-second window allowing at most requestsPerMinute requests.
// Callers beyond the limit wait in FIFO order for the next window. A fixed window
// can admit up to twice the limit across a window boundary.
class RateLimiter {
  private requestsPerMinute: number;
  private requestsThisMinute = 0;
  private minuteStart = Date.now();
  private queue: Array<() => void> = [];
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(requestsPerMinute: number) {
    this.requestsPerMinute = requestsPerMinute;
  }

  // Resolves when the caller is allowed to send one request
  acquire(): Promise<void> {
    return new Promise<void>((resolve) => {
      this.queue.push(() => resolve());
      this.drain();
    });
  }

  private drain(): void {
    const now = Date.now();

    // Start a new window once the current one has elapsed
    if (now - this.minuteStart >= 60000) {
      this.requestsThisMinute = 0;
      this.minuteStart = now;
    }

    // Admit as many waiting callers as the current window still allows
    while (this.queue.length > 0 && this.requestsThisMinute < this.requestsPerMinute) {
      this.requestsThisMinute++;
      this.queue.shift()!();
    }

    // Callers still waiting: schedule a single timer for the start of the next window
    if (this.queue.length > 0 && this.timer === null) {
      const waitMs = 60000 - (now - this.minuteStart);
      this.timer = setTimeout(() => {
        this.timer = null;
        this.drain();
      }, waitMs);
    }
  }
}
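The fixed-window limiter above admits requests in bursts at the start of each minute. A token bucket is a common alternative that spaces requests more evenly: tokens refill continuously at `requestsPerMinute / 60000` per millisecond up to a burst capacity, and each request spends one token. This is a swapped-in technique, not part of the original code; `TokenBucket`, its `burst` parameter, and the injectable `now` clock (added only to make it testable) are all hypothetical.

```typescript
class TokenBucket {
  private tokens: number;
  private lastRefill: number;
  private readonly ratePerMs: number;
  private readonly burst: number;
  private readonly now: () => number;

  constructor(requestsPerMinute: number, burst: number, now: () => number = Date.now) {
    this.ratePerMs = requestsPerMinute / 60_000;
    this.burst = burst;
    this.now = now;
    this.tokens = burst; // start with a full bucket
    this.lastRefill = now();
  }

  // Add the tokens earned since the last refill, capped at the burst size
  private refill(): void {
    const t = this.now();
    this.tokens = Math.min(this.burst, this.tokens + (t - this.lastRefill) * this.ratePerMs);
    this.lastRefill = t;
  }

  // Milliseconds until one whole token is available (0 if one is available now)
  msUntilToken(): number {
    this.refill();
    return this.tokens >= 1 ? 0 : Math.ceil((1 - this.tokens) / this.ratePerMs);
  }

  // Take a token if one is available; returns false instead of waiting
  tryTake(): boolean {
    this.refill();
    if (this.tokens < 1) return false;
    this.tokens -= 1;
    return true;
  }

  // Wait until a token is available, then take it (re-checks after each sleep)
  async acquire(): Promise<void> {
    while (!this.tryTake()) {
      await new Promise((resolve) => setTimeout(resolve, this.msUntilToken()));
    }
  }
}
```

With `burst` set to 1, consecutive requests are spaced at least `60000 / requestsPerMinute` ms apart; a larger `burst` allows short spikes while keeping the same long-run rate.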

A Practical Example

Let's look at a complete example that applies batch optimization in a document question-answering system:

typescript
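import { Document } from "@langchain/core/documents";
import type { Runnable } from "@langchain/core/runnables";
import type { BaseLanguageModel } from "@langchain/core/language_models/base";
import { StringOutputParser } from "@langchain/core/output_parsers";
import { VectorStoreRetriever } from "@langchain/core/vectorstores";
import { MemoryVectorStore } from "langchain/vectorstores/memory";
import { ChatOpenAI, OpenAIEmbeddings } from "@langchain/openai";
// SmartBatchProcessor and LLMBatchProcessor are the classes defined earlier in this chapter

// Batch document question-answering system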
class BatchDocumentQASystem {
  private retriever: Runnable<string, Document[]>;
  private llm: BaseLanguageModel;
  private batchProcessor: LLMBatchProcessor;
  
  constructor(
    retriever: Runnable<string, Document[]>,
    llm: BaseLanguageModel
  ) {
    this.retriever = retriever;
    this.llm = llm;
    this.batchProcessor = new LLMBatchProcessor(llm, {
      maxConcurrency: 5,
      timeout: 30000,
      retryAttempts: 3,
      batchSize: 20
    }, {
      requestsPerMinute: 300
    });
  }
  
  async answerQuestions(questions: string[]): Promise<Array<{
    question: string;
    answer: string;
    processingTime: number;
  }>> {
    console.log(`Starting batch processing of ${questions.length} questions`);
    const startTime = Date.now();

    try {
      // Step 1: retrieve relevant documents for all questions in a batch
      console.log('Retrieving documents in batch...');
      const retrievalStartTime = Date.now();
      const documentSets = await this.batchRetrieveDocuments(questions);
      const retrievalTime = Date.now() - retrievalStartTime;
      console.log(`Document retrieval finished in ${retrievalTime}ms`);

      // Step 2: build one prompt per question from its retrieved documents
      console.log('Building prompts...');
      const prompts = questions.map((question, index) => {
        const documents = documentSets[index];
        const context = documents.map(d => d.pageContent).join('\n\n');
        return this.buildPrompt(context, question);
      });

      // Step 3: generate answers in batch (concurrency- and rate-limited)
      console.log('Generating answers in batch...');
      const generationStartTime = Date.now();
      const answers = await this.batchProcessor.batchProcessPrompts(prompts);
      const generationTime = Date.now() - generationStartTime;
      console.log(`Answer generation finished in ${generationTime}ms`);

      // Step 4: assemble the results. Per-question timings are not tracked,
      // so processingTime is the elapsed time of the whole batch for every entry.
      const totalTime = Date.now() - startTime;
      const results = questions.map((question, index) => ({
        question,
        answer: answers[index],
        processingTime: totalTime
      }));

      console.log(`Batch finished in ${totalTime}ms total, ${(totalTime / questions.length).toFixed(2)}ms per question on average`);

      return results;
    } catch (error) {
      console.error('Batch processing error:', error);
      throw error;
    }
  }
  
  private async batchRetrieveDocuments(questions: string[]): Promise<Document[][]> {
    // Use a SmartBatchProcessor (concurrency 10, chunks of 50) for retrieval
    const smartProcessor = new SmartBatchProcessor({
      maxConcurrency: 10,
      batchSize: 50
    });
    
    return await smartProcessor.batchProcess(questions, async (question) => {
      return await this.retriever.invoke(question);
    });
  }
  
  private buildPrompt(context: string, question: string): string {
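    return `Answer the question using the documents below. If the documents do not contain the relevant information, say that you cannot answer.

Documents:
${context}

Question: ${question}

Answer:`;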
  }
  
  // Streaming batch: answers for all questions are streamed concurrently.
  // Note: every stream is opened at once, with no concurrency cap or rate limiting.
  async *streamAnswers(questions: string[]): AsyncGenerator<{
    questionIndex: number;
    question: string;
    partialAnswer: string;
  }> {
    console.log(`Starting streaming batch for ${questions.length} questions`);

    // Retrieve documents for all questions in one batch
    const documentSets = await this.batchRetrieveDocuments(questions);

    // Open one text stream per question (the parser turns message chunks into strings)
    const toText = this.llm.pipe(new StringOutputParser());
    const streamTasks = await Promise.all(
      questions.map(async (question, index) => {
        const context = documentSets[index].map(d => d.pageContent).join('\n\n');
        const stream = await toText.stream(this.buildPrompt(context, question));
        return { index, question, stream };
      })
    );

    // Map each question index to the iterator of its stream
    const activeStreams = new Map<number, AsyncIterator<string>>();
    for (const { index, stream } of streamTasks) {
      activeStreams.set(index, stream[Symbol.asyncIterator]());
    }

    type StreamStep = { index: number; done: boolean; value?: string; error?: unknown };

    // Each round pulls one chunk from every active stream, so a round
    // lasts as long as the slowest stream takes to produce its next chunk
    while (activeStreams.size > 0) {
      const steps: StreamStep[] = await Promise.all(
        Array.from(activeStreams.entries()).map(async ([index, iterator]): Promise<StreamStep> => {
          try {
            const { value, done } = await iterator.next();
            return { index, value, done: done ?? false };
          } catch (error) {
            // Treat a failed stream as finished and report the error below
            return { index, error, done: true };
          }
        })
      );

      for (const step of steps) {
        const question = streamTasks[step.index].question;
        if (step.done) {
          activeStreams.delete(step.index);
          if (step.error !== undefined) {
            yield {
              questionIndex: step.index,
              question,
              partialAnswer: `Error: ${(step.error as Error).message}`
            };
          }
        } else {
          yield { questionIndex: step.index, question, partialAnswer: step.value ?? '' };
        }
      }
    }

    console.log('All streams finished');
  }
}

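// Create the batch QA system
async function createBatchQASystem() {
  // In-memory vector store; it needs an embeddings model to index documents
  const vectorStore = new MemoryVectorStore(new OpenAIEmbeddings());

  // Add 100 sample documents
  const sampleDocuments: Document[] = [];
  for (let i = 0; i < 100; i++) {
    sampleDocuments.push(new Document({
      pageContent: `This is the content of document ${i}. LangChain is a powerful framework for building applications powered by large language models. Document ${i} contains information about artificial intelligence, machine learning, and natural language processing.`,
      metadata: {
        source: `doc-${i}`,
        category: i % 3 === 0 ? 'ai' : i % 3 === 1 ? 'ml' : 'nlp',
        id: i
      }
    }));
  }

  await vectorStore.addDocuments(sampleDocuments);

  // Return the top 3 matches; the filter keeps only documents that have an id
  const retriever = new VectorStoreRetriever({
    vectorStore,
    k: 3,
    filter: (doc) => doc.metadata.id !== undefined
  });

  // Create the batch QA system
  const batchQASystem = new BatchDocumentQASystem(
    retriever,
    new ChatOpenAI({
      model: "gpt-3.5-turbo",
      temperature: 0.7
    })
  );

  return batchQASystem;
}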

// Usage example
async function demonstrateBatchProcessing() {
  const batchQASystem = await createBatchQASystem();

  console.log('=== Batch QA system demo ===\n');

  // Prepare a batch of questions
  const questions = [
    "What is LangChain?",
    "What features does LangChain have?",
    "How do I use LCEL?",
    "What does VectorStoreRetriever do?",
    "How does ContextualCompressionRetriever work?",
    "What is an LLM?",
    "How do I build a RAG application?",
    "Which models does LangChain support?",
    "What does an OutputParser do?",
    "How can I optimize the performance of an LLM application?"
  ];

  console.log(`Preparing to process ${questions.length} questions...\n`);

  // Batch processing
  const startTime = Date.now();
  const results = await batchQASystem.answerQuestions(questions);
  const totalTime = Date.now() - startTime;

  console.log(`\n=== Batch results ===`);
  console.log(`Total processing time: ${totalTime}ms`);
  console.log(`Average per question: ${(totalTime / questions.length).toFixed(2)}ms\n`);

  results.slice(0, 3).forEach((result, index) => {
    console.log(`Question ${index + 1}: ${result.question}`);
    console.log(`Answer: ${result.answer.substring(0, 100)}...`);
    console.log(`Processing time (whole batch): ${result.processingTime}ms\n`);
  });

  // Streaming batch demo
  console.log('=== Streaming batch demo ===');
  const streamQuestions = [
    "Briefly describe the core concepts of LangChain",
    "What are the advantages of LCEL?"
  ];

  console.log('Starting streaming...\n');
  // Chunks from different questions arrive interleaved
  for await (const { questionIndex, question, partialAnswer } of
    batchQASystem.streamAnswers(streamQuestions)) {
    if (partialAnswer.includes('\n')) {
      console.log(`[Question ${questionIndex + 1}] ${question}`);
      console.log(`Answer: ${partialAnswer}\n`);
    } else {
      process.stdout.write(partialAnswer);
    }
  }
  console.log('\nStreaming finished\n');
}
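streamAnswers() reads its streams in lockstep: each round waits for one chunk from every active stream, so a fast stream is held back by the slowest one. An alternative is to keep one pending `next()` call per stream and yield whichever settles first, using `Promise.race`. The sketch below is a generic, hypothetical `mergeStreams` helper (not a LangChain API) that tags each chunk with the position of its source stream.

```typescript
// Merge several async iterables, yielding each chunk as soon as its source produces it.
// Each yielded item records which source (by position) it came from.
// If any source throws, the merged stream throws and the remaining sources are abandoned.
async function* mergeStreams<T>(
  sources: AsyncIterable<T>[]
): AsyncGenerator<{ index: number; value: T }> {
  const iterators = sources.map((s) => s[Symbol.asyncIterator]());
  // One in-flight next() per source, keyed by source index
  const pending = new Map<number, Promise<{ index: number; result: IteratorResult<T> }>>();
  const pull = (index: number) =>
    pending.set(index, iterators[index].next().then((result) => ({ index, result })));

  iterators.forEach((_, index) => pull(index));

  while (pending.size > 0) {
    // Wait for whichever source answers first
    const { index, result } = await Promise.race(pending.values());
    if (result.done) {
      pending.delete(index); // this source is exhausted
    } else {
      pull(index); // request its next chunk before yielding this one
      yield { index, value: result.value };
    }
  }
}
```

Applied to the QA system, the sources would be the per-question text streams, and `index` maps back to the question; chunks then reach the caller in arrival order rather than round by round.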

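Summary

Batch optimization, combining the .batch() method with concurrency control, can substantially improve the throughput of LLM applications:

  1. Parallel processing - handle multiple requests at once to increase throughput
  2. Concurrency control - use semaphores and rate limiting to avoid overwhelming backend services
  3. Adaptive tuning - adjust concurrency based on the observed success rate and time per item, and split large inputs into fixed-size chunks
  4. Error handling - per-request timeouts and retries with exponential backoff
  5. Performance tracking - record the success rate and average time per item after each batch
  6. Streaming support - stream answers for a whole batch to give immediate feedback

With these techniques, developers can build high-performance, reliable LLM applications that handle large-scale workloads.

In the next chapter, we will look at caching: how RunnableCache avoids repeated calls, and how caching can improve system efficiency.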