批处理优化:.batch() 如何并行化请求?
控制 maxConcurrency 避免压垮 LLM API
在构建高性能的 LLM 应用时,批处理是一个关键的优化技术。通过 .batch() 方法,LangChain V3 能够并行化多个请求,显著提高处理效率。然而,批处理也需要谨慎控制并发数量,以避免压垮 LLM API 或超出速率限制。本章将深入探讨批处理优化的机制和最佳实践。
Batch 方法的基本概念
.batch() 方法允许同时处理多个输入,通过并行化提高整体处理效率。下面的默认实现基于 Promise.all:所有请求会同时发起、没有并发上限,并且任意一个请求失败都会让整个批次失败:
typescript
/**
 * Minimal runnable base class showing the default `batch` behaviour.
 *
 * NOTE(review): `invoke` is called but not declared in this snippet; it is
 * presumably provided by the full class or a subclass — confirm.
 */
class BaseRunnable<Input, Output> {
  /**
   * Calls `invoke` once per input without any concurrency limit and resolves
   * with the outputs in input order. Because the calls are combined with
   * `Promise.all`, a single rejected call rejects the whole batch.
   *
   * @param inputs - Values to process.
   * @param options - One config shared by every call, or an array whose
   *   element at position `i` is passed along with `inputs[i]`.
   * @returns The outputs, positioned to match `inputs`.
   */
  async batch(
    inputs: Input[],
    options?: RunnableConfig | RunnableConfig[]
  ): Promise<Output[]> {
    // Per-call config lookup: positional when an array was supplied, shared otherwise.
    const configAt = (position: number) =>
      Array.isArray(options) ? options[position] : options;

    // Every invoke is started here, before anything is awaited.
    const pending = inputs.map((input, position) =>
      this.invoke(input, configAt(position))
    );

    return Promise.all(pending);
  }
}
并发控制机制
为了避免压垮后端服务,需要实现并发控制:
typescript
/**
 * Runs a batch of async jobs while letting at most `maxConcurrency` of them
 * execute at the same time. Uses the `Semaphore` class from this file.
 */
class ConcurrencyControlledBatchProcessor {
  private maxConcurrency: number;
  private semaphore: Semaphore;

  /**
   * @param maxConcurrency - Upper bound on simultaneously running jobs
   *   (default 10).
   * @throws RangeError when `maxConcurrency` is not greater than 0. Such a
   *   value would create a semaphore that never grants a permit, so every
   *   batch would hang forever.
   */
  constructor(maxConcurrency: number = 10) {
    if (!(maxConcurrency > 0)) {
      throw new RangeError(`maxConcurrency 必须大于 0,收到: ${maxConcurrency}`);
    }
    this.maxConcurrency = maxConcurrency;
    this.semaphore = new Semaphore(maxConcurrency);
  }

  /**
   * Applies `processor` to every input under the concurrency limit.
   *
   * All jobs are allowed to finish (successfully or not) before the outcome
   * is decided; a failure does not stop the jobs that are still queued.
   *
   * @param inputs - Values to process.
   * @param processor - Async function applied to each input.
   * @returns Results positioned to match `inputs`.
   * @throws Error describing the failure with the lowest input index when at
   *   least one job failed. The original thrown value is attached as `cause`.
   */
  async batchWithConcurrencyControl<T, R>(
    inputs: T[],
    processor: (input: T) => Promise<R>
  ): Promise<R[]> {
    const results: R[] = new Array(inputs.length);
    // A failure is recorded as a wrapper object so that falsy or non-Error
    // throw values (e.g. `throw undefined`) are still recognised as failures.
    const failures: Array<{ reason: unknown } | undefined> =
      new Array(inputs.length).fill(undefined);

    const tasks = inputs.map(async (input, index) => {
      await this.semaphore.acquire();
      try {
        results[index] = await processor(input);
      } catch (reason: unknown) {
        failures[index] = { reason };
      } finally {
        // Always hand the permit back, even when the job failed.
        this.semaphore.release();
      }
    });

    await Promise.all(tasks);

    const firstFailure = failures.find((entry) => entry !== undefined);
    if (firstFailure !== undefined) {
      const { reason } = firstFailure;
      const detail = reason instanceof Error ? reason.message : String(reason);
      const wrapped = new Error(`批处理中发生错误: ${detail}`);
      // Keep the original value reachable for callers that need its type or stack.
      (wrapped as Error & { cause?: unknown }).cause = reason;
      throw wrapped;
    }

    return results;
  }
}
// Semaphore implementation
/**
 * Counting semaphore: at most `capacity` permits are held at once; further
 * `acquire` calls wait in FIFO order until a holder calls `release`.
 */
