Skip to content

身份验证与速率限制:如何在 RunnableConfig 中传递用户信息?

在构建生产级 LLM 应用时,身份验证和速率限制是确保系统安全性和稳定性的关键要素。LangChain V3 通过 RunnableConfig 提供了传递用户信息和实施访问控制的机制。本章将深入探讨如何在 LangChain 应用中集成身份验证和速率限制。

RunnableConfig 中的用户信息传递

RunnableConfig 允许在执行链中传递元数据,包括用户身份信息:

typescript
// 用户信息接口
interface UserInfo {
  userId: string;
  username: string;
  roles: string[];
  permissions: string[];
  tenantId?: string;
}

// 扩展 RunnableConfig 以支持用户信息
interface AuthenticatedRunnableConfig extends RunnableConfig {
  user?: UserInfo;
  apiKey?: string;
  sessionId?: string;
}

// 基础认证检查器
class AuthChecker {
  async checkPermission(
    config: AuthenticatedRunnableConfig,
    requiredPermission: string
  ): Promise<boolean> {
    const user = config.user;
    if (!user) {
      throw new Error('用户未认证');
    }
    
    // 检查用户是否具有所需权限
    if (user.permissions.includes(requiredPermission)) {
      return true;
    }
    
    // 检查用户角色是否具有所需权限
    // 这里简化处理,实际应用中可能需要查询角色权限映射
    const rolePermissions: Record<string, string[]> = {
      'admin': ['read', 'write', 'delete', 'admin'],
      'user': ['read', 'write'],
      'guest': ['read']
    };
    
    for (const role of user.roles) {
      if (rolePermissions[role]?.includes(requiredPermission)) {
        return true;
      }
    }
    
    return false;
  }
  
  async checkRole(
    config: AuthenticatedRunnableConfig,
    requiredRole: string
  ): Promise<boolean> {
    const user = config.user;
    if (!user) {
      throw new Error('用户未认证');
    }
    
    return user.roles.includes(requiredRole);
  }
}

速率限制实现

实现基于用户和 API 密钥的速率限制:

typescript
// 速率限制配置
interface RateLimitConfig {
  requestsPerMinute?: number;
  requestsPerHour?: number;
  tokensPerMinute?: number;
  tokensPerDay?: number;
}

// 速率限制记录
interface RateLimitRecord {
  userId?: string;
  apiKey?: string;
  requestCount: number;
  tokenCount: number;
  windowStart: number;
}

// 速率限制器
class RateLimiter {
  private limits: Map<string, RateLimitRecord>;
  private defaultConfig: RateLimitConfig;
  
  constructor(defaultConfig: RateLimitConfig = {}) {
    this.limits = new Map();
    this.defaultConfig = {
      requestsPerMinute: 60,
      tokensPerDay: 100000,
      ...defaultConfig
    };
  }
  
  async checkRateLimit(
    config: AuthenticatedRunnableConfig,
    tokens: number = 1
  ): Promise<{ allowed: boolean; resetTime?: number }> {
    const key = this.generateKey(config);
    const now = Date.now();
    const record = this.limits.get(key) || this.createEmptyRecord(now);
    
    // 检查时间窗口是否已过期
    if (now - record.windowStart >= 60000) { // 1分钟窗口
      record.requestCount = 0;
      record.tokenCount = 0;
      record.windowStart = now;
    }
    
    // 更新计数
    record.requestCount++;
    record.tokenCount += tokens;
    
    // 保存记录
    this.limits.set(key, record);
    
    // 检查限制
    const requestsPerMinute = config.user?.permissions.includes('premium') 
      ? this.defaultConfig.requestsPerMinute! * 10 
      : this.defaultConfig.requestsPerMinute!;
      
    if (record.requestCount > requestsPerMinute) {
      const resetTime = record.windowStart + 60000;
      return { allowed: false, resetTime };
    }
    
    const tokensPerDay = config.user?.permissions.includes('premium')
      ? this.defaultConfig.tokensPerDay! * 10
      : this.defaultConfig.tokensPerDay!;
      
    if (record.tokenCount > tokensPerDay) {
      // 这里简化处理,实际应用中可能需要更复杂的逻辑
      return { allowed: false };
    }
    
    return { allowed: true };
  }
  
