ContextualCompressionRetriever:在检索后压缩上下文
使用 LLMChainExtractor 移除无关句子
在检索增强生成(RAG)应用中,检索到的文档往往包含大量信息,其中有些可能与当前查询无关。ContextualCompressionRetriever 通过在检索后压缩上下文,移除无关内容,提高生成质量。本章将深入探讨 ContextualCompressionRetriever 的设计和实现。
ContextualCompressionRetriever 的基本概念
ContextualCompressionRetriever 在传统的检索器基础上增加了一个压缩步骤,使用 LLM 来提取与查询最相关的内容:
typescript
interface BaseCompressor {
compressDocuments(
documents: Document[],
query: string
): Promise<Document[]>;
}
class ContextualCompressionRetriever implements Runnable<string, Document[]> {
private baseRetriever: Runnable<string, Document[]>;
private baseCompressor: BaseCompressor;
constructor(
baseRetriever: Runnable<string, Document[]>,
baseCompressor: BaseCompressor
) {
this.baseRetriever = baseRetriever;
this.baseCompressor = baseCompressor;
}
async invoke(query: string, options?: RunnableConfig): Promise<Document[]> {
// 第一步:使用基础检索器检索文档
const documents = await this.baseRetriever.invoke(query, options);
// 第二步:使用压缩器压缩文档
const compressedDocuments = await this.baseCompressor.compressDocuments(documents, query);
return compressedDocuments;
}
async batch(queries: string[], options?: RunnableConfig): Promise<Document[][]> {
// 批量处理查询
return await Promise.all(queries.map(query => this.invoke(query, options)));
}
}LLMChainExtractor 的实现
LLMChainExtractor 是最常用的压缩器之一,它使用 LLM 来提取与查询相关的内容:
typescript
class LLMChainExtractor implements BaseCompressor {
private llm: BaseLanguageModel;
private promptTemplate: PromptTemplate;
constructor(llm: BaseLanguageModel, promptTemplate?: PromptTemplate) {
this.llm = llm;
this.promptTemplate = promptTemplate || this.getDefaultPromptTemplate();
}
async compressDocuments(documents: Document[], query: string): Promise<Document[]> {
const compressedDocuments: Document[] = [];
// 对每个文档进行压缩
for (const document of documents) {
try {
const compressedContent = await this.extractRelevantContent(
document.pageContent,
query
);
if (compressedContent.trim()) {
compressedDocuments.push({
pageContent: compressedContent,
metadata: {
...document.metadata,
originalContentLength: document.pageContent.length,
compressedContentLength: compressedContent.length,
compressionRatio: compressedContent.length / document.pageContent.length
}
});
}
} catch (error) {
console.warn('文档压缩失败,保留原始文档:', error);
compressedDocuments.push(document);
}
}
return compressedDocuments;
}
private async extractRelevantContent(content: string, query: string): Promise<string> {
const prompt = await this.promptTemplate.format({
content,
query
});
const response = await this.llm.invoke(prompt);
return response.trim();
}
private getDefaultPromptTemplate(): PromptTemplate {
return new PromptTemplate({
template: `从以下文档内容中提取与查询问题直接相关的信息。
如果文档内容与查询无关,则返回空字符串。
文档内容:
{content}
查询问题:
{query}
相关内容:`,
inputVariables: ["content", "query"]
});
}
}更智能的压缩策略
让我们实现一个更智能的压缩器,能够处理复杂场景:
typescript
interface CompressionOptions {
maxTokens?: number;
minCompressionRatio?: number;
preserveMetadata?: boolean;
}
class SmartDocumentCompressor implements BaseCompressor {
private llm: BaseLanguageModel;
private maxTokens: number;
private minCompressionRatio: number;
private preserveMetadata: boolean;
constructor(
llm: BaseLanguageModel,
options: CompressionOptions = {}
) {
this.llm = llm;
this.maxTokens = options.maxTokens || 1000;
this.minCompressionRatio = options.minCompressionRatio || 0.3;
this.preserveMetadata = options.preserveMetadata ?? true;
}
async compressDocuments(documents: Document[], query: string): Promise<Document[]> {
const compressedDocuments: Document[] = [];
for (const document of documents) {
try {
const compressed = await this.compressDocument(document, query);
if (compressed) {
compressedDocuments.push(compressed);
}
} catch (error) {
console.warn('文档压缩失败:', error);
// 如果压缩失败,根据配置决定是否保留原始文档
if (this.shouldPreserveFailedDocuments()) {
compressedDocuments.push(document);
}
}
}
return compressedDocuments;
}
private async compressDocument(document: Document, query: string): Promise<Document | null> {
const { pageContent, metadata } = document;
// 对于很短的文档,直接返回
if (pageContent.length < 100) {
return document;
}
// 使用 LLM 提取相关内容
const extractedContent = await this.extractRelevantContent(pageContent, query);
// 如果提取的内容太少,可能提取失败
if (extractedContent.length < 10) {
// 尝试另一种策略:摘要
const summary = await this.summarizeContent(pageContent, query);
if (summary.length > 10) {
return {
pageContent: summary,
metadata: this.preserveMetadata ? {
...metadata,
compressionType: 'summary',
originalContentLength: pageContent.length,
compressedContentLength: summary.length,
compressionRatio: summary.length / pageContent.length
} : {}
};
} else {
return null; // 内容太不相关
}
}
// 检查压缩比
const compressionRatio = extractedContent.length / pageContent.length;
if (compressionRatio < this.minCompressionRatio) {
// 压缩比过低,可能丢失了太多信息,使用摘要代替
const summary = await this.summarizeContent(pageContent, query);
return {
pageContent: summary,
metadata: this.preserveMetadata ? {
...metadata,
compressionType: 'summary',
originalContentLength: pageContent.length,
compressedContentLength: summary.length,
compressionRatio: summary.length / pageContent.length
} : {}
};
}
return {
pageContent: extractedContent,
metadata: this.preserveMetadata ? {
...metadata,
compressionType: 'extraction',
originalContentLength: pageContent.length,
compressedContentLength: extractedContent.length,
compressionRatio
} : {}
};
}
private async extractRelevantContent(content: string, query: string): Promise<string> {
const prompt = new PromptTemplate({
template: `从以下文档中提取与查询问题直接相关的内容片段。
只返回相关的内容,不要添加解释或其他文本。
文档内容:
{content}
查询问题:
{query}
相关内容:`,
inputVariables: ["content", "query"]
});
const response = await prompt.pipe(this.llm).pipe(new StringOutputParser()).invoke({
content,
query
});
return response;
}
private async summarizeContent(content: string, query: string): Promise<string> {
const prompt = new PromptTemplate({
template: `为以下查询问题总结文档内容。
提供一个简洁但信息丰富的摘要。
文档内容:
{content}
查询相关性:
{query}
文档摘要:`,
inputVariables: ["content", "query"]
});
const response = await prompt.pipe(this.llm).pipe(new StringOutputParser()).invoke({
content,
query
});
return response;
}
private shouldPreserveFailedDocuments(): boolean {
// 可以根据配置或上下文决定是否保留压缩失败的文档
return true;
}
}多阶段压缩策略
实现一个多阶段的压缩策略,逐步优化压缩效果:
typescript
class MultiStageCompressor implements BaseCompressor {
private compressors: BaseCompressor[];
private strategy: 'sequential' | 'parallel' | 'adaptive';
constructor(
compressors: BaseCompressor[],
strategy: 'sequential' | 'parallel' | 'adaptive' = 'sequential'
) {
this.compressors = compressors;
this.strategy = strategy;
}
async compressDocuments(documents: Document[], query: string): Promise<Document[]> {
switch (this.strategy) {
case 'sequential':
return await this.compressSequential(documents, query);
case 'parallel':
return await this.compressParallel(documents, query);
case 'adaptive':
return await this.compressAdaptive(documents, query);
default:
return await this.compressSequential(documents, query);
}
}
private async compressSequential(documents: Document[], query: string): Promise<Document[]> {
let compressedDocs = documents;
// 依次应用每个压缩器
for (const compressor of this.compressors) {
compressedDocs = await compressor.compressDocuments(compressedDocs, query);
}
return compressedDocs;
}
private async compressParallel(documents: Document[], query: string): Promise<Document[]> {
// 并行应用所有压缩器
const results = await Promise.all(
this.compressors.map(compressor =>
compressor.compressDocuments(documents, query)
)
);
// 合并结果,去重并选择最佳内容
return this.mergeCompressedResults(results, query);
}
private async compressAdaptive(documents: Document[], query: string): Promise<Document[]> {
let currentDocs = documents;
for (const compressor of this.compressors) {
const compressed = await compressor.compressDocuments(currentDocs, query);
// 评估压缩效果
const quality = this.evaluateCompressionQuality(currentDocs, compressed);
// 如果质量足够好,继续使用压缩结果
if (quality > 0.7) {
currentDocs = compressed;
}
// 否则保留原始文档或尝试其他策略
}
return currentDocs;
}
private mergeCompressedResults(results: Document[][], query: string): Document[] {
// 实现结果合并逻辑
// 这里简化处理,实际应用中可能需要更复杂的合并策略
const merged: Document[] = [];
const docMap = new Map<string, Document[]>();
// 按文档ID分组
results.flat().forEach(doc => {
const docId = this.getDocumentId(doc);
if (!docMap.has(docId)) {
docMap.set(docId, []);
}
docMap.get(docId)!.push(doc);
});
// 对每组文档选择最佳版本
for (const [_, docs] of docMap) {
const bestDoc = this.selectBestDocument(docs, query);
if (bestDoc) {
merged.push(bestDoc);
}
}
return merged;
}
private evaluateCompressionQuality(original: Document[], compressed: Document[]): number {
if (original.length === 0) return 1;
if (compressed.length === 0) return 0;
// 计算平均压缩比
const totalRatio = compressed.reduce((sum, doc) => {
const originalDoc = original.find(d => this.getDocumentId(d) === this.getDocumentId(doc));
if (originalDoc) {
return sum + (doc.pageContent.length / originalDoc.pageContent.length);
}
return sum;
}, 0);
const avgRatio = totalRatio / compressed.length;
// 压缩比在 0.3-0.7 之间被认为是较好的
if (avgRatio >= 0.3 && avgRatio <= 0.7) {
return 1;
} else if (avgRatio > 0.7) {
return 0.5; // 压缩效果一般
} else {
return 0.8; // 过度压缩
}
}
private selectBestDocument(documents: Document[], query: string): Document | null {
if (documents.length === 0) return null;
if (documents.length === 1) return documents[0];
// 选择压缩比最合适的文档
return documents.reduce((best, current) => {
const bestRatio = best.metadata?.compressionRatio || 1;
const currentRatio = current.metadata?.compressionRatio || 1;
// 寻找压缩比在 0.3-0.7 之间的最佳文档
const bestScore = Math.abs(0.5 - bestRatio);
const currentScore = Math.abs(0.5 - currentRatio);
return currentScore < bestScore ? current : best;
});
}
private getDocumentId(doc: Document): string {
return doc.metadata?.id || this.hashCode(doc.pageContent);
}
private hashCode(str: string): string {
let hash = 0;
for (let i = 0; i < str.length; i++) {
const char = str.charCodeAt(i);
hash = ((hash << 5) - hash) + char;
hash = hash & hash;
}
return Math.abs(hash).toString();
}
}实际应用示例
让我们看一个完整的实际应用示例:
typescript
// 创建一个智能文档问答系统
class SmartDocumentQASystem {
private retriever: ContextualCompressionRetriever;
private llm: BaseChatModel;
constructor(retriever: ContextualCompressionRetriever, llm: BaseChatModel) {
this.retriever = retriever;
this.llm = llm;
}
async answerQuestion(question: string): Promise<{ answer: string; sources: Document[] }> {
try {
// 检索并压缩相关文档
console.log('检索并压缩相关文档...');
const documents = await this.retriever.invoke(question);
console.log(`检索到 ${documents.length} 个相关文档`);
documents.forEach((doc, index) => {
console.log(`文档 ${index + 1}: 压缩比 ${doc.metadata.compressionRatio?.toFixed(2) || 'N/A'}`);
});
// 构建上下文
const context = this.formatContext(documents);
// 生成答案
const prompt = new PromptTemplate({
template: `基于以下文档内容回答问题。只使用文档中提供的信息,如果文档中没有相关信息,请说明无法回答。
文档内容:
{context}
问题: {question}
答案:`,
inputVariables: ["context", "question"]
});
const answer = await prompt
.pipe(this.llm)
.pipe(new StringOutputParser())
.invoke({
context,
question
});
return {
answer,
sources: documents
};
} catch (error) {
console.error('问答系统错误:', error);
return {
answer: "抱歉,处理您的问题时出现了错误。",
sources: []
};
}
}
private formatContext(documents: Document[]): string {
return documents
.map((doc, index) => {
let docText = `[文档 ${index + 1}]\n${doc.pageContent}`;
if (doc.metadata.source) {
docText += `\n来源: ${doc.metadata.source}`;
}
if (doc.metadata.compressionType) {
docText += `\n压缩类型: ${doc.metadata.compressionType}`;
}
return docText;
})
.join('\n\n');
}
// 流式问答
async *streamAnswer(question: string): AsyncGenerator<{
type: 'content' | 'sources' | 'complete';
data: string | Document[]
}> {
try {
// 检索文档
const documents = await this.retriever.invoke(question);
yield { type: 'sources', data: documents };
const context = this.formatContext(documents);
// 流式生成答案
const prompt = new PromptTemplate({
template: `基于以下文档内容回答问题。只使用文档中提供的信息,如果文档中没有相关信息,请说明无法回答。
文档内容:
{context}
问题: {question}
答案:`,
inputVariables: ["context", "question"]
});
const stream = await prompt
.pipe(this.llm)
.pipe(new StringOutputParser())
.stream({
context,
question
});
for await (const chunk of stream) {
yield { type: 'content', data: chunk };
}
yield { type: 'complete', data: '' };
} catch (error) {
console.error('流式问答错误:', error);
yield { type: 'content', data: "抱歉,处理您的问题时出现了错误。" };
}
}
}
// 创建系统组件
async function createSmartQASystem() {
// 创建基础向量存储和检索器
const vectorStore = new MemoryVectorStore();
// 添加示例文档
const sampleDocuments: Document[] = [
{
pageContent: `LangChain框架介绍:
LangChain是一个开源框架,专门用于构建由大型语言模型(LLM)驱动的应用程序。它提供了一套工具和组件,帮助开发者将语言模型与其他数据源、计算逻辑和外部系统集成。
主要特性包括:
1. 组件化设计:提供可重用的组件,如提示模板、模型接口、索引等
2. 链式调用:支持将多个组件链接在一起形成处理链
3. 数据连接:支持连接各种数据源,包括文件、数据库、API等
4. 智能代理:提供构建自主代理的能力,可以执行复杂任务
LangChain支持多种编程语言,包括Python和JavaScript/TypeScript。`,
metadata: { source: "langchain-intro", category: "framework" }
},
{
pageContent: `LCEL (LangChain Expression Language) 是LangChain V3的重要特性:
LCEL是一种声明式语言,允许开发者使用管道操作符(|)将不同的组件连接在一起。这种设计使得构建复杂的处理链变得更加直观和简洁。
LCEL的核心概念:
1. Runnable接口:所有组件都实现Runnable接口
2. 管道操作:使用|操作符连接组件
3. 类型安全:提供完整的TypeScript类型支持
4. 流式处理:原生支持流式数据处理
示例代码:
const chain = promptTemplate | llm | outputParser;`,
metadata: { source: "lcel-docs", category: "language" }
},
{
pageContent: `VectorStoreRetriever是LangChain中的检索组件:
VectorStoreRetriever负责从向量存储中检索与查询相关的文档。它实现了Runnable<string, Document[]>接口,可以无缝集成到LCEL管道中。
主要功能:
1. 向量相似度搜索:基于嵌入向量进行相似度匹配
2. 多种搜索策略:支持相似度搜索、MMR等
3. 文档过滤:支持基于元数据的文档过滤
4. 批量处理:支持批量查询处理
使用示例:
const retriever = new VectorStoreRetriever({ vectorStore, k: 4 });`,
metadata: { source: "retriever-docs", category: "retrieval" }
}
];
await vectorStore.addDocuments(sampleDocuments);
const baseRetriever = new VectorStoreRetriever({
vectorStore,
k: 3
});
// 创建压缩器
const compressor = new SmartDocumentCompressor(
new ChatOpenAI({ modelName: "gpt-3.5-turbo" }),
{
maxTokens: 500,
minCompressionRatio: 0.2
}
);
// 创建上下文压缩检索器
const compressionRetriever = new ContextualCompressionRetriever(
baseRetriever,
compressor
);
// 创建问答系统
const qaSystem = new SmartDocumentQASystem(
compressionRetriever,
new ChatOpenAI({ modelName: "gpt-3.5-turbo" })
);
return qaSystem;
}
// 使用示例
async function demonstrateSmartQASystem() {
const qaSystem = await createSmartQASystem();
// 普通问答
console.log('=== 智能文档问答系统演示 ===\n');
const result = await qaSystem.answerQuestion("LangChain的主要特性是什么?");
console.log('问题: LangChain的主要特性是什么?');
console.log('答案:', result.answer);
console.log('使用的文档数量:', result.sources.length);
console.log();
// 流式问答
console.log('流式回答 "什么是LCEL?":');
for await (const { type, data } of qaSystem.streamAnswer("什么是LCEL?")) {
if (type === 'content') {
process.stdout.write(data as string);
}
}
console.log('\n');
}与 LCEL 的集成
ContextualCompressionRetriever 可以无缝集成到 LCEL 管道中:
typescript
// 创建完整的压缩 RAG 链
async function createCompressedRAGChain() {
// 创建组件
const vectorStore = new MemoryVectorStore();
const baseRetriever = new VectorStoreRetriever({ vectorStore, k: 4 });
const compressor = new LLMChainExtractor(new ChatOpenAI());
const compressionRetriever = new ContextualCompressionRetriever(baseRetriever, compressor);
const prompt = new PromptTemplate({
template: `使用以下文档内容回答问题:
文档:
{context}
问题: {question}
答案:`,
inputVariables: ["context", "question"]
});
const llm = new ChatOpenAI({ modelName: "gpt-3.5-turbo" });
const parser = new StringOutputParser();
// 构建压缩 RAG 链
const compressedRAGChain =
RunnableMap.from({
question: (input: string) => input,
context: compressionRetriever.pipe((docs: Document[]) =>
docs.map(doc => doc.pageContent).join('\n\n')
)
})
.pipe(prompt)
.pipe(llm)
.pipe(parser);
return compressedRAGChain;
}总结
ContextualCompressionRetriever 通过在检索后压缩上下文,显著提高了 RAG 应用的效果:
- 智能压缩 - 使用 LLMChainExtractor 等压缩器提取相关内容
- 多策略支持 - 支持提取、摘要等多种压缩策略
- 质量控制 - 通过压缩比等指标控制压缩质量
- 灵活集成 - 可以与任何基础检索器集成
- LCEL 兼容 - 无缝集成到 LCEL 管道中
通过这些特性,ContextualCompressionRetriever 使得 RAG 应用能够更精准地使用相关文档,提高生成质量和准确性。
在下一章中,我们将探讨 MultiQueryRetriever:如何提升召回率,了解如何通过生成多个查询来改善检索效果。