Skip to content

微服务通信:ClientProxy 如何封装 TCP/RMQ/Kafka?

在现代分布式系统中,微服务架构已成为主流。NestJS 提供了强大的微服务支持,通过 ClientProxy 抽象层封装了多种传输协议,包括 TCP、Redis、RabbitMQ、Kafka 等。本文将深入探讨 ClientProxy 的工作机制,以及 emit()send() 方法在事件驱动与请求响应模式中的区别。

1. 微服务基础概念

1.1 什么是微服务通信?

微服务通信是指在分布式系统中,不同服务之间进行数据交换和协调的机制:

typescript
// 传统的单体应用
// 所有功能都在一个应用中
// 用户服务 <-> 订单服务 <-> 支付服务 (都在同一进程中)

// 微服务架构
// 用户服务 (独立进程)
// 订单服务 (独立进程)
// 支付服务 (独立进程)
// 服务间通过网络进行通信

1.2 NestJS 微服务支持

NestJS 提供了统一的微服务抽象层:

typescript
// 微服务配置示例
@Module({
  imports: [
    // TCP 微服务
    ClientsModule.register([
      {
        name: 'USER_SERVICE',
        transport: Transport.TCP,
        options: {
          host: 'localhost',
          port: 3001,
        },
      },
    ]),
    
    // RabbitMQ 微服务
    ClientsModule.register([
      {
        name: 'ORDER_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'],
          queue: 'orders_queue',
          queueOptions: {
            durable: false,
          },
        },
      },
    ]),
  ],
})
export class AppModule {}

2. ClientProxy 核心概念

2.1 ClientProxy 简介

ClientProxy 是 NestJS 中用于微服务通信的抽象客户端:

typescript
import { ClientProxy } from '@nestjs/microservices';

@Controller('users')
export class UserController {
  constructor(
    @Inject('USER_SERVICE') private readonly userService: ClientProxy,
    @Inject('ORDER_SERVICE') private readonly orderService: ClientProxy,
  ) {}
  
  @Get(':id')
  async getUser(@Param('id') id: string) {
    // 使用 send() 发送请求并等待响应
    const user = await this.userService.send({ cmd: 'get_user' }, { id }).toPromise();
    
    // 使用 emit() 发送事件(无需等待响应)
    this.orderService.emit('user_viewed', { userId: id });
    
    return user;
  }
}

2.2 send() vs emit() 的区别

typescript
// send() - 请求-响应模式
// 等待服务响应,返回 Observable
const response = client.send(pattern, data);

// emit() - 事件驱动模式
// 发送事件不等待响应,返回 Observable
const ack = client.emit(pattern, data);

3. TCP 传输协议

3.1 TCP 客户端配置

typescript
// TCP 客户端配置
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'TCP_SERVICE',
        transport: Transport.TCP,
        options: {
          host: 'localhost',
          port: 3001,
          retryAttempts: 5,
          retryDelay: 3000,
        },
      },
    ]),
  ],
})
export class AppModule {}

// 使用 TCP 客户端
@Controller()
export class AppController {
  constructor(
    @Inject('TCP_SERVICE') private readonly client: ClientProxy,
  ) {}
  
  @Get('data')
  async getData() {
    return this.client.send('get_data', {}).toPromise();
  }
}

3.2 TCP 服务端实现

typescript
// TCP 服务端
@Controller()
export class TcpController {
  @MessagePattern('get_data')
  getData() {
    return { message: 'Hello from TCP service', timestamp: new Date() };
  }
  
  @EventPattern('user_created')
  async handleUserCreated(data: any) {
    console.log('User created:', data);
    // 处理用户创建事件
  }
}

4. RabbitMQ 传输协议

4.1 RabbitMQ 客户端配置

typescript
// RabbitMQ 客户端配置
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'RMQ_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'],
          queue: 'messages_queue',
          queueOptions: {
            durable: true, // 队列持久化
          },
          prefetchCount: 1, // 预取消息数量
        },
      },
    ]),
  ],
})
export class AppModule {}

4.2 RabbitMQ 服务端实现

typescript
// RabbitMQ 服务端
@Controller()
export class RmqController {
  @MessagePattern('process_order')
  async processOrder(data: any) {
    console.log('Processing order:', data);
    
    // 模拟处理时间
    await new Promise(resolve => setTimeout(resolve, 1000));
    
    return {
      orderId: data.orderId,
      status: 'processed',
      processedAt: new Date(),
    };
  }
  
  @EventPattern('order_placed')
  async handleOrderPlaced(data: any) {
    console.log('Order placed event received:', data);
    // 处理订单放置事件
  }
}

5. Kafka 传输协议

5.1 Kafka 客户端配置

typescript
// Kafka 客户端配置
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'KAFKA_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'my-app',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'my-app-consumer-group',
          },
        },
      },
    ]),
  ],
})
export class AppModule {}

5.2 Kafka 服务端实现

typescript
// Kafka 服务端
@Controller()
export class KafkaController {
  @MessagePattern('user.registered')
  async handleUserRegistered(data: any) {
    console.log('User registered:', data);
    
    // 处理用户注册事件
    return { processed: true };
  }
  