  private generateKey(config: AuthenticatedRunnableConfig): string {
    if (config.apiKey) {
      return `api:${config.apiKey}`;
    }
    
    if (config.user?.userId) {
      return `user:${config.user.userId}`;
    }
    
    if (config.sessionId) {
      return `session:${config.sessionId}`;
    }
    
    throw new Error('无法生成速率限制键:缺少用户标识');
  }
  
  private createEmptyRecord(now: number): RateLimitRecord {
    return {
      requestCount: 0,
      tokenCount: 0,
      windowStart: now
    };
  }
  
  // 获取当前使用情况
  getUsage(config: AuthenticatedRunnableConfig): {
    requestsUsed: number;
    tokensUsed: number;
    requestsLimit: number;
    tokensLimit: number;
  } {
    const key = this.generateKey(config);
    const record = this.limits.get(key) || this.createEmptyRecord(Date.now());
    
    const requestsLimit = config.user?.permissions.includes('premium') 
      ? this.defaultConfig.requestsPerMinute! * 10 
      : this.defaultConfig.requestsPerMinute!;
      
    const tokensLimit = config.user?.permissions.includes('premium')
      ? this.defaultConfig.tokensPerDay! * 10
      : this.defaultConfig.tokensPerDay!;
    
    return {
      requestsUsed: record.requestCount,
      tokensUsed: record.tokenCount,
      requestsLimit,
      tokensLimit
    };
  }
}

认证和速率限制中间件

实现认证和速率限制的中间件:

typescript
// 认证和速率限制中间件
class AuthRateLimitMiddleware {
  private authChecker: AuthChecker;
  private rateLimiter: RateLimiter;
  
  constructor(authChecker: AuthChecker, rateLimiter: RateLimiter) {
    this.authChecker = authChecker;
    this.rateLimiter = rateLimiter;
  }
  
  async process(
    input: any,
    config: AuthenticatedRunnableConfig
  ): Promise<{ allowed: boolean; error?: string; resetTime?: number }> {
    try {
      // 1. 验证用户认证
      if (!config.user && !config.apiKey) {
        return { 
          allowed: false, 
          error: '缺少认证信息' 
        };
      }
      
      // 2. 检查权限(根据具体操作)
      // 这里假设需要 'use_llm' 权限
      const hasPermission = await this.authChecker.checkPermission(
        config,
        'use_llm'
      );
      
      if (!hasPermission) {
        return { 
          allowed: false, 
          error: '权限不足' 
        };
      }
      
      // 3. 检查速率限制
      // 估算 token 使用量(简化处理)
      const estimatedTokens = typeof input === 'string' 
        ? input.length / 4 
        : JSON.stringify(input).length / 4;
        
      const rateLimitResult = await this.rateLimiter.checkRateLimit(
        config,
        estimatedTokens
      );
      
      if (!rateLimitResult.allowed) {
        return {
          allowed: false,
          error: '超过速率限制',
          resetTime: rateLimitResult.resetTime
        };
      }
      
      return { allowed: true };
    } catch (error) {
      return {
        allowed: false,
        error: error instanceof Error ? error.message : '认证检查失败'
      };
    }
  }
}

在 LangChain 中集成认证和速率限制

将认证和速率限制集成到 LangChain 组件中:

typescript
// 带认证和速率限制的 Runnable 包装器
class AuthenticatedRunnable<Input, Output> extends Runnable<Input, Output> {
  private runnable: Runnable<Input, Output>;
  private middleware: AuthRateLimitMiddleware;
  
  constructor(
    runnable: Runnable<Input, Output>,
    middleware: AuthRateLimitMiddleware
  ) {
    super();
    this.runnable = runnable;
    this.middleware = middleware;
  }
  