class Semaphore {
  /** Number of permits currently held. */
  private current = 0;
  /** Resolvers of callers waiting for a permit, oldest first. */
  private queue: Array<() => void> = [];

  /** @param capacity - Maximum number of permits that may be held at once. */
  constructor(private capacity: number) {}

  /**
   * Takes a permit. The returned promise resolves immediately when a permit
   * is free, otherwise once an earlier holder releases one.
   */
  async acquire(): Promise<void> {
    return new Promise<void>((grant) => {
      const permitAvailable = this.current < this.capacity;
      if (!permitAvailable) {
        this.queue.push(grant);
        return;
      }
      this.current++;
      grant();
    });
  }

  /**
   * Returns a permit. If a caller is waiting, the permit is handed straight
   * to it (the held count stays the same); otherwise the held count drops.
   */
  release(): void {
    const nextWaiter = this.queue.shift();
    if (nextWaiter === undefined) {
      this.current--;
      return;
    }
    nextWaiter();
  }
}
智能批处理优化
实现更智能的批处理优化,包括动态调整并发数:
typescript
/** Tuning options accepted by the SmartBatchProcessor constructor. */
interface BatchProcessingOptions {
/** Upper bound for the number of items processed at once; SmartBatchProcessor uses 10 when omitted. */
maxConcurrency?: number;
/** Per-item time limit in milliseconds; SmartBatchProcessor uses 30000 when omitted. */
timeout?: number;
/** Number of retry attempts for an item that failed; SmartBatchProcessor uses 3 when omitted. */
retryAttempts?: number;
/** Inputs longer than this are split into chunks of this size; SmartBatchProcessor uses 50 when omitted. */
batchSize?: number;
}
/** Result of one processing attempt: the produced value, or whatever was thrown. */
type SmartBatchOutcome<R> =
  | { ok: true; value: R }
  | { ok: false; error: unknown };

/**
 * Batch processor that combines chunking, a per-item timeout, retries with
 * exponential backoff, and a concurrency limit that shrinks after a batch
 * with many failures and grows back after fast, successful batches.
 *
 * Uses the `Semaphore` class from this file for the concurrency limit.
 */
class SmartBatchProcessor {
  private maxConcurrency: number;
  private currentConcurrency: number;
  private timeout: number;
  private retryAttempts: number;
  private batchSize: number;
  /** Share of items in the most recent batch that succeeded on the first try. */
  private successRate: number = 1.0;
  /** Wall-clock time of the most recent batch divided by its item count (ms). */
  private avgResponseTime: number = 0;

  /**
   * @param options - Optional tuning values. `maxConcurrency`, `timeout` and
   *   `batchSize` must be positive; missing or non-positive values fall back
   *   to 10, 30000 and 50. `retryAttempts` may be 0 (no retries); missing or
   *   negative values fall back to 3.
   */
  constructor(options: BatchProcessingOptions = {}) {
    this.maxConcurrency = SmartBatchProcessor.positiveOr(options.maxConcurrency, 10);
    this.currentConcurrency = this.maxConcurrency;
    this.timeout = SmartBatchProcessor.positiveOr(options.timeout, 30000);
    // 0 is a meaningful value here ("never retry"), so it must not be
    // replaced by the default the way `||` would.
    const requestedRetries = options.retryAttempts;
    this.retryAttempts =
      requestedRetries !== undefined && requestedRetries >= 0
        ? Math.floor(requestedRetries)
        : 3;
    this.batchSize = SmartBatchProcessor.positiveOr(options.batchSize, 50);
  }

