
Microservices: Deploying Different Runnables as Independent Services Invoked over HTTP/gRPC

When building large LLM applications, a microservice architecture offers better scalability, maintainability, and deployment flexibility. The Runnable interface in LangChain V3 lends itself naturally to this style: individual components can be deployed independently and communicate over HTTP or gRPC. This chapter takes a close look at how to split LangChain components into microservices.

Runnable Microservice Fundamentals

Because the Runnable interface is standardized, extracting a component into a microservice is straightforward:

typescript
// The minimal Runnable contract a microservice must expose over the network
interface RunnableService<Input, Output> {
  invoke(input: Input): Promise<Output>;
  batch(inputs: Input[]): Promise<Output[]>;
  stream?(input: Input): AsyncGenerator<Output>;
}

// HTTP client that proxies a remote Runnable behind this contract
class HttpRunnableClient<Input, Output> implements RunnableService<Input, Output> {
  private baseUrl: string;
  private serviceName: string;
  
  constructor(baseUrl: string, serviceName: string) {
    this.baseUrl = baseUrl;
    this.serviceName = serviceName;
  }
  
  async invoke(input: Input): Promise<Output> {
    const response = await fetch(`${this.baseUrl}/${this.serviceName}/invoke`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ input }),
    });
    
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
    
    const result = await response.json();
    return result.output;
  }
  
  async batch(inputs: Input[]): Promise<Output[]> {
    const response = await fetch(`${this.baseUrl}/${this.serviceName}/batch`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ inputs }),
    });
    
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
    
    const result = await response.json();
    return result.outputs;
  }
  
  async *stream(input: Input): AsyncGenerator<Output> {
    const response = await fetch(`${this.baseUrl}/${this.serviceName}/stream`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ input }),
    });
    
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
    
    if (!response.body) {
      throw new Error('Response body is empty');
    }
    
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';
    
    try {
      while (true) {
        const { done, value } = await reader.read();
        
        if (done) {
          break;
        }
        
        // Decode incrementally and buffer partial lines: a network chunk
        // may end in the middle of an SSE message
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() ?? '';
        
        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const data = line.slice(6);
            const result = JSON.parse(data);
            yield result.output;
          }
        }
      }
    } finally {
      reader.releaseLock();
    }
  }
}
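
The client above assumes a matching server process on the other side. Below is a minimal sketch of that server side using Express. The framework choice, the mountRunnable helper, and the exact route shapes are assumptions for illustration, not part of LangChain: the helper exposes any local Runnable under the /invoke, /batch, and /stream routes that HttpRunnableClient expects, with /stream emitting Server-Sent Events.

typescript
// Minimal server-side counterpart to HttpRunnableClient (illustrative sketch)
import express from 'express';
import { Runnable } from '@langchain/core/runnables';

function mountRunnable(app: express.Express, name: string, runnable: Runnable<any, any>): void {
  app.post(`/${name}/invoke`, async (req, res) => {
    try {
      const output = await runnable.invoke(req.body.input);
      res.json({ output });
    } catch (error) {
      res.status(500).json({ error: (error as Error).message });
    }
  });
  
  app.post(`/${name}/batch`, async (req, res) => {
    try {
      const outputs = await runnable.batch(req.body.inputs);
      res.json({ outputs });
    } catch (error) {
      res.status(500).json({ error: (error as Error).message });
    }
  });
  
  // Server-Sent Events, matching the "data: ..." lines the client parses
  app.post(`/${name}/stream`, async (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    try {
      for await (const chunk of await runnable.stream(req.body.input)) {
        res.write(`data: ${JSON.stringify({ output: chunk })}\n\n`);
      }
    } finally {
      res.end();
    }
  });
}

const app = express();
app.use(express.json());
// mountRunnable(app, 'llm-service', someRunnable);
app.listen(8080);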

gRPC Service Implementation

gRPC can be used for more efficient inter-service communication:

typescript
// gRPC service definition (protobuf)
/*
syntax = "proto3";

package langchain;

service RunnableService {
  rpc Invoke(InvokeRequest) returns (InvokeResponse);
  rpc Batch(BatchRequest) returns (BatchResponse);
  rpc Stream(StreamRequest) returns (stream StreamResponse);
}

message InvokeRequest {
  bytes input = 1;
}

message InvokeResponse {
  bytes output = 1;
}

message BatchRequest {
  repeated bytes inputs = 1;
}

message BatchResponse {
  repeated bytes outputs = 1;
}

message StreamRequest {
  bytes input = 1;
}

message StreamResponse {
  bytes output = 1;
}
*/

// gRPC server-side implementation
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';

class GrpcRunnableService {
  private runnable: Runnable<any, any>;
  
  constructor(runnable: Runnable<any, any>) {
    this.runnable = runnable;
  }
  
  async invoke(call: any, callback: any): Promise<void> {
    try {
      const input = JSON.parse(call.request.input);
      const output = await this.runnable.invoke(input);
      callback(null, { output: JSON.stringify(output) });
    } catch (error) {
      callback(error, null);
    }
  }
  
  async batch(call: any, callback: any): Promise<void> {
    try {
      const inputs = call.request.inputs.map((input: string) => JSON.parse(input));
      const outputs = await this.runnable.batch(inputs);
      callback(null, { 
        outputs: outputs.map(output => JSON.stringify(output)) 
      });
    } catch (error) {
      callback(error, null);
    }
  }
  
  // Server-streaming handler: it writes chunks to the call object,
  // so it is a plain async method rather than an async generator
  async stream(call: any): Promise<void> {
    try {
      const input = JSON.parse(call.request.input);
      const stream = await this.runnable.stream(input);
      
      for await (const output of stream) {
        call.write({ output: JSON.stringify(output) });
      }
      
      call.end();
    } catch (error) {
      call.emit('error', error);
    }
  }
}

// gRPC client
class GrpcRunnableClient<Input, Output> implements RunnableService<Input, Output> {
  private client: any; // generated gRPC client stub
  
  constructor(address: string, serviceName: string) {
    // Initialize the gRPC client stub from the proto definition
    const packageDefinition = protoLoader.loadSync('runnable.proto');
    const protoDescriptor = grpc.loadPackageDefinition(packageDefinition);
    // this.client = new protoDescriptor.langchain.RunnableService(
    //   address,
    //   grpc.credentials.createInsecure()
    // );
  }
  
  async invoke(input: Input): Promise<Output> {
    // const result = await new Promise((resolve, reject) => {
    //   this.client.invoke({ input: JSON.stringify(input) }, (error: any, response: any) => {
    //     if (error) {
    //       reject(error);
    //     } else {
    //       resolve(JSON.parse(response.output));
    //     }
    //   });
    // });
    // return result as Output;
    throw new Error('Complete the gRPC client implementation against your actual proto file');
  }
  
  async batch(inputs: Input[]): Promise<Output[]> {
    // const result = await new Promise((resolve, reject) => {
    //   this.client.batch({ 
    //     inputs: inputs.map(input => JSON.stringify(input)) 
    //   }, (error: any, response: any) => {
    //     if (error) {
    //       reject(error);
    //     } else {
    //       resolve(response.outputs.map((output: string) => JSON.parse(output)));
    //     }
    //   });
    // });
    // return result as Output[];
    throw new Error('Complete the gRPC client implementation against your actual proto file');
  }
  
  async *stream(input: Input): AsyncGenerator<Output> {
    // gRPC server-streaming call implementation
    throw new Error('Complete the gRPC streaming client against your actual proto file');
  }
}
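
To actually serve GrpcRunnableService, its handlers have to be registered with a grpc.Server. Here is a minimal bootstrap sketch; it assumes the proto definition shown above is saved as runnable.proto next to the script, and relies on grpc-js matching camelCase handler keys to the Invoke/Batch/Stream RPCs.

typescript
// Bootstrap sketch: bind the GrpcRunnableService handlers to a gRPC server
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import { Runnable } from '@langchain/core/runnables';

function startGrpcServer(runnable: Runnable<any, any>, address: string): void {
  const packageDefinition = protoLoader.loadSync('runnable.proto');
  const proto = grpc.loadPackageDefinition(packageDefinition) as any;
  const service = new GrpcRunnableService(runnable);
  
  const server = new grpc.Server();
  server.addService(proto.langchain.RunnableService.service, {
    invoke: service.invoke.bind(service),
    batch: service.batch.bind(service),
    stream: service.stream.bind(service),
  });
  
  server.bindAsync(address, grpc.ServerCredentials.createInsecure(), (err) => {
    if (err) throw err;
    console.log(`gRPC Runnable service listening on ${address}`);
  });
}

// startGrpcServer(someRunnable, '0.0.0.0:50051');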

Turning Concrete Components into Microservices

Specific LangChain components can be packaged as services in the same way:

typescript
// LLM service (simplified: a full BaseChatModel subclass would implement
// _generate and _llmType; _call is used here for brevity)
class LLMService extends BaseChatModel {
  private modelName: string;
  private apiKey: string;
  
  constructor(modelName: string, apiKey: string) {
    super();
    this.modelName = modelName;
    this.apiKey = apiKey;
  }
  
  async _call(messages: BaseMessage[]): Promise<string> {
    // Call the actual LLM API, converting BaseMessage objects
    // into the role/content shape the OpenAI API expects
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        model: this.modelName,
        messages: messages.map((m) => ({
          role: m._getType() === 'ai' ? 'assistant'
            : m._getType() === 'system' ? 'system'
            : 'user',
          content: m.content,
        })),
        stream: false,
      }),
    });
    
    const data = await response.json();
    return data.choices[0].message.content;
  }
  
  async *_streamResponseChunks(
    messages: BaseMessage[]
  ): AsyncGenerator<ChatGeneration> {
    // Streaming variant: same endpoint with stream: true
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        model: this.modelName,
        messages: messages.map((m) => ({
          role: m._getType() === 'ai' ? 'assistant'
            : m._getType() === 'system' ? 'system'
            : 'user',
          content: m.content,
        })),
        stream: true,
      }),
    });
    
    if (!response.body) {
      throw new Error('Response body is empty');
    }
    
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';
    
    try {
      while (true) {
        const { done, value } = await reader.read();
        
        if (done) {
          break;
        }
        
        // Buffer partial lines across network chunks
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() ?? '';
        
        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const data = line.slice(6);
            if (data === '[DONE]') {
              return;
            }
            
            try {
              const parsed = JSON.parse(data);
              const content = parsed.choices[0]?.delta?.content;
              if (content) {
                yield {
                  text: content,
                  message: new AIMessage(content),
                };
              }
            } catch (error) {
              // Ignore fragments that are not valid JSON
            }
          }
        }
      }
    } finally {
      reader.releaseLock();
    }
  }
}

// Prompt template service
class PromptTemplateService extends PromptTemplate {
  // A prompt template service can carry extra business logic,
  // for example template versioning or A/B testing
  
  async formatWithVersion(input: any, version?: string): Promise<string> {
    // Select a template variant based on the requested version
    if (version) {
      // Version-management logic would go here
      console.log(`Using template version: ${version}`);
    }
    
    return await this.format(input);
  }
}

// Retrieval service
class RetrievalService {
  private vectorStore: VectorStore;
  
  constructor(vectorStore: VectorStore) {
    this.vectorStore = vectorStore;
  }
  
  async search(query: string, k: number = 4): Promise<Document[]> {
    return await this.vectorStore.similaritySearch(query, k);
  }
  
  async mmrSearch(
    query: string,
    k: number = 4,
    fetchK: number = 20
  ): Promise<Document[]> {
    // maxMarginalRelevanceSearch takes an options object, not positional args
    return await this.vectorStore.maxMarginalRelevanceSearch(query, { k, fetchK });
  }
}
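
To make a component such as RetrievalService reachable through HttpRunnableClient, wrap it as a Runnable and mount it under the shared HTTP contract. The sketch below reuses the hypothetical mountRunnable helper from the HTTP section and assumes a vectorStore has already been initialized elsewhere.

typescript
import { RunnableLambda } from '@langchain/core/runnables';

// Lift the retrieval component into a Runnable so it satisfies the service contract
const retrieval = new RetrievalService(vectorStore); // vectorStore assumed to exist
const retrievalRunnable = RunnableLambda.from(
  async (query: string) => retrieval.search(query)
);

// Expose it under the route shape HttpRunnableClient expects:
// mountRunnable(app, 'retrieval-service', retrievalRunnable);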

Service Registration and Discovery

A registry provides the service registration and discovery mechanism:

typescript
// Service registry
interface ServiceInfo {
  name: string;
  address: string;
  port: number;
  protocol: 'http' | 'grpc';
  version: string;
  tags: string[];
  healthCheckUrl: string;
}

class ServiceRegistry {
  private services: Map<string, ServiceInfo>;
  private healthChecks: Map<string, () => Promise<boolean>>;
  
  constructor() {
    this.services = new Map();
    this.healthChecks = new Map();
  }
  
  registerService(serviceInfo: ServiceInfo): void {
    const serviceId = `${serviceInfo.name}:${serviceInfo.version}`;
    this.services.set(serviceId, serviceInfo);
    
    // Register a health check for this service
    this.healthChecks.set(serviceId, () => this.checkHealth(serviceInfo));
    
    console.log(`Service registered: ${serviceId} at ${serviceInfo.address}:${serviceInfo.port}`);
  }
  
  getService(name: string, version?: string): ServiceInfo | undefined {
    if (version) {
      return this.services.get(`${name}:${version}`);
    }
    
    // Fall back to the newest version (localeCompare is a rough stand-in for semver ordering)
    const services = Array.from(this.services.values())
      .filter(service => service.name === name)
      .sort((a, b) => b.version.localeCompare(a.version));
    
    return services[0];
  }
  
  getAllServices(): ServiceInfo[] {
    return Array.from(this.services.values());
  }
  
  async checkHealth(serviceInfo: ServiceInfo): Promise<boolean> {
    try {
      // fetch has no `timeout` option; abort via an AbortSignal instead
      const response = await fetch(serviceInfo.healthCheckUrl, {
        signal: AbortSignal.timeout(5000),
      });
      return response.ok;
    } catch (error) {
      console.warn(`Health check failed (${serviceInfo.name}):`, error);
      return false;
    }
  }
  
  async checkAllServices(): Promise<Record<string, boolean>> {
    const results: Record<string, boolean> = {};
    
    for (const [serviceId, serviceInfo] of this.services.entries()) {
      const isHealthy = await this.checkHealth(serviceInfo);
      results[serviceId] = isHealthy;
      
      if (!isHealthy) {
        console.warn(`Service unhealthy: ${serviceId}`);
      }
    }
    
    return results;
  }
}

// Service discovery client
class ServiceDiscoveryClient {
  private registry: ServiceRegistry;
  
  constructor(registry: ServiceRegistry) {
    this.registry = registry;
  }
  
  async getRunnableClient<Input, Output>(
    serviceName: string,
    version?: string
  ): Promise<RunnableService<Input, Output>> {
    const serviceInfo = this.registry.getService(serviceName, version);
    
    if (!serviceInfo) {
      throw new Error(`Service not found: ${serviceName}`);
    }
    
    if (serviceInfo.protocol === 'http') {
      return new HttpRunnableClient<Input, Output>(
        `http://${serviceInfo.address}:${serviceInfo.port}`,
        serviceName
      );
    } else {
      return new GrpcRunnableClient<Input, Output>(
        `${serviceInfo.address}:${serviceInfo.port}`,
        serviceName
      );
    }
  }
}
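
Usage is then symmetric with a local Runnable: register the service, resolve a client by name, and call invoke on it. A short sketch (the address and port are illustrative):

typescript
const registry = new ServiceRegistry();
registry.registerService({
  name: 'retrieval-service',
  address: 'localhost',
  port: 8081,
  protocol: 'http',
  version: '1.0.0',
  tags: ['retrieval'],
  healthCheckUrl: 'http://localhost:8081/health',
});

const discovery = new ServiceDiscoveryClient(registry);
const retriever = await discovery.getRunnableClient<string, Document[]>('retrieval-service');
const docs = await retriever.invoke('What is LCEL?');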

Microservice Orchestration

With discovery in place, remote services can be orchestrated and composed into chains:

typescript
// Microservice orchestrator
class MicroserviceOrchestrator {
  private serviceDiscovery: ServiceDiscoveryClient;
  private serviceRegistry: ServiceRegistry;
  
  constructor(
    serviceDiscovery: ServiceDiscoveryClient,
    serviceRegistry: ServiceRegistry
  ) {
    this.serviceDiscovery = serviceDiscovery;
    this.serviceRegistry = serviceRegistry;
  }
  
  async createRAGChain(sessionId?: string): Promise<Runnable<string, string>> {
    try {
      // Get a retrieval-service client
      const retrievalService = await this.serviceDiscovery.getRunnableClient<
        string,
        Document[]
      >('retrieval-service');
      
      // Get a prompt-template-service client
      const promptService = await this.serviceDiscovery.getRunnableClient<
        { context: string; question: string },
        string
      >('prompt-template-service');
      
      // Get an LLM-service client
      const llmService = await this.serviceDiscovery.getRunnableClient<
        BaseMessage[],
        string
      >('llm-service');
      
      // Build the RAG chain
      const ragChain = RunnableMap.from({
        question: (input: string) => input,
        context: async (input: string) => {
          const documents = await retrievalService.invoke(input);
          return documents.map(doc => doc.pageContent).join('\n\n');
        }
      }).pipe(
        async (input: { question: string; context: string }) => {
          return await promptService.invoke({
            context: input.context,
            question: input.question
          });
        }
      ).pipe(
        async (prompt: string) => {
          return await llmService.invoke([new HumanMessage(prompt)]);
        }
      );
      
      // If session history is needed, wrap the chain with message history.
      // Note: RunnableWithMessageHistory takes a fields object, and the
      // sessionId must also be supplied at invoke time via config.configurable
      if (sessionId) {
        const messageHistoryService = await this.getMessageHistoryService(sessionId);
        return new RunnableWithMessageHistory({
          runnable: ragChain,
          getMessageHistory: async () => messageHistoryService,
        });
      }
      
      return ragChain;
    } catch (error) {
      throw new Error(`Failed to create RAG chain: ${error.message}`);
    }
  }
  
  private async getMessageHistoryService(sessionId: string): Promise<BaseChatMessageHistory> {
    // Get a message-history-service client
    const historyService = await this.serviceDiscovery.getRunnableClient<
      any,
      BaseMessage[]
    >('message-history-service');
    
    // Adapt the remote service to the BaseChatMessageHistory interface
    return new (class extends BaseChatMessageHistory {
      lc_namespace = ['langchain', 'stores', 'message', 'remote'];
      
      async getMessages(): Promise<BaseMessage[]> {
        return await historyService.invoke({ sessionId, action: 'getMessages' });
      }
      
      async addMessage(message: BaseMessage): Promise<void> {
        await historyService.invoke({ sessionId, action: 'addMessage', message });
      }
      
      async addUserMessage(message: string): Promise<void> {
        await this.addMessage(new HumanMessage(message));
      }
      
      async addAIChatMessage(message: string): Promise<void> {
        await this.addMessage(new AIMessage(message));
      }
      
      async clear(): Promise<void> {
        await historyService.invoke({ sessionId, action: 'clear' });
      }
    })();
  }
  
  async createChatChain(): Promise<RunnableService<BaseMessage[], string>> {
    try {
      // Get an LLM-service client
      const llmService = await this.serviceDiscovery.getRunnableClient<
        BaseMessage[],
        string
      >('llm-service');
      
      return llmService;
    } catch (error) {
      throw new Error(`Failed to create chat chain: ${error.message}`);
    }
  }
}

// Service composition example
class CompositeAIService {
  private orchestrator: MicroserviceOrchestrator;
  
  constructor(orchestrator: MicroserviceOrchestrator) {
    this.orchestrator = orchestrator;
  }
  
  async answerQuestion(
    question: string,
    options?: { useRAG?: boolean; sessionId?: string }
  ): Promise<string> {
    try {
      let chain: Runnable<string, string>;
      
      if (options?.useRAG) {
        chain = await this.orchestrator.createRAGChain(options.sessionId);
      } else {
        const chatChain = await this.orchestrator.createChatChain();
        // RunnableLambda.from() is the idiomatic way to lift a function
        chain = RunnableLambda.from(async (input: string) => {
          return await chatChain.invoke([new HumanMessage(input)]);
        });
      }
      
      return await chain.invoke(question);
    } catch (error) {
      throw new Error(`Failed to answer question: ${error.message}`);
    }
  }
  
  async *streamAnswer(
    question: string,
    options?: { useRAG?: boolean; sessionId?: string }
  ): AsyncGenerator<string> {
    try {
      let chain: Runnable<string, string>;
      
      if (options?.useRAG) {
        chain = await this.orchestrator.createRAGChain(options.sessionId);
      } else {
        const chatChain = await this.orchestrator.createChatChain();
        chain = RunnableLambda.from(async (input: string) => {
          return await chatChain.invoke([new HumanMessage(input)]);
        });
      }
      
      const stream = await chain.stream(question);
      for await (const chunk of stream) {
        yield chunk;
      }
    } catch (error) {
      throw new Error(`Streaming answer failed: ${error.message}`);
    }
  }
}

A Complete Application Example

Let's walk through a complete example showing how to build and use a microservice-based LangChain application:

typescript
// Microservice-based question-answering system
class MicroserviceQASystem {
  private serviceRegistry: ServiceRegistry;
  private serviceDiscovery: ServiceDiscoveryClient;
  private orchestrator: MicroserviceOrchestrator;
  private compositeAI: CompositeAIService;
  
  constructor() {
    this.serviceRegistry = new ServiceRegistry();
    this.serviceDiscovery = new ServiceDiscoveryClient(this.serviceRegistry);
    this.orchestrator = new MicroserviceOrchestrator(
      this.serviceDiscovery,
      this.serviceRegistry
    );
    this.compositeAI = new CompositeAIService(this.orchestrator);
    
    // Register the services
    this.registerServices();
  }
  
  private registerServices(): void {
    // Register the LLM service
    this.serviceRegistry.registerService({
      name: 'llm-service',
      address: 'llm-service.internal',
      port: 8080,
      protocol: 'http',
      version: '1.0.0',
      tags: ['llm', 'openai'],
      healthCheckUrl: 'http://llm-service.internal:8080/health'
    });
    
    // Register the retrieval service
    this.serviceRegistry.registerService({
      name: 'retrieval-service',
      address: 'retrieval-service.internal',
      port: 8081,
      protocol: 'http',
      version: '1.0.0',
      tags: ['retrieval', 'vector'],
      healthCheckUrl: 'http://retrieval-service.internal:8081/health'
    });
    
    // Register the prompt template service
    this.serviceRegistry.registerService({
      name: 'prompt-template-service',
      address: 'prompt-service.internal',
      port: 8082,
      protocol: 'http',
      version: '1.0.0',
      tags: ['prompt', 'template'],
      healthCheckUrl: 'http://prompt-service.internal:8082/health'
    });
    
    // Register the message-history service
    this.serviceRegistry.registerService({
      name: 'message-history-service',
      address: 'history-service.internal',
      port: 8083,
      protocol: 'http',
      version: '1.0.0',
      tags: ['history', 'chat'],
      healthCheckUrl: 'http://history-service.internal:8083/health'
    });
  }
  
  async answerQuestion(
    question: string,
    options?: { 
      useRAG?: boolean; 
      sessionId?: string;
      stream?: boolean;
    }
  ): Promise<string | AsyncGenerator<string>> {
    try {
      if (options?.stream) {
        return this.compositeAI.streamAnswer(
          question,
          { 
            useRAG: options.useRAG, 
            sessionId: options.sessionId 
          }
        );
      } else {
        const answer = await this.compositeAI.answerQuestion(
          question,
          { 
            useRAG: options?.useRAG, 
            sessionId: options?.sessionId 
          }
        );
        return answer;
      }
    } catch (error) {
      throw new Error(`QA processing failed: ${error.message}`);
    }
  }
  
  async checkSystemHealth(): Promise<Record<string, boolean>> {
    return await this.serviceRegistry.checkAllServices();
  }
  
  getAvailableServices(): ServiceInfo[] {
    return this.serviceRegistry.getAllServices();
  }
}

// Usage example
async function demonstrateMicroserviceQA() {
  console.log('=== Microservice QA System Demo ===\n');
  
  const qaSystem = new MicroserviceQASystem();
  
  // Check the system health status
  console.log('System health status:');
  const healthStatus = await qaSystem.checkSystemHealth();
  console.log(healthStatus);
  console.log();
  
  // Show available services
  console.log('Available services:');
  const services = qaSystem.getAvailableServices();
  services.forEach(service => {
    console.log(`- ${service.name} (${service.version}) at ${service.address}:${service.port}`);
  });
  console.log();
  
  try {
    // Plain QA
    console.log('Plain QA:');
    const answer1 = await qaSystem.answerQuestion('Hello, please give me an introduction to LangChain');
    console.log(`Answer: ${answer1}`);
    console.log();
    
    // RAG QA
    console.log('RAG QA:');
    const answer2 = await qaSystem.answerQuestion(
      'What are the main features of LangChain?',
      { useRAG: true }
    );
    console.log(`Answer: ${answer2}`);
    console.log();
    
    // QA with session history
    console.log('QA with session history:');
    const sessionId = 'session_' + Date.now();
    const answer3 = await qaSystem.answerQuestion(
      'Can you explain LCEL in more detail?',
      { useRAG: true, sessionId }
    );
    console.log(`Answer: ${answer3}`);
    console.log();
    
    // Streaming QA
    console.log('Streaming QA:');
    const stream = await qaSystem.answerQuestion(
      'Please explain step by step how LangChain works',
      { stream: true }
    ) as AsyncGenerator<string>;
    
    console.log('Incremental response:');
    for await (const chunk of stream) {
      process.stdout.write(chunk);
    }
    console.log('\n');
    
  } catch (error) {
    console.error('Error:', error.message);
  }
}

// NestJS microservice integration example
/*
// llm-service.controller.ts
import { Controller, Post, Body } from '@nestjs/common';

@Controller('llm-service')
export class LLMServiceController {
  private readonly llmService: LLMService;
  
  constructor() {
    this.llmService = new LLMService(
      process.env.MODEL_NAME || 'gpt-3.5-turbo',
      process.env.OPENAI_API_KEY || ''
    );
  }
  
  @Post('invoke')
  async invoke(@Body() body: { input: BaseMessage[] }) {
    const output = await this.llmService.invoke(body.input);
    return { output };
  }
  
  @Post('batch')
  async batch(@Body() body: { inputs: BaseMessage[][] }) {
    const outputs = await this.llmService.batch(body.inputs);
    return { outputs };
  }
}

// main.ts (LLM service)
import { NestFactory } from '@nestjs/core';
import { LLMServiceModule } from './llm-service.module';

async function bootstrap() {
  const app = await NestFactory.create(LLMServiceModule);
  await app.listen(8080);
  console.log('LLM service running on port 8080');
}
bootstrap();
*/

Summary

By deploying Runnable components as independent microservices, LangChain V3 applications gain the following advantages:

  1. Independent deployment - each component can be developed, tested, and deployed on its own
  2. Scalability - individual services can be scaled out independently as demand requires
  3. Technology diversity - each service can use the technology stack best suited to it
  4. Fault isolation - a failure in one service does not take down the whole system
  5. Team collaboration - different teams can own different services
  6. Version management - service versions can be managed independently

Microservices make LangChain applications more flexible and maintainable, but they also bring the complexity of distributed systems: inter-service communication, data consistency, monitoring, and debugging all require careful attention.
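
The inter-service communication concern, for example, can be softened by wrapping remote clients with retries. Below is a minimal sketch of a retry decorator with exponential backoff; the retry count and delays are illustrative defaults, not a prescribed policy.

typescript
// Retry wrapper with exponential backoff for remote Runnable calls (sketch)
class RetryingRunnableClient<Input, Output> {
  constructor(
    private inner: { invoke(input: Input): Promise<Output> },
    private maxRetries: number = 3,
    private baseDelayMs: number = 200
  ) {}
  
  async invoke(input: Input): Promise<Output> {
    let lastError: unknown;
    for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
      try {
        return await this.inner.invoke(input);
      } catch (error) {
        lastError = error;
        // Back off exponentially: 200ms, 400ms, 800ms, ...
        await new Promise((r) => setTimeout(r, this.baseDelayMs * 2 ** attempt));
      }
    }
    throw lastError;
  }
}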

In the next chapter, we will explore Agent 2.0: autonomous agents built on LCEL, and see how to build more intelligent, self-directed AI agents.