  async invoke(input: Input, options?: AuthenticatedRunnableConfig): Promise<Output> {
    // 执行认证和速率限制检查
    const checkResult = await this.middleware.process(input, options || {});
    
    if (!checkResult.allowed) {
      throw new Error(
        `访问被拒绝: ${checkResult.error}` +
        (checkResult.resetTime 
          ? `。限制将在 ${new Date(checkResult.resetTime).toLocaleTimeString()} 重置`
          : '')
      );
    }
    
    // 执行实际的 Runnable
    return await this.runnable.invoke(input, options);
  }
  
  async batch(inputs: Input[], options?: AuthenticatedRunnableConfig): Promise<Output[]> {
    // 对于批处理,需要检查每个输入
    const checkResults = await Promise.all(
      inputs.map(input => this.middleware.process(input, options || {}))
    );
    
    // 检查是否有任何拒绝
    const deniedResult = checkResults.find(result => !result.allowed);
    if (deniedResult) {
      throw new Error(
        `批处理访问被拒绝: ${deniedResult.error}` +
        (deniedResult.resetTime 
          ? `。限制将在 ${new Date(deniedResult.resetTime).toLocaleTimeString()} 重置`
          : '')
      );
    }
    
    // 执行实际的批处理
    return await this.runnable.batch(inputs, options);
  }
}

// 带用户上下文的 LLM 调用
class UserContextLLM extends BaseChatModel {
  private llm: BaseChatModel;
  
  constructor(llm: BaseChatModel) {
    super();
    this.llm = llm;
  }
  
  async _call(
    messages: BaseMessage[],
    options?: AuthenticatedRunnableConfig
  ): Promise<string> {
    // 在消息中添加用户上下文信息
    const userContextMessages: BaseMessage[] = [];
    
    if (options?.user) {
      userContextMessages.push({
        role: 'system',
        content: `用户ID: ${options.user.userId}\n用户名: ${options.user.username}\n角色: ${options.user.roles.join(', ')}`
      });
    }
    
    const enhancedMessages = [...userContextMessages, ...messages];
    return await this.llm._call(enhancedMessages, options);
  }
  
  async *_streamResponseChunks(
    messages: BaseMessage[],
    options?: AuthenticatedRunnableConfig
  ): AsyncGenerator<ChatGeneration> {
    // 在流式响应中同样添加用户上下文
    const userContextMessages: BaseMessage[] = [];
    
    if (options?.user) {
      userContextMessages.push({
        role: 'system',
        content: `用户ID: ${options.user.userId}\n用户名: ${options.user.username}\n角色: ${options.user.roles.join(', ')}`
      });
    }
    
    const enhancedMessages = [...userContextMessages, ...messages];
    
    for await (const chunk of this.llm._streamResponseChunks(enhancedMessages, options)) {
      yield chunk;
    }
  }
}

NestJS 中的实现

在 NestJS 中实现认证和速率限制:

typescript
// auth.middleware.ts
import { Injectable, NestMiddleware } from '@nestjs/common';
import { Request, Response, NextFunction } from 'express';

@Injectable()
export class AuthMiddleware implements NestMiddleware {
  async use(req: Request, res: Response, next: NextFunction) {
    try {
      // 从请求头或会话中提取认证信息
      const authHeader = req.headers.authorization;
      const apiKey = req.headers['x-api-key'] as string;
      
      if (authHeader) {
        // Bearer Token 认证
        const token = authHeader.replace('Bearer ', '');
        const user = await this.validateToken(token);
        (req as any).user = user;
      } else if (apiKey) {
        // API Key 认证
        const user = await this.validateApiKey(apiKey);
        (req as any).user = user;
      } else {
        // 匿名用户
        (req as any).user = null;
      }
      
      next();
    } catch (error) {
      res.status(401).json({ error: '认证失败' });
    }
  }
  