  /**
   * Processes every input and resolves with the results in input order.
   * Inputs longer than `batchSize` are handled chunk by chunk.
   *
   * @param inputs - Values to process.
   * @param processor - Async function applied to each input.
   * @returns One result per input, in input order.
   * @throws Error when an item still fails after all retry attempts, or the
   *   original failure when retries are disabled.
   */
  async batchProcess<T, R>(
    inputs: T[],
    processor: (input: T) => Promise<R>
  ): Promise<R[]> {
    if (inputs.length === 0) {
      // Nothing to do; also keeps the metrics free of 0/0.
      return [];
    }
    if (inputs.length > this.batchSize) {
      return this.processInBatches(inputs, processor);
    }
    return this.processBatch(inputs, processor);
  }

  /** Splits `inputs` into chunks of `batchSize` and processes them one after another. */
  private async processInBatches<T, R>(
    inputs: T[],
    processor: (input: T) => Promise<R>
  ): Promise<R[]> {
    const results: R[] = [];
    for (let i = 0; i < inputs.length; i += this.batchSize) {
      const batch = inputs.slice(i, i + this.batchSize);
      const batchResults = await this.processBatch(batch, processor);
      results.push(...batchResults);
      // Short pause between chunks so requests are not sent back to back.
      if (i + this.batchSize < inputs.length) {
        await this.delay(100);
      }
    }
    return results;
  }

  /**
   * Processes one chunk under the current concurrency limit, records the
   * metrics, adjusts the limit for the next chunk, then retries the items
   * that failed.
   */
  private async processBatch<T, R>(
    inputs: T[],
    processor: (input: T) => Promise<R>
  ): Promise<R[]> {
    const startTime = Date.now();
    const outcomes: SmartBatchOutcome<R>[] = new Array(inputs.length);
    const semaphore = new Semaphore(this.currentConcurrency);

    await Promise.all(
      inputs.map(async (input, index) => {
        await semaphore.acquire();
        try {
          const value = await this.withTimeout(processor(input), this.timeout);
          outcomes[index] = { ok: true, value };
        } catch (error: unknown) {
          outcomes[index] = { ok: false, error };
        } finally {
          semaphore.release();
        }
      })
    );

    const failureCount = outcomes.filter((outcome) => !outcome.ok).length;
    this.updatePerformanceMetrics(Date.now() - startTime, failureCount, outcomes.length);
    // Adjust before retrying: if the retries end up throwing, the lowered
    // limit must still apply to the caller's next batch.
    this.adjustConcurrency();

    return this.handleErrorsAndRetry(inputs, outcomes, processor);
  }