  @EventPattern('order.created')
  async handleOrderCreated(data: any) {
    console.log('Order created event:', data);
    // 处理订单创建事件
  }
}

6. ClientProxy 内部实现

6.1 ClientProxy 抽象类

typescript
// ClientProxy 抽象实现
export abstract class ClientProxy implements Closeable {
  protected readonly logger = new Logger(ClientProxy.name);
  
  // 发送请求并等待响应
  abstract send<T>(pattern: any, data: any): Observable<T>;
  
  // 发送事件不等待响应
  abstract emit<TResult = any, TInput = any>(
    pattern: any,
    data: TInput,
  ): Observable<TResult>;
  
  // 连接到微服务
  abstract connect(): Promise<any>;
  
  // 关闭连接
  abstract close(): any;
  
  // 处理连接错误
  protected handleError(error: any): Observable<never> {
    this.logger.error(error);
    return throwError(error);
  }
}

6.2 TCP 客户端实现

typescript
// TCP 客户端实现示例
export class TcpClient extends ClientProxy {
  private readonly logger = new Logger(TcpClient.name);
  private socket: net.Socket;
  
  constructor(private readonly options: TcpClientOptions) {
    super();
  }
  
  send<T>(pattern: any, data: any): Observable<T> {
    return new Observable(observer => {
      // 建立连接
      this.connect()
        .then(() => {
          // 发送消息
          const id = randomString();
          const message = { id, pattern, data };
          
          this.socket.write(JSON.stringify(message));
          
          // 监听响应
          const handler = (response: any) => {
            if (response.id === id) {
              observer.next(response.data);
              observer.complete();
            }
          };
          
          this.socket.on('data', handler);
          
          // 清理资源
          return () => {
            this.socket.removeListener('data', handler);
          };
        })
        .catch(error => observer.error(error));
    });
  }
  
  emit(pattern: any, data: any): Observable<any> {
    return new Observable(observer => {
      this.connect()
        .then(() => {
          const message = { pattern, data, isEvent: true };
          this.socket.write(JSON.stringify(message));
          observer.next(true);
          observer.complete();
        })
        .catch(error => observer.error(error));
    });
  }
  
  async connect(): Promise<any> {
    if (this.socket && !this.socket.destroyed) {
      return Promise.resolve();
    }
    
    return new Promise((resolve, reject) => {
      this.socket = net.createConnection(this.options);
      
      this.socket.on('connect', () => {
        this.logger.log('Connected to TCP server');
        resolve();
      });
      
      this.socket.on('error', reject);
    });
  }
  
  close() {
    if (this.socket) {
      this.socket.destroy();
    }
  }
}

7. 高级用法和模式

7.1 负载均衡和故障转移

typescript
// 负载均衡配置
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'LOAD_BALANCED_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: [
            'amqp://node1:5672',
            'amqp://node2:5672',
            'amqp://node3:5672',
          ],
          queue: 'service_queue',
          queueOptions: {
            durable: true,
          },
        },
      },
    ]),
  ],
})
export class AppModule {}

7.2 消息确认和重试机制

typescript
// 带重试机制的客户端使用
@Controller()
export class AppController {
  constructor(
    @Inject('RMQ_SERVICE') private readonly client: ClientProxy,
  ) {}
  
  @Post('order')
  async createOrder(@Body() orderData: any) {
    try {
      // 发送订单处理请求,带超时和重试
      const result = await this.client
        .send('process_order', orderData)
        .pipe(
          timeout(5000), // 5秒超时
          retry(3),      // 重试3次
        )
        .toPromise();
      
      return result;
    } catch (error) {
      throw new HttpException('Order processing failed', 500);
    }
  }
}

7.3 批量消息处理

typescript
// 批量消息处理
@Injectable()
export class BatchMessageService {
  constructor(
    @Inject('KAFKA_SERVICE') private readonly client: ClientProxy,
  ) {}
  
  async sendBatchMessages(messages: any[]) {
    // 并行发送多个消息
    const observables = messages.map(message => 
      this.client.emit('batch_event', message)
    );
    
    // 等待所有消息发送完成
    return forkJoin(observables).toPromise();
  }
}

8. 总结

NestJS 的 ClientProxy 通过统一的抽象层封装了多种微服务传输协议:

  1. TCP:适用于高性能、低延迟的场景
  2. RabbitMQ:适用于需要消息队列和复杂路由的场景
  3. Kafka:适用于高吞吐量、流处理的场景

两种通信模式的区别:

  1. send():请求-响应模式,等待服务响应
  2. emit():事件驱动模式,发送事件不等待响应

通过合理选择传输协议和通信模式,我们可以构建出:

  1. 高可用性:通过负载均衡和故障转移
  2. 高可靠性:通过消息确认和重试机制
  3. 高性能:通过批量处理和异步通信
  4. 可扩展性:通过微服务架构和消息队列

在下一篇文章中,我们将探讨 WebSocket 网关:@WebSocketGateway() 如何集成 Socket.IO,了解连接管理、会话状态、广播机制的实现。