身份验证与速率限制:如何在 RunnableConfig 中传递用户信息?
在构建生产级 LLM 应用时,身份验证和速率限制是确保系统安全性和稳定性的关键要素。LangChain V3 通过 RunnableConfig 提供了传递用户信息和实施访问控制的机制。本章将深入探讨如何在 LangChain 应用中集成身份验证和速率限制。
RunnableConfig 中的用户信息传递
RunnableConfig 允许在执行链中传递元数据,包括用户身份信息:
typescript
// 用户信息接口
interface UserInfo {
userId: string;
username: string;
roles: string[];
permissions: string[];
tenantId?: string;
}
// 扩展 RunnableConfig 以支持用户信息
interface AuthenticatedRunnableConfig extends RunnableConfig {
user?: UserInfo;
apiKey?: string;
sessionId?: string;
}
// 基础认证检查器
class AuthChecker {
async checkPermission(
config: AuthenticatedRunnableConfig,
requiredPermission: string
): Promise<boolean> {
const user = config.user;
if (!user) {
throw new Error('用户未认证');
}
// 检查用户是否具有所需权限
if (user.permissions.includes(requiredPermission)) {
return true;
}
// 检查用户角色是否具有所需权限
// 这里简化处理,实际应用中可能需要查询角色权限映射
const rolePermissions: Record<string, string[]> = {
'admin': ['read', 'write', 'delete', 'admin'],
'user': ['read', 'write'],
'guest': ['read']
};
for (const role of user.roles) {
if (rolePermissions[role]?.includes(requiredPermission)) {
return true;
}
}
return false;
}
async checkRole(
config: AuthenticatedRunnableConfig,
requiredRole: string
): Promise<boolean> {
const user = config.user;
if (!user) {
throw new Error('用户未认证');
}
return user.roles.includes(requiredRole);
}
}速率限制实现
实现基于用户和 API 密钥的速率限制:
typescript
// 速率限制配置
interface RateLimitConfig {
requestsPerMinute?: number;
requestsPerHour?: number;
tokensPerMinute?: number;
tokensPerDay?: number;
}
// 速率限制记录
interface RateLimitRecord {
userId?: string;
apiKey?: string;
requestCount: number;
tokenCount: number;
windowStart: number;
}
// 速率限制器
class RateLimiter {
private limits: Map<string, RateLimitRecord>;
private defaultConfig: RateLimitConfig;
constructor(defaultConfig: RateLimitConfig = {}) {
this.limits = new Map();
this.defaultConfig = {
requestsPerMinute: 60,
tokensPerDay: 100000,
...defaultConfig
};
}
async checkRateLimit(
config: AuthenticatedRunnableConfig,
tokens: number = 1
): Promise<{ allowed: boolean; resetTime?: number }> {
const key = this.generateKey(config);
const now = Date.now();
const record = this.limits.get(key) || this.createEmptyRecord(now);
// 检查时间窗口是否已过期
if (now - record.windowStart >= 60000) { // 1分钟窗口
record.requestCount = 0;
record.tokenCount = 0;
record.windowStart = now;
}
// 更新计数
record.requestCount++;
record.tokenCount += tokens;
// 保存记录
this.limits.set(key, record);
// 检查限制
const requestsPerMinute = config.user?.permissions.includes('premium')
? this.defaultConfig.requestsPerMinute! * 10
: this.defaultConfig.requestsPerMinute!;
if (record.requestCount > requestsPerMinute) {
const resetTime = record.windowStart + 60000;
return { allowed: false, resetTime };
}
const tokensPerDay = config.user?.permissions.includes('premium')
? this.defaultConfig.tokensPerDay! * 10
: this.defaultConfig.tokensPerDay!;
if (record.tokenCount > tokensPerDay) {
// 这里简化处理,实际应用中可能需要更复杂的逻辑
return { allowed: false };
}
return { allowed: true };
}
private generateKey(config: AuthenticatedRunnableConfig): string {
if (config.apiKey) {
return `api:${config.apiKey}`;
}
if (config.user?.userId) {
return `user:${config.user.userId}`;
}
if (config.sessionId) {
return `session:${config.sessionId}`;
}
throw new Error('无法生成速率限制键:缺少用户标识');
}
private createEmptyRecord(now: number): RateLimitRecord {
return {
requestCount: 0,
tokenCount: 0,
windowStart: now
};
}
// 获取当前使用情况
getUsage(config: AuthenticatedRunnableConfig): {
requestsUsed: number;
tokensUsed: number;
requestsLimit: number;
tokensLimit: number;
} {
const key = this.generateKey(config);
const record = this.limits.get(key) || this.createEmptyRecord(Date.now());
const requestsLimit = config.user?.permissions.includes('premium')
? this.defaultConfig.requestsPerMinute! * 10
: this.defaultConfig.requestsPerMinute!;
const tokensLimit = config.user?.permissions.includes('premium')
? this.defaultConfig.tokensPerDay! * 10
: this.defaultConfig.tokensPerDay!;
return {
requestsUsed: record.requestCount,
tokensUsed: record.tokenCount,
requestsLimit,
tokensLimit
};
}
}认证和速率限制中间件
实现认证和速率限制的中间件:
typescript
// 认证和速率限制中间件
class AuthRateLimitMiddleware {
private authChecker: AuthChecker;
private rateLimiter: RateLimiter;
constructor(authChecker: AuthChecker, rateLimiter: RateLimiter) {
this.authChecker = authChecker;
this.rateLimiter = rateLimiter;
}
async process(
input: any,
config: AuthenticatedRunnableConfig
): Promise<{ allowed: boolean; error?: string; resetTime?: number }> {
try {
// 1. 验证用户认证
if (!config.user && !config.apiKey) {
return {
allowed: false,
error: '缺少认证信息'
};
}
// 2. 检查权限(根据具体操作)
// 这里假设需要 'use_llm' 权限
const hasPermission = await this.authChecker.checkPermission(
config,
'use_llm'
);
if (!hasPermission) {
return {
allowed: false,
error: '权限不足'
};
}
// 3. 检查速率限制
// 估算 token 使用量(简化处理)
const estimatedTokens = typeof input === 'string'
? input.length / 4
: JSON.stringify(input).length / 4;
const rateLimitResult = await this.rateLimiter.checkRateLimit(
config,
estimatedTokens
);
if (!rateLimitResult.allowed) {
return {
allowed: false,
error: '超过速率限制',
resetTime: rateLimitResult.resetTime
};
}
return { allowed: true };
} catch (error) {
return {
allowed: false,
error: error instanceof Error ? error.message : '认证检查失败'
};
}
}
}在 LangChain 中集成认证和速率限制
将认证和速率限制集成到 LangChain 组件中:
typescript
// 带认证和速率限制的 Runnable 包装器
class AuthenticatedRunnable<Input, Output> extends Runnable<Input, Output> {
private runnable: Runnable<Input, Output>;
private middleware: AuthRateLimitMiddleware;
constructor(
runnable: Runnable<Input, Output>,
middleware: AuthRateLimitMiddleware
) {
super();
this.runnable = runnable;
this.middleware = middleware;
}
async invoke(input: Input, options?: AuthenticatedRunnableConfig): Promise<Output> {
// 执行认证和速率限制检查
const checkResult = await this.middleware.process(input, options || {});
if (!checkResult.allowed) {
throw new Error(
`访问被拒绝: ${checkResult.error}` +
(checkResult.resetTime
? `。限制将在 ${new Date(checkResult.resetTime).toLocaleTimeString()} 重置`
: '')
);
}
// 执行实际的 Runnable
return await this.runnable.invoke(input, options);
}
async batch(inputs: Input[], options?: AuthenticatedRunnableConfig): Promise<Output[]> {
// 对于批处理,需要检查每个输入
const checkResults = await Promise.all(
inputs.map(input => this.middleware.process(input, options || {}))
);
// 检查是否有任何拒绝
const deniedResult = checkResults.find(result => !result.allowed);
if (deniedResult) {
throw new Error(
`批处理访问被拒绝: ${deniedResult.error}` +
(deniedResult.resetTime
? `。限制将在 ${new Date(deniedResult.resetTime).toLocaleTimeString()} 重置`
: '')
);
}
// 执行实际的批处理
return await this.runnable.batch(inputs, options);
}
}
// 带用户上下文的 LLM 调用
class UserContextLLM extends BaseChatModel {
private llm: BaseChatModel;
constructor(llm: BaseChatModel) {
super();
this.llm = llm;
}
async _call(
messages: BaseMessage[],
options?: AuthenticatedRunnableConfig
): Promise<string> {
// 在消息中添加用户上下文信息
const userContextMessages: BaseMessage[] = [];
if (options?.user) {
userContextMessages.push({
role: 'system',
content: `用户ID: ${options.user.userId}\n用户名: ${options.user.username}\n角色: ${options.user.roles.join(', ')}`
});
}
const enhancedMessages = [...userContextMessages, ...messages];
return await this.llm._call(enhancedMessages, options);
}
async *_streamResponseChunks(
messages: BaseMessage[],
options?: AuthenticatedRunnableConfig
): AsyncGenerator<ChatGeneration> {
// 在流式响应中同样添加用户上下文
const userContextMessages: BaseMessage[] = [];
if (options?.user) {
userContextMessages.push({
role: 'system',
content: `用户ID: ${options.user.userId}\n用户名: ${options.user.username}\n角色: ${options.user.roles.join(', ')}`
});
}
const enhancedMessages = [...userContextMessages, ...messages];
for await (const chunk of this.llm._streamResponseChunks(enhancedMessages, options)) {
yield chunk;
}
}
}NestJS 中的实现
在 NestJS 中实现认证和速率限制:
typescript
// auth.middleware.ts
import { Injectable, NestMiddleware } from '@nestjs/common';
import { Request, Response, NextFunction } from 'express';
@Injectable()
export class AuthMiddleware implements NestMiddleware {
async use(req: Request, res: Response, next: NextFunction) {
try {
// 从请求头或会话中提取认证信息
const authHeader = req.headers.authorization;
const apiKey = req.headers['x-api-key'] as string;
if (authHeader) {
// Bearer Token 认证
const token = authHeader.replace('Bearer ', '');
const user = await this.validateToken(token);
(req as any).user = user;
} else if (apiKey) {
// API Key 认证
const user = await this.validateApiKey(apiKey);
(req as any).user = user;
} else {
// 匿名用户
(req as any).user = null;
}
next();
} catch (error) {
res.status(401).json({ error: '认证失败' });
}
}
private async validateToken(token: string): Promise<UserInfo> {
// 实现 token 验证逻辑
// 这里简化处理,实际应用中可能需要调用认证服务
return {
userId: 'user123',
username: 'testuser',
roles: ['user'],
permissions: ['read', 'write', 'use_llm']
};
}
private async validateApiKey(apiKey: string): Promise<UserInfo> {
// 实现 API Key 验证逻辑
return {
userId: 'apiuser123',
username: 'apiuser',
roles: ['api'],
permissions: ['read', 'use_llm']
};
}
}
// rate-limit.guard.ts
import { Injectable, CanActivate, ExecutionContext } from '@nestjs/common';
import { RateLimiter } from './rate-limiter';
@Injectable()
export class RateLimitGuard implements CanActivate {
private rateLimiter: RateLimiter;
constructor() {
this.rateLimiter = new RateLimiter({
requestsPerMinute: 10,
tokensPerDay: 50000
});
}
async canActivate(context: ExecutionContext): Promise<boolean> {
const request = context.switchToHttp().getRequest();
const user = (request as any).user;
if (!user) {
// 匿名用户限制
return true; // 或者拒绝匿名访问
}
const config: AuthenticatedRunnableConfig = {
user
};
const result = await this.rateLimiter.checkRateLimit(config);
if (!result.allowed) {
throw new Error(
'超过速率限制' +
(result.resetTime
? `。限制将在 ${new Date(result.resetTime).toLocaleTimeString()} 重置`
: '')
);
}
return true;
}
}
// langchain-with-auth.controller.ts
import { Controller, Post, Body, UseGuards, UseInterceptors } from '@nestjs/common';
import { AuthMiddleware } from './auth.middleware';
import { RateLimitGuard } from './rate-limit.guard';
interface GenerateTextDto {
prompt: string;
}
@Controller('secure-langchain')
@UseGuards(RateLimitGuard)
export class SecureLangchainController {
private readonly authChecker: AuthChecker;
private readonly rateLimiter: RateLimiter;
private readonly middleware: AuthRateLimitMiddleware;
constructor() {
this.authChecker = new AuthChecker();
this.rateLimiter = new RateLimiter();
this.middleware = new AuthRateLimitMiddleware(this.authChecker, this.rateLimiter);
}
@Post('generate')
async generateText(
@Body() body: GenerateTextDto,
@Request() req: any
): Promise<{ result: string; usage?: any }> {
try {
const user = req.user;
// 创建带认证的配置
const config: AuthenticatedRunnableConfig = {
user,
metadata: {
endpoint: 'generate-text',
timestamp: Date.now()
}
};
// 创建带认证的 LLM
const baseLLM = new ChatOpenAI({ modelName: 'gpt-3.5-turbo' });
const userLLM = new UserContextLLM(baseLLM);
const authenticatedLLM = new AuthenticatedRunnable(userLLM, this.middleware);
// 生成文本
const result = await authenticatedLLM.invoke(body.prompt, config);
// 获取使用情况
const usage = this.rateLimiter.getUsage(config);
return { result, usage };
} catch (error) {
throw new Error(`处理失败: ${error.message}`);
}
}
}实际应用示例
让我们看一个完整的实际应用示例,展示如何在系统中集成认证和速率限制:
typescript
// 完整的安全问答系统
class SecureQASystem {
private authChecker: AuthChecker;
private rateLimiter: RateLimiter;
private middleware: AuthRateLimitMiddleware;
private vectorStore: MemoryVectorStore;
constructor() {
this.authChecker = new AuthChecker();
this.rateLimiter = new RateLimiter({
requestsPerMinute: 5,
tokensPerDay: 10000
});
this.middleware = new AuthRateLimitMiddleware(this.authChecker, this.rateLimiter);
this.vectorStore = new MemoryVectorStore(new OpenAIEmbeddings());
// 初始化文档
this.initializeDocuments();
}
private async initializeDocuments(): Promise<void> {
const documents: Document[] = [
{
pageContent: "LangChain 是一个用于开发由语言模型驱动的应用程序的框架。它提供了一套工具和组件,帮助开发者将语言模型与其他数据源、计算逻辑和外部系统集成。",
metadata: { category: "framework", accessLevel: "public" }
},
{
pageContent: "LCEL (LangChain Expression Language) 是 LangChain V3 的重要特性。它是一种声明式语言,允许开发者使用管道操作符(|)将不同的组件连接在一起。",
metadata: { category: "language", accessLevel: "public" }
},
{
pageContent: "企业内部文档:这是敏感的商业信息,只允许授权员工访问。包含公司的战略规划和财务数据。",
metadata: { category: "internal", accessLevel: "confidential" }
}
];
await this.vectorStore.addDocuments(documents);
}
async answerQuestion(
question: string,
config: AuthenticatedRunnableConfig
): Promise<{
answer: string;
usage?: {
requestsUsed: number;
tokensUsed: number;
requestsLimit: number;
tokensLimit: number;
};
documentsAccessed?: number;
}> {
try {
// 执行认证和速率限制检查
const checkResult = await this.middleware.process(question, config);
if (!checkResult.allowed) {
throw new Error(
`访问被拒绝: ${checkResult.error}` +
(checkResult.resetTime
? `。限制将在 ${new Date(checkResult.resetTime).toLocaleTimeString()} 重置`
: '')
);
}
// 检查文档访问权限
const accessibleDocs = await this.getAccessibleDocuments(config);
// 检索相关文档
const retriever = new VectorStoreRetriever({
vectorStore: this.vectorStore,
k: 3,
filter: (doc) => accessibleDocs.some(d => d.pageContent === doc.pageContent)
});
const documents = await retriever.invoke(question);
// 构建提示
const context = documents.map(d => d.pageContent).join('\n\n');
const prompt = new PromptTemplate({
template: `基于以下文档回答问题。如果文档中没有相关信息,请说明无法回答。
文档:
{context}
问题: {question}
答案:`,
inputVariables: ["context", "question"]
});
// 创建带用户上下文的 LLM
const baseLLM = new ChatOpenAI({ modelName: 'gpt-3.5-turbo' });
const userLLM = new UserContextLLM(baseLLM);
// 添加用户信息到提示中
const enhancedPrompt = `用户信息:
用户ID: ${config.user?.userId || 'anonymous'}
用户名: ${config.user?.username || 'anonymous'}
角色: ${config.user?.roles.join(', ') || 'guest'}
${await prompt.format({ context, question })}`;
const answer = await userLLM.invoke([{ role: 'user', content: enhancedPrompt }]);
// 获取使用情况
const usage = this.rateLimiter.getUsage(config);
return {
answer,
usage,
documentsAccessed: documents.length
};
} catch (error) {
throw new Error(`问答处理失败: ${error.message}`);
}
}
private async getAccessibleDocuments(
config: AuthenticatedRunnableConfig
): Promise<Document[]> {
// 根据用户权限过滤可访问的文档
const allDocs = await this.vectorStore.similaritySearch('', 100); // 获取所有文档
if (!config.user) {
// 匿名用户只能访问公开文档
return allDocs.filter(doc => doc.metadata.accessLevel === 'public');
}
if (config.user.roles.includes('admin')) {
// 管理员可以访问所有文档
return allDocs;
}
if (config.user.roles.includes('employee')) {
// 员工可以访问公开和内部文档
return allDocs.filter(doc =>
doc.metadata.accessLevel === 'public' ||
doc.metadata.accessLevel === 'confidential'
);
}
// 普通用户只能访问公开文档
return allDocs.filter(doc => doc.metadata.accessLevel === 'public');
}
// 获取用户配额信息
async getUserQuota(config: AuthenticatedRunnableConfig): Promise<{
requestsUsed: number;
tokensUsed: number;
requestsLimit: number;
tokensLimit: number;
resetTime: string;
}> {
const usage = this.rateLimiter.getUsage(config);
const record = this.rateLimiter['limits'].get(
this.rateLimiter['generateKey'](config)
) || { windowStart: Date.now() };
return {
...usage,
resetTime: new Date(record.windowStart + 60000).toLocaleTimeString()
};
}
}
// 使用示例
async function demonstrateSecureQA() {
console.log('=== 安全问答系统演示 ===\n');
const qaSystem = new SecureQASystem();
// 普通用户配置
const userConfig: AuthenticatedRunnableConfig = {
user: {
userId: 'user123',
username: 'testuser',
roles: ['user'],
permissions: ['read', 'write', 'use_llm']
}
};
// 管理员用户配置
const adminConfig: AuthenticatedRunnableConfig = {
user: {
userId: 'admin123',
username: 'admin',
roles: ['user', 'admin'],
permissions: ['read', 'write', 'delete', 'admin', 'use_llm']
}
};
try {
// 普通用户提问
console.log('普通用户提问:');
const result1 = await qaSystem.answerQuestion(
"什么是 LangChain?",
userConfig
);
console.log(`答案: ${result1.answer}`);
console.log(`访问文档数: ${result1.documentsAccessed}`);
console.log(`使用情况:`, result1.usage);
console.log();
// 管理员用户提问(可以访问敏感信息)
console.log('管理员用户提问:');
const result2 = await qaSystem.answerQuestion(
"企业内部有哪些敏感信息?",
adminConfig
);
console.log(`答案: ${result2.answer}`);
console.log(`访问文档数: ${result2.documentsAccessed}`);
console.log();
// 检查配额
console.log('用户配额信息:');
const quota = await qaSystem.getUserQuota(userConfig);
console.log(quota);
} catch (error) {
console.error('错误:', error.message);
}
}
// API 调用示例
/*
// 客户端调用示例
async function callSecureAPI() {
const response = await fetch('/api/secure-langchain/generate', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer your-jwt-token',
// 或者使用 API Key
// 'X-API-Key': 'your-api-key'
},
body: JSON.stringify({
prompt: '解释一下 LangChain 的核心概念'
})
});
const data = await response.json();
console.log('API 响应:', data);
}
*/总结
通过在 RunnableConfig 中传递用户信息,LangChain V3 提供了强大的身份验证和速率限制能力:
- 用户信息传递 - 通过
RunnableConfig传递用户身份和权限信息 - 权限检查 - 实现基于角色和权限的访问控制
- 速率限制 - 基于用户和 API Key 实施请求和令牌限制
- 中间件模式 - 使用中间件统一处理认证和速率限制
- NestJS 集成 - 在 NestJS 中实现守卫和中间件模式
- 文档级控制 - 实现细粒度的文档访问控制
- 使用情况监控 - 提供配额和使用情况查询功能
这些机制确保了 LangChain 应用的安全性和稳定性,为构建生产级 LLM 应用提供了坚实的基础。
在下一章中,我们将探讨持久化会话:RunnableWithMessageHistory 如何管理 chat_history,了解如何在 LangChain 中实现对话历史管理。