  /**
   * Rejects with a timeout error if `promise` has not settled within `ms`.
   * The underlying operation is not cancelled; only its result is ignored.
   */
  private async withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        reject(new Error(`操作超时 (${ms}ms)`));
      }, ms);
      promise.then(
        (value) => {
          clearTimeout(timer);
          resolve(value);
        },
        (error) => {
          clearTimeout(timer);
          reject(error);
        }
      );
    });
  }

  /**
   * Builds the final result list: first-try successes are taken as they are,
   * failed items are retried one at a time.
   */
  private async handleErrorsAndRetry<T, R>(
    inputs: T[],
    outcomes: SmartBatchOutcome<R>[],
    processor: (input: T) => Promise<R>
  ): Promise<R[]> {
    const finalResults: R[] = [];
    for (let i = 0; i < inputs.length; i++) {
      const outcome = outcomes[i];
      if (outcome.ok) {
        finalResults.push(outcome.value);
      } else {
        finalResults.push(await this.retryItem(inputs[i], outcome.error, processor));
      }
    }
    return finalResults;
  }

  /**
   * Retries a single failed item up to `retryAttempts` times, applying the
   * same timeout as the first attempt and waiting 2s, 4s, ... between
   * attempts.
   *
   * @throws Error with the last failure's message once every attempt has
   *   failed; the first failure itself when retries are disabled.
   */
  private async retryItem<T, R>(
    input: T,
    firstError: unknown,
    processor: (input: T) => Promise<R>
  ): Promise<R> {
    for (let attempt = 1; attempt <= this.retryAttempts; attempt++) {
      try {
        return await this.withTimeout(processor(input), this.timeout);
      } catch (retryError: unknown) {
        if (attempt === this.retryAttempts) {
          throw new Error(
            `重试 ${this.retryAttempts} 次后仍然失败: ${SmartBatchProcessor.describeError(retryError)}`
          );
        }
        // Exponential backoff before the next attempt.
        await this.delay(1000 * Math.pow(2, attempt));
      }
    }
    // Only reached when retryAttempts is 0.
    throw firstError instanceof Error
      ? firstError
      : new Error(SmartBatchProcessor.describeError(firstError));
  }

  /** Records the success rate and per-item wall-clock time of the latest batch. */
  private updatePerformanceMetrics(
    batchTime: number,
    failureCount: number,
    itemCount: number
  ): void {
    if (itemCount === 0) {
      return;
    }
    this.successRate = (itemCount - failureCount) / itemCount;
    this.avgResponseTime = batchTime / itemCount;
  }

  /**
   * Lowers the concurrency limit by 20% (never below 1) when fewer than 80%
   * of the items succeeded; raises it by 10% (never above `maxConcurrency`)
   * when more than 95% succeeded and the per-item time was under one second.
   */
  private adjustConcurrency(): void {
    if (this.successRate < 0.8) {
      this.currentConcurrency = Math.max(1, Math.floor(this.currentConcurrency * 0.8));
    } else if (this.successRate > 0.95 && this.avgResponseTime < 1000) {
      this.currentConcurrency = Math.min(
        this.maxConcurrency,
        Math.ceil(this.currentConcurrency * 1.1)
      );
    }
  }

  /** Resolves after `ms` milliseconds. */
  private async delay(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /** Returns `value` when it is a positive number, otherwise `fallback`. */
  private static positiveOr(value: number | undefined, fallback: number): number {
    return value !== undefined && value > 0 ? value : fallback;
  }

  /** Human-readable text for an arbitrary thrown value. */
  private static describeError(thrown: unknown): string {
    return thrown instanceof Error ? thrown.message : String(thrown);
  }
}
LLM 批处理优化
针对 LLM 的批处理优化实现:
typescript
/**
 * SmartBatchProcessor specialised for a language model: every model call
 * first waits for a slot from a RateLimiter.
 */
class LLMBatchProcessor extends SmartBatchProcessor {
  private llm: BaseLanguageModel;
  private rateLimiter: RateLimiter;

  /**
   * @param llm - Model whose `invoke` is called for each item.
   * @param options - Batch options forwarded to SmartBatchProcessor.
   * @param rateLimitOptions - `requestsPerMinute` for the rate limiter;
   *   60 is used when it is missing or 0.
   */
  constructor(
    llm: BaseLanguageModel,
    options: BatchProcessingOptions = {},
    rateLimitOptions: { requestsPerMinute?: number } = {}
  ) {
    super(options);
    const { requestsPerMinute } = rateLimitOptions;
    this.llm = llm;
    this.rateLimiter = new RateLimiter(requestsPerMinute || 60);
  }