  private async validateToken(token: string): Promise<UserInfo> {
    // 实现 token 验证逻辑
    // 这里简化处理,实际应用中可能需要调用认证服务
    return {
      userId: 'user123',
      username: 'testuser',
      roles: ['user'],
      permissions: ['read', 'write', 'use_llm']
    };
  }
  
  private async validateApiKey(apiKey: string): Promise<UserInfo> {
    // 实现 API Key 验证逻辑
    return {
      userId: 'apiuser123',
      username: 'apiuser',
      roles: ['api'],
      permissions: ['read', 'use_llm']
    };
  }
}

// rate-limit.guard.ts
import { Injectable, CanActivate, ExecutionContext } from '@nestjs/common';
import { RateLimiter } from './rate-limiter';

@Injectable()
export class RateLimitGuard implements CanActivate {
  private rateLimiter: RateLimiter;
  
  constructor() {
    this.rateLimiter = new RateLimiter({
      requestsPerMinute: 10,
      tokensPerDay: 50000
    });
  }
  
  async canActivate(context: ExecutionContext): Promise<boolean> {
    const request = context.switchToHttp().getRequest();
    const user = (request as any).user;
    
    if (!user) {
      // 匿名用户限制
      return true; // 或者拒绝匿名访问
    }
    
    const config: AuthenticatedRunnableConfig = {
      user
    };
    
    const result = await this.rateLimiter.checkRateLimit(config);
    
    if (!result.allowed) {
      throw new Error(
        '超过速率限制' +
        (result.resetTime 
          ? `。限制将在 ${new Date(result.resetTime).toLocaleTimeString()} 重置`
          : '')
      );
    }
    
    return true;
  }
}

// langchain-with-auth.controller.ts
import { Controller, Post, Body, UseGuards, UseInterceptors } from '@nestjs/common';
import { AuthMiddleware } from './auth.middleware';
import { RateLimitGuard } from './rate-limit.guard';

interface GenerateTextDto {
  prompt: string;
}

@Controller('secure-langchain')
@UseGuards(RateLimitGuard)
export class SecureLangchainController {
  private readonly authChecker: AuthChecker;
  private readonly rateLimiter: RateLimiter;
  private readonly middleware: AuthRateLimitMiddleware;
  
  constructor() {
    this.authChecker = new AuthChecker();
    this.rateLimiter = new RateLimiter();
    this.middleware = new AuthRateLimitMiddleware(this.authChecker, this.rateLimiter);
  }
  
  @Post('generate')
  async generateText(
    @Body() body: GenerateTextDto,
    @Request() req: any
  ): Promise<{ result: string; usage?: any }> {
    try {
      const user = req.user;
      
      // 创建带认证的配置
      const config: AuthenticatedRunnableConfig = {
        user,
        metadata: {
          endpoint: 'generate-text',
          timestamp: Date.now()
        }
      };
      
      // 创建带认证的 LLM
      const baseLLM = new ChatOpenAI({ modelName: 'gpt-3.5-turbo' });
      const userLLM = new UserContextLLM(baseLLM);
      const authenticatedLLM = new AuthenticatedRunnable(userLLM, this.middleware);
      
      // 生成文本
      const result = await authenticatedLLM.invoke(body.prompt, config);
      
      // 获取使用情况
      const usage = this.rateLimiter.getUsage(config);
      
      return { result, usage };
    } catch (error) {
      throw new Error(`处理失败: ${error.message}`);
    }
  }
}

实际应用示例

让我们看一个完整的实际应用示例,展示如何在系统中集成认证和速率限制:

typescript
// 完整的安全问答系统
class SecureQASystem {
  private authChecker: AuthChecker;
  private rateLimiter: RateLimiter;
  private middleware: AuthRateLimitMiddleware;
  private vectorStore: MemoryVectorStore;
  
  constructor() {
    this.authChecker = new AuthChecker();
    this.rateLimiter = new RateLimiter({
      requestsPerMinute: 5,
      tokensPerDay: 10000
    });
    this.middleware = new AuthRateLimitMiddleware(this.authChecker, this.rateLimiter);
    this.vectorStore = new MemoryVectorStore(new OpenAIEmbeddings());
    
    // 初始化文档
    this.initializeDocuments();
  }
  
  private async initializeDocuments(): Promise<void> {
    const documents: Document[] = [
      {
        pageContent: "LangChain 是一个用于开发由语言模型驱动的应用程序的框架。它提供了一套工具和组件,帮助开发者将语言模型与其他数据源、计算逻辑和外部系统集成。",
        metadata: { category: "framework", accessLevel: "public" }
      },
      {
        pageContent: "LCEL (LangChain Expression Language) 是 LangChain V3 的重要特性。它是一种声明式语言,允许开发者使用管道操作符(|)将不同的组件连接在一起。",
        metadata: { category: "language", accessLevel: "public" }
      },
      {
        pageContent: "企业内部文档:这是敏感的商业信息,只允许授权员工访问。包含公司的战略规划和财务数据。",
        metadata: { category: "internal", accessLevel: "confidential" }
      }
    ];
    
    await this.vectorStore.addDocuments(documents);
  }
  
  async answerQuestion(
    question: string,
    config: AuthenticatedRunnableConfig
  ): Promise<{
    answer: string;
    usage?: {
      requestsUsed: number;
      tokensUsed: number;
      requestsLimit: number;
      tokensLimit: number;
    };
    documentsAccessed?: number;
  }> {
    try {
      // 执行认证和速率限制检查
      const checkResult = await this.middleware.process(question, config);
      
      if (!checkResult.allowed) {
        throw new Error(
          `访问被拒绝: ${checkResult.error}` +
          (checkResult.resetTime 
            ? `。限制将在 ${new Date(checkResult.resetTime).toLocaleTimeString()} 重置`
            : '')
        );
      }
      
      // 检查文档访问权限
      const accessibleDocs = await this.getAccessibleDocuments(config);
      
      // 检索相关文档
      const retriever = new VectorStoreRetriever({
        vectorStore: this.vectorStore,
        k: 3,
        filter: (doc) => accessibleDocs.some(d => d.pageContent === doc.pageContent)
      });
      
      const documents = await retriever.invoke(question);
      
      // 构建提示
      const context = documents.map(d => d.pageContent).join('\n\n');
      
      const prompt = new PromptTemplate({
        template: `基于以下文档回答问题。如果文档中没有相关信息,请说明无法回答。
        
文档:
{context}

问题: {question}

答案:`,
        inputVariables: ["context", "question"]
      });
      
      // 创建带用户上下文的 LLM
      const baseLLM = new ChatOpenAI({ modelName: 'gpt-3.5-turbo' });
      const userLLM = new UserContextLLM(baseLLM);
      
      // 添加用户信息到提示中
      const enhancedPrompt = `用户信息:
用户ID: ${config.user?.userId || 'anonymous'}
用户名: ${config.user?.username || 'anonymous'}
角色: ${config.user?.roles.join(', ') || 'guest'}

${await prompt.format({ context, question })}`;
      
      const answer = await userLLM.invoke([{ role: 'user', content: enhancedPrompt }]);
      
      // 获取使用情况
      const usage = this.rateLimiter.getUsage(config);
      
      return {
        answer,
        usage,
        documentsAccessed: documents.length
      };
    } catch (error) {
      throw new Error(`问答处理失败: ${error.message}`);
    }
  }
  
  private async getAccessibleDocuments(
    config: AuthenticatedRunnableConfig
  ): Promise<Document[]> {
    // 根据用户权限过滤可访问的文档
    const allDocs = await this.vectorStore.similaritySearch('', 100); // 获取所有文档
    
    if (!config.user) {
      // 匿名用户只能访问公开文档
      return allDocs.filter(doc => doc.metadata.accessLevel === 'public');
    }
    
    if (config.user.roles.includes('admin')) {
      // 管理员可以访问所有文档
      return allDocs;
    }
    
    if (config.user.roles.includes('employee')) {
      // 员工可以访问公开和内部文档
      return allDocs.filter(doc => 
        doc.metadata.accessLevel === 'public' || 
        doc.metadata.accessLevel === 'confidential'
      );
    }
    
    // 普通用户只能访问公开文档
    return allDocs.filter(doc => doc.metadata.accessLevel === 'public');
  }
  
  // 获取用户配额信息
  async getUserQuota(config: AuthenticatedRunnableConfig): Promise<{
    requestsUsed: number;
    tokensUsed: number;
    requestsLimit: number;
    tokensLimit: number;
    resetTime: string;
  }> {
    const usage = this.rateLimiter.getUsage(config);
    const record = this.rateLimiter['limits'].get(
      this.rateLimiter['generateKey'](config)
    ) || { windowStart: Date.now() };
    
    return {
      ...usage,
      resetTime: new Date(record.windowStart + 60000).toLocaleTimeString()
    };
  }
}

// 使用示例
async function demonstrateSecureQA() {
  console.log('=== 安全问答系统演示 ===\n');
  
  const qaSystem = new SecureQASystem();
  
  // 普通用户配置
  const userConfig: AuthenticatedRunnableConfig = {
    user: {
      userId: 'user123',
      username: 'testuser',
      roles: ['user'],
      permissions: ['read', 'write', 'use_llm']
    }
  };
  
  // 管理员用户配置
  const adminConfig: AuthenticatedRunnableConfig = {
    user: {
      userId: 'admin123',
      username: 'admin',
      roles: ['user', 'admin'],
      permissions: ['read', 'write', 'delete', 'admin', 'use_llm']
    }
  };
  
  try {
    // 普通用户提问
    console.log('普通用户提问:');
    const result1 = await qaSystem.answerQuestion(
      "什么是 LangChain?",
      userConfig
    );
    
    console.log(`答案: ${result1.answer}`);
    console.log(`访问文档数: ${result1.documentsAccessed}`);
    console.log(`使用情况:`, result1.usage);
    console.log();
    
    // 管理员用户提问(可以访问敏感信息)
    console.log('管理员用户提问:');
    const result2 = await qaSystem.answerQuestion(
      "企业内部有哪些敏感信息?",
      adminConfig
    );
    
    console.log(`答案: ${result2.answer}`);
    console.log(`访问文档数: ${result2.documentsAccessed}`);
    console.log();
    
    // 检查配额
    console.log('用户配额信息:');
    const quota = await qaSystem.getUserQuota(userConfig);
    console.log(quota);
    
  } catch (error) {
    console.error('错误:', error.message);
  }
}

// API 调用示例
/*
// 客户端调用示例
async function callSecureAPI() {
  const response = await fetch('/api/secure-langchain/generate', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': 'Bearer your-jwt-token',
      // 或者使用 API Key
      // 'X-API-Key': 'your-api-key'
    },
    body: JSON.stringify({
      prompt: '解释一下 LangChain 的核心概念'
    })
  });
  
  const data = await response.json();
  console.log('API 响应:', data);
}
*/

总结

通过在 RunnableConfig 中传递用户信息,LangChain V3 提供了强大的身份验证和速率限制能力:

  1. 用户信息传递 - 通过 RunnableConfig 传递用户身份和权限信息
  2. 权限检查 - 实现基于角色和权限的访问控制
  3. 速率限制 - 基于用户和 API Key 实施请求和令牌限制
  4. 中间件模式 - 使用中间件统一处理认证和速率限制
  5. NestJS 集成 - 在 NestJS 中实现守卫和中间件模式
  6. 文档级控制 - 实现细粒度的文档访问控制
  7. 使用情况监控 - 提供配额和使用情况查询功能

这些机制确保了 LangChain 应用的安全性和稳定性,为构建生产级 LLM 应用提供了坚实的基础。

在下一章中,我们将探讨持久化会话:RunnableWithMessageHistory 如何管理 chat_history,了解如何在 LangChain 中实现对话历史管理。