  /**
   * Sends every prompt to the model through the batch processor.
   *
   * @param prompts - Prompt strings, one model call each.
   * @returns The model outputs in prompt order.
   */
  async batchProcessPrompts(prompts: string[]): Promise<string[]> {
    const runPrompt = async (prompt: string) => {
      // Wait for a rate-limit slot before calling the model.
      await this.rateLimiter.acquire();
      return await this.llm.invoke(prompt);
    };
    return await this.batchProcess(prompts, runPrompt);
  }

  /**
   * Sends every message list to the model through the batch processor.
   *
   * @param messageSets - One list of messages per model call.
   * @returns The model outputs in input order.
   */
  async batchProcessMessages(
    messageSets: BaseMessage[][]
  ): Promise<BaseMessage[]> {
    const runConversation = async (messages: BaseMessage[]) => {
      // Wait for a rate-limit slot before calling the model.
      await this.rateLimiter.acquire();
      return await this.llm.invoke(messages);
    };
    return await this.batchProcess(messageSets, runConversation);
  }
}
// Rate limiter implementation
/**
 * Fixed-window rate limiter: at most `requestsPerMinute` calls to `acquire`
 * are granted per window; further callers wait in FIFO order and are granted
 * as soon as the next window opens.
 */
class RateLimiter {
  private requestsPerMinute: number;
  /** Grants already handed out in the current window. */
  private requestsThisMinute: number;
  /** Timestamp (ms) at which the current window started. */
  private minuteStart: number;
  /** Resolvers of callers waiting for a grant, oldest first. */
  private queue: Array<() => void>;
  /** Length of one window in milliseconds. */
  private windowMs: number;
  /** The single pending drain timer, or null when none is scheduled. */
  private drainTimer: ReturnType<typeof setTimeout> | null;

  /**
   * @param requestsPerMinute - Grants allowed per window; must be > 0.
   * @param windowMs - Window length in milliseconds (default 60000, i.e. one
   *   minute); must be > 0.
   * @throws RangeError when either argument is not greater than 0.
   */
  constructor(requestsPerMinute: number, windowMs: number = 60000) {
    if (!(requestsPerMinute > 0)) {
      throw new RangeError(`requestsPerMinute 必须大于 0,收到: ${requestsPerMinute}`);
    }
    if (!(windowMs > 0)) {
      throw new RangeError(`windowMs 必须大于 0,收到: ${windowMs}`);
    }
    this.requestsPerMinute = requestsPerMinute;
    this.windowMs = windowMs;
    this.requestsThisMinute = 0;
    this.minuteStart = Date.now();
    this.queue = [];
    this.drainTimer = null;
  }

  /**
   * Resolves once the caller may send a request. Resolves immediately while
   * the current window has capacity and nobody is queued; otherwise waits
   * for a later window.
   */
  async acquire(): Promise<void> {
    this.startNewWindowIfElapsed();

    // Callers already waiting go first, so only take the fast path when the
    // queue is empty.
    if (this.queue.length === 0 && this.requestsThisMinute < this.requestsPerMinute) {
      this.requestsThisMinute++;
      return;
    }

    return new Promise<void>((resolve) => {
      this.queue.push(resolve);
      this.scheduleNextRequest();
    });
  }

  /**
   * Ensures exactly one timer is pending that fires when the current window
   * ends. Does nothing when a timer is already pending or nobody is waiting.
   */
  private scheduleNextRequest(): void {
    if (this.drainTimer !== null || this.queue.length === 0) {
      return;
    }
    const elapsed = Date.now() - this.minuteStart;
    const wait = Math.max(0, this.windowMs - elapsed);
    this.drainTimer = setTimeout(() => {
      this.drainTimer = null;
      this.drainQueue();
    }, wait);
  }

  /** Grants as many queued callers as the current window allows, oldest first. */
  private drainQueue(): void {
    this.startNewWindowIfElapsed();
    while (this.queue.length > 0 && this.requestsThisMinute < this.requestsPerMinute) {
      this.requestsThisMinute++;
      const resolve = this.queue.shift()!;
      resolve();
    }
    // Callers that did not fit (or a timer that fired slightly early) wait
    // for the next window boundary.
    this.scheduleNextRequest();
  }

  /** Resets the counter when the current window has run its full length. */
  private startNewWindowIfElapsed(): void {
    const now = Date.now();
    if (now - this.minuteStart >= this.windowMs) {
      this.requestsThisMinute = 0;
      this.minuteStart = now;
    }
  }
}
实际应用示例
让我们看一个完整的实际应用示例,展示如何在文档处理系统中使用批处理优化:
typescript
// Batch document question-answering system
/**
 * Answers many questions at once: retrieves documents for every question,
 * builds one prompt per question, and sends the prompts to the model through
 * an LLMBatchProcessor. Also offers a streaming variant.
 */
class BatchDocumentQASystem {
  private retriever: Runnable<string, Document[]>;
  private llm: BaseLanguageModel;
  private batchProcessor: LLMBatchProcessor;

  /**
   * @param retriever - Returns the documents relevant to a question.
   * @param llm - Model used to generate the answers.
   */
  constructor(
    retriever: Runnable<string, Document[]>,
    llm: BaseLanguageModel
  ) {
    this.retriever = retriever;
    this.llm = llm;
    this.batchProcessor = new LLMBatchProcessor(llm, {
      maxConcurrency: 5,
      timeout: 30000,
      retryAttempts: 3,
      batchSize: 20
    }, {
      requestsPerMinute: 300
    });
  }

  /**
   * Answers every question and logs timing information along the way.
   *
   * @param questions - Questions to answer.
   * @returns One entry per question, in input order. `processingTime` is the
   *   time elapsed since the whole batch started, measured when the results
   *   are assembled — it is not a per-question duration.
   * @throws Whatever retrieval or generation throws; the error is logged and
   *   rethrown.
   */
  async answerQuestions(questions: string[]): Promise<Array<{
    question: string;
    answer: string;
    processingTime: number;
  }>> {
    console.log(`开始批处理 ${questions.length} 个问题`);
    const startTime = Date.now();
    try {
      // Step 1: retrieve the documents for all questions.
      console.log('批量检索文档...');
      const retrievalStartTime = Date.now();
      const documentSets = await this.batchRetrieveDocuments(questions);
      const retrievalTime = Date.now() - retrievalStartTime;
      console.log(`文档检索完成,耗时 ${retrievalTime}ms`);

      // Step 2: build one prompt per question from its documents.
      console.log('构建提示...');
      const prompts = questions.map((question, index) => {
        const documents = documentSets[index];
        const context = documents.map(d => d.pageContent).join('\n\n');
        return this.buildPrompt(context, question);
      });

      // Step 3: generate all answers through the batch processor.
      console.log('批量生成答案...');
      const generationStartTime = Date.now();
      const answers = await this.batchProcessor.batchProcessPrompts(prompts);
      const generationTime = Date.now() - generationStartTime;
      console.log(`答案生成完成,耗时 ${generationTime}ms`);

      // Step 4: pair each question with its answer.
      const results = questions.map((question, index) => ({
        question,
        answer: answers[index],
        processingTime: Date.now() - startTime
      }));
      const totalTime = Date.now() - startTime;
      console.log(`批处理完成,总耗时 ${totalTime}ms,平均每个问题 ${(totalTime / questions.length).toFixed(2)}ms`);
      return results;
    } catch (error) {
      console.error('批处理错误:', error);
      throw error;
    }
  }

  /** Retrieves the documents for every question with a fresh SmartBatchProcessor. */
  private async batchRetrieveDocuments(questions: string[]): Promise<Document[][]> {
    const smartProcessor = new SmartBatchProcessor({
      maxConcurrency: 10,
      batchSize: 50
    });
    return await smartProcessor.batchProcess(questions, async (question) => {
      return await this.retriever.invoke(question);
    });
  }

  /** Builds the prompt text from the retrieved context and the question. */
  private buildPrompt(context: string, question: string): string {
    return `使用以下文档内容回答问题。如果文档中没有相关信息,请说明无法回答。
文档:
${context}
问题: ${question}
答案:`;
  }

  // Streaming batch processing
  /**
   * Streams the answers to all questions at once.
   *
   * One model stream is opened per question. The streams are read in rounds:
   * each round takes the next chunk from every stream that is still open and
   * yields those chunks in question order. A stream that fails yields one
   * item whose `partialAnswer` starts with "错误: " and is then dropped.
   *
   * Streams that are still open when the consumer stops early, or when the
   * loop is aborted, are closed through their iterator's `return()`.
   *
   * @param questions - Questions to answer.
   */
  async *streamAnswers(questions: string[]): AsyncGenerator<{
    questionIndex: number;
    question: string;
    partialAnswer: string;
  }> {
    /** Outcome of reading one chunk from one stream. */
    type StreamRead =
      | { index: number; failed: false; value: string; done: boolean | undefined }
      | { index: number; failed: true; error: unknown };

    console.log(`开始流式批处理 ${questions.length} 个问题`);

    const documentSets = await this.batchRetrieveDocuments(questions);

    // Open one model stream per question.
    const streams = questions.map(async (question, index) => {
      const documents = documentSets[index];
      const context = documents.map(d => d.pageContent).join('\n\n');
      const prompt = this.buildPrompt(context, question);
      const stream = await this.llm.stream(prompt);
      return { index, question, stream };
    });
    const streamTasks = await Promise.all(streams);

    const activeStreams = new Map<number, AsyncIterator<string>>();
    streamTasks.forEach(({ index, stream }) => {
      activeStreams.set(index, stream[Symbol.asyncIterator]());
    });

    try {
      while (activeStreams.size > 0) {
        // Each read catches its own error, so one broken stream cannot
        // reject the whole round.
        const reads = Array.from(activeStreams.entries()).map(
          async ([index, iterator]): Promise<StreamRead> => {
            try {
              const { value, done } = await iterator.next();
              return { index, failed: false, value, done };
            } catch (error: unknown) {
              return { index, failed: true, error };
            }
          }
        );

        try {
          const settled = await Promise.all(reads);
          for (const read of settled) {
            if (read.failed) {
              activeStreams.delete(read.index);
              const reason = read.error instanceof Error
                ? read.error.message
                : String(read.error);
              yield {
                questionIndex: read.index,
                question: streamTasks[read.index].question,
                partialAnswer: `错误: ${reason}`
              };
            } else if (read.done) {
              activeStreams.delete(read.index);
            } else {
              yield {
                questionIndex: read.index,
                question: streamTasks[read.index].question,
                partialAnswer: read.value
              };
            }
          }
        } catch (error) {
          console.error('流处理错误:', error);
          break;
        }
      }
    } finally {
      // Close whatever is still open (early consumer exit or aborted loop).
      for (const iterator of activeStreams.values()) {
        try {
          await iterator.return?.();
        } catch {
          // Best-effort cleanup: a failing close must not mask the real outcome.
        }
      }
      activeStreams.clear();
    }

    console.log('所有流式处理完成');
  }
}
// Create the batch QA system
/**
 * Builds a BatchDocumentQASystem backed by an in-memory vector store that is
 * pre-filled with 100 generated sample documents.
 */
async function createBatchQASystem() {
  const vectorStore = new MemoryVectorStore();

  // Sample documents cycle through the three categories by index.
  const categories = ['ai', 'ml', 'nlp'];
  const sampleDocuments: Document[] = Array.from({ length: 100 }, (_unused, i) => ({
    pageContent: `这是文档 ${i} 的内容。LangChain 是一个强大的框架,用于构建由大型语言模型驱动的应用程序。文档 ${i} 包含关于人工智能、机器学习和自然语言处理的信息。`,
    metadata: {
      source: `doc-${i}`,
      category: categories[i % 3],
      id: i
    }
  }));
  await vectorStore.addDocuments(sampleDocuments);

  // Retriever returning the top 3 matches among documents that carry an id.
  const retriever = new VectorStoreRetriever({
    vectorStore,
    k: 3,
    filter: (doc) => doc.metadata.id !== undefined
  });

  const chatModel = new ChatOpenAI({
    modelName: "gpt-3.5-turbo",
    temperature: 0.7
  });

  return new BatchDocumentQASystem(retriever, chatModel);
}
// Usage example
/**
 * Demo driver: answers ten questions in one batch, prints timing and the
 * first three answers, then streams the answers to two more questions.
 */
async function demonstrateBatchProcessing() {
  const batchQASystem = await createBatchQASystem();
  console.log('=== 批处理问答系统演示 ===\n');

  // Questions for the batch run.
  const questions = [
    "什么是 LangChain?",
    "LangChain 有什么特性?",
    "如何使用 LCEL?",
    "VectorStoreRetriever 的作用是什么?",
    "ContextualCompressionRetriever 如何工作?",
    "什么是 LLM?",
    "如何构建 RAG 应用?",
    "LangChain 支持哪些模型?",
    "OutputParser 的作用是什么?",
    "如何优化 LLM 应用性能?"
  ];
  console.log(`准备处理 ${questions.length} 个问题...\n`);

  // Batch run, timed from the caller's side.
  const batchStartedAt = Date.now();
  const results = await batchQASystem.answerQuestions(questions);
  const totalTime = Date.now() - batchStartedAt;

  console.log(`\n=== 批处理结果 ===`);
  console.log(`总处理时间: ${totalTime}ms`);
  console.log(`平均每个问题: ${(totalTime / questions.length).toFixed(2)}ms\n`);

  // Print only the first three answers, truncated to 100 characters.
  const preview = results.slice(0, 3);
  for (const [position, entry] of preview.entries()) {
    console.log(`问题 ${position + 1}: ${entry.question}`);
    console.log(`答案: ${entry.answer.substring(0, 100)}...`);
    console.log(`处理时间: ${entry.processingTime}ms\n`);
  }

  // Streaming demo.
  console.log('=== 流式批处理演示 ===');
  const streamQuestions = [
    "简述 LangChain 的核心概念",
    "LCEL 有什么优势?"
  ];
  console.log('开始流式处理...\n');

  const answerStream = batchQASystem.streamAnswers(streamQuestions);
  for await (const chunk of answerStream) {
    const { questionIndex, question, partialAnswer } = chunk;
    // Chunks without a line break are written inline; chunks containing one
    // are printed together with their question.
    if (!partialAnswer.includes('\n')) {
      process.stdout.write(partialAnswer);
      continue;
    }
    console.log(`[问题 ${questionIndex + 1}] ${question}`);
    console.log(`答案: ${partialAnswer}\n`);
  }

  console.log('\n流式处理完成\n');
}
总结
批处理优化通过 .batch() 方法和并发控制机制,显著提高了 LLM 应用的处理效率:
- 并行处理 - 同时处理多个请求,提高吞吐量
- 并发控制 - 通过信号量和速率限制避免压垮后端服务
- 智能优化 - 根据成功率和响应时间动态调整并发数,并按固定的 batchSize 对大批量输入分批处理
- 错误处理 - 完善的错误处理和重试机制
- 性能监控 - 按批次统计成功率和平均耗时,作为调整并发数的依据
- 流式支持 - 支持流式批处理以提供实时反馈
通过这些优化技术,开发者能够构建高性能、高可靠性的 LLM 应用,满足大规模处理需求。
在下一章中,我们将探讨缓存机制:RunnableCache 如何避免重复调用,了解如何通过缓存提高系统效率。