Skip to content

WebSocket 网关:@WebSocketGateway() 如何集成 Socket.IO?

在现代 Web 应用中,实时通信变得越来越重要。NestJS 通过 @WebSocketGateway() 装饰器提供了强大的 WebSocket 支持,并默认集成了 Socket.IO 库。本文将深入探讨 WebSocket 网关的工作机制,包括连接管理、会话状态和广播机制的实现。

1. WebSocket 基础概念

1.1 什么是 WebSocket?

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,允许服务器主动向客户端推送数据:

typescript
// 传统的 HTTP 请求-响应模式
// Client → Request → Server
// Client ← Response ← Server

// WebSocket 全双工通信
// Client ↔ Data ↔ Server (双向实时通信)

1.2 Socket.IO 简介

Socket.IO 是一个基于 WebSocket 的实时应用框架,提供了更多高级特性:

typescript
// Socket.IO 的主要特性
// 1. 自动重连
// 2. 心跳检测
// 3. 房间和广播
// 4. 跨浏览器兼容
// 5. 多路复用

2. 基本 WebSocket 网关

2.1 创建 WebSocket 网关

typescript
// gateways/chat.gateway.ts
import { 
  WebSocketGateway, 
  WebSocketServer, 
  SubscribeMessage, 
  OnGatewayConnection, 
  OnGatewayDisconnect,
  OnGatewayInit
} from '@nestjs/websockets';
import { Logger } from '@nestjs/common';
import { Server, Socket } from 'socket.io';

@WebSocketGateway({
  cors: {
    origin: '*',
  },
})
export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;

  private logger: Logger = new Logger('ChatGateway');

  afterInit(server: Server) {
    this.logger.log('WebSocket Gateway initialized');
  }

  handleConnection(client: Socket, ...args: any[]) {
    this.logger.log(`Client connected: ${client.id}`);
  }

  handleDisconnect(client: Socket) {
    this.logger.log(`Client disconnected: ${client.id}`);
  }

  @SubscribeMessage('message')
  handleMessage(client: Socket, payload: any): void {
    this.server.emit('message', {
      ...payload,
      clientId: client.id,
      timestamp: new Date(),
    });
  }
}

2.2 网关配置选项

typescript
// 详细的网关配置
@WebSocketGateway({
  namespace: '/chat',           // 命名空间
  cors: {                       // CORS 配置
    origin: ['http://localhost:3000', 'https://myapp.com'],
    credentials: true,
  },
  transports: ['websocket', 'polling'], // 传输方式
  allowEIO3: true,              // 兼容 Socket.IO v3
  pingInterval: 10000,          // 心跳间隔
  pingTimeout: 5000,            // 心跳超时
})
export class ChatGateway {
  // 网关实现
}

3. 连接管理

3.1 客户端连接处理

typescript
// 连接管理实现
@WebSocketGateway()
export class ConnectionGateway implements OnGatewayConnection, OnGatewayDisconnect {
  private readonly logger = new Logger(ConnectionGateway.name);
  private connectedClients = new Map<string, Socket>();

  handleConnection(client: Socket, ...args: any[]) {
    // 存储客户端连接
    this.connectedClients.set(client.id, client);
    
    // 获取连接信息
    const handshake = client.handshake;
    this.logger.log(`Client connected: ${client.id}`);
    this.logger.log(`Query params: ${JSON.stringify(handshake.query)}`);
    this.logger.log(`Headers: ${JSON.stringify(handshake.headers)}`);
    
    // 发送欢迎消息
    client.emit('welcome', {
      message: 'Welcome to the WebSocket server!',
      clientId: client.id,
      timestamp: new Date(),
    });
  }

  handleDisconnect(client: Socket) {
    // 清理客户端连接
    this.connectedClients.delete(client.id);
    this.logger.log(`Client disconnected: ${client.id}`);
    
    // 通知其他客户端
    this.server.emit('user_disconnected', {
      clientId: client.id,
      timestamp: new Date(),
    });
  }

  @WebSocketServer()
  server: Server;

  // 获取在线客户端数量
  getConnectedClientsCount(): number {
    return this.connectedClients.size;
  }

  // 向特定客户端发送消息
  sendToClient(clientId: string, event: string, data: any) {
    const client = this.connectedClients.get(clientId);
    if (client) {
      client.emit(event, data);
    }
  }
}

3.2 身份验证和授权

typescript
// 身份验证网关
@WebSocketGateway()
export class AuthGateway implements OnGatewayConnection {
  private readonly logger = new Logger(AuthGateway.name);

  handleConnection(client: Socket, ...args: any[]) {
    try {
      // 从查询参数中获取令牌
      const token = client.handshake.query.token as string;
      if (!token) {
        this.logger.warn(`Unauthorized connection attempt from ${client.id}`);
        client.disconnect();
        return;
      }

      // 验证令牌
      const user = this.validateToken(token);
      if (!user) {
        client.disconnect();
        return;
      }

      // 将用户信息附加到客户端
      (client as any).user = user;
      
      this.logger.log(`User ${user.id} connected with client ID: ${client.id}`);
    } catch (error) {
      this.logger.error('Error during connection authentication', error);
      client.disconnect();
    }
  }

  private validateToken(token: string): User | null {
    // 实现令牌验证逻辑
    try {
      // 这里可以使用 JWT 验证或其他认证机制
      return jwt.verify(token, process.env.JWT_SECRET) as User;
    } catch (error) {
      return null;
    }
  }
}

4. 会话状态管理

4.1 会话数据存储

typescript
// 会话状态管理
interface ClientSession {
  userId: string;
  username: string;
  joinTime: Date;
  rooms: Set<string>;
  lastActivity: Date;
}

@WebSocketGateway()
export class SessionGateway {
  private readonly logger = new Logger(SessionGateway.name);
  private readonly sessions = new Map<string, ClientSession>();

  handleConnection(client: Socket, ...args: any[]) {
    // 初始化会话
    const session: ClientSession = {
      userId: (client as any).user?.id || 'anonymous',
      username: (client as any).user?.username || 'Guest',
      joinTime: new Date(),
      rooms: new Set<string>(),
      lastActivity: new Date(),
    };

    this.sessions.set(client.id, session);
    this.logger.log(`Session created for client ${client.id}`);
  }

  handleDisconnect(client: Socket) {
    // 清理会话
    this.sessions.delete(client.id);
    this.logger.log(`Session removed for client ${client.id}`);
  }

  @SubscribeMessage('join_room')
  handleJoinRoom(client: Socket, room: string): void {
    const session = this.sessions.get(client.id);
    if (session) {
      client.join(room);
      session.rooms.add(room);
      session.lastActivity = new Date();
      
      client.emit('room_joined', {
        room,
        timestamp: new Date(),
      });
    }
  }

  @SubscribeMessage('leave_room')
  handleLeaveRoom(client: Socket, room: string): void {
    const session = this.sessions.get(client.id);
    if (session) {
      client.leave(room);
      session.rooms.delete(room);
      session.lastActivity = new Date();
      
      client.emit('room_left', {
        room,
        timestamp: new Date(),
      });
    }
  }

  // 获取会话信息
  getSessionInfo(clientId: string): ClientSession | undefined {
    return this.sessions.get(clientId);
  }

  // 获取所有会话
  getAllSessions(): Map<string, ClientSession> {
    return new Map(this.sessions);
  }
}

4.2 用户状态同步

typescript
// 用户状态同步
@WebSocketGateway()
export class UserStatusGateway {
  private readonly logger = new Logger(UserStatusGateway.name);
  private readonly userConnections = new Map<string, Set<string>>(); // userId -> Set<clientIds>

  handleConnection(client: Socket, ...args: any[]) {
    const userId = (client as any).user?.id;
    if (userId) {
      // 记录用户连接
      if (!this.userConnections.has(userId)) {
        this.userConnections.set(userId, new Set());
      }
      this.userConnections.get(userId).add(client.id);
      
      // 通知其他用户该用户上线
      this.broadcastUserStatus(userId, 'online');
    }
  }

  handleDisconnect(client: Socket) {
    const userId = (client as any).user?.id;
    if (userId) {
      const userClients = this.userConnections.get(userId);
      if (userClients) {
        userClients.delete(client.id);
        
        // 如果用户没有其他连接,通知下线
        if (userClients.size === 0) {
          this.userConnections.delete(userId);
          this.broadcastUserStatus(userId, 'offline');
        }
      }
    }
  }

  private broadcastUserStatus(userId: string, status: 'online' | 'offline') {
    this.server.emit('user_status_changed', {
      userId,
      status,
      timestamp: new Date(),
    });
  }

  // 获取用户在线状态
  getUserStatus(userId: string): 'online' | 'offline' {
    return this.userConnections.has(userId) ? 'online' : 'offline';
  }

  // 获取在线用户列表
  getOnlineUsers(): string[] {
    return Array.from(this.userConnections.keys());
  }
}

5. 广播机制

5.1 基本广播功能

typescript
// 广播机制实现
@WebSocketGateway()
export class BroadcastGateway {
  @WebSocketServer()
  server: Server;

  @SubscribeMessage('broadcast_message')
  handleBroadcastMessage(client: Socket, payload: any): void {
    // 向所有客户端广播消息
    this.server.emit('broadcast', {
      ...payload,
      senderId: client.id,
      timestamp: new Date(),
    });
  }

  @SubscribeMessage('room_message')
  handleRoomMessage(client: Socket, payload: { room: string; message: string }): void {
    // 向特定房间广播消息
    this.server.to(payload.room).emit('room_message', {
      room: payload.room,
      message: payload.message,
      senderId: client.id,
      timestamp: new Date(),
    });
  }

  @SubscribeMessage('private_message')
  handlePrivateMessage(client: Socket, payload: { targetId: string; message: string }): void {
    // 向特定客户端发送私信
    client.to(payload.targetId).emit('private_message', {
      message: payload.message,
      senderId: client.id,
      timestamp: new Date(),
    });
  }
}

5.2 房间管理

typescript
// 房间管理实现
@WebSocketGateway()
export class RoomGateway {
  @WebSocketServer()
  server: Server;

  private readonly logger = new Logger(RoomGateway.name);
  private readonly rooms = new Map<string, Set<string>>(); // roomName -> Set<clientIds>

  @SubscribeMessage('join_room')
  handleJoinRoom(client: Socket, room: string): void {
    client.join(room);
    
    // 更新房间成员列表
    if (!this.rooms.has(room)) {
      this.rooms.set(room, new Set());
    }
    this.rooms.get(room).add(client.id);
    
    // 通知房间内其他成员
    client.to(room).emit('user_joined', {
      userId: (client as any).user?.id,
      clientId: client.id,
      room,
      timestamp: new Date(),
    });
    
    // 向客户端确认加入成功
    client.emit('room_joined', {
      room,
      members: Array.from(this.rooms.get(room)),
      timestamp: new Date(),
    });
    
    this.logger.log(`Client ${client.id} joined room ${room}`);
  }

  @SubscribeMessage('leave_room')
  handleLeaveRoom(client: Socket, room: string): void {
    client.leave(room);
    
    // 更新房间成员列表
    const roomMembers = this.rooms.get(room);
    if (roomMembers) {
      roomMembers.delete(client.id);
      
      // 如果房间为空,删除房间
      if (roomMembers.size === 0) {
        this.rooms.delete(room);
      }
    }
    
    // 通知房间内其他成员
    client.to(room).emit('user_left', {
      userId: (client as any).user?.id,
      clientId: client.id,
      room,
      timestamp: new Date(),
    });
    
    client.emit('room_left', {
      room,
      timestamp: new Date(),
    });
    
    this.logger.log(`Client ${client.id} left room ${room}`);
  }

  // 获取房间信息
  getRoomInfo(room: string): { 
    name: string; 
    memberCount: number; 
    members: string[] 
  } | null {
    const members = this.rooms.get(room);
    if (!members) {
      return null;
    }
    
    return {
      name: room,
      memberCount: members.size,
      members: Array.from(members),
    };
  }

  // 获取所有房间
  getAllRooms(): { name: string; memberCount: number }[] {
    return Array.from(this.rooms.entries()).map(([name, members]) => ({
      name,
      memberCount: members.size,
    }));
  }
}

6. 错误处理和监控

6.1 错误处理

typescript
// 错误处理实现
@WebSocketGateway()
export class ErrorHandlingGateway {
  private readonly logger = new Logger(ErrorHandlingGateway.name);

  @SubscribeMessage('risky_operation')
  async handleRiskyOperation(client: Socket, payload: any) {
    try {
      // 执行可能出错的操作
      const result = await this.performRiskyOperation(payload);
      
      client.emit('operation_success', {
        result,
        timestamp: new Date(),
      });
    } catch (error) {
      this.logger.error('Error in risky operation', error);
      
      client.emit('operation_error', {
        message: error.message,
        code: error.code,
        timestamp: new Date(),
      });
    }
  }

  private async performRiskyOperation(payload: any): Promise<any> {
    // 模拟可能出错的操作
    if (Math.random() < 0.3) {
      throw new Error('Random operation failure');
    }
    
    return { success: true, data: payload };
  }

  // 全局错误处理
  @UseFilters(new WsExceptionFilter())
  @SubscribeMessage('another_operation')
  handleAnotherOperation(client: Socket, payload: any) {
    // 处理逻辑
  }
}

6.2 性能监控

typescript
// 性能监控实现
@WebSocketGateway()
export class MonitoringGateway {
  private readonly logger = new Logger(MonitoringGateway.name);
  private readonly messageStats = new Map<string, { count: number; totalTime: number }>();

  @SubscribeMessage('tracked_message')
  handleTrackedMessage(client: Socket, payload: { type: string; data: any }) {
    const startTime = Date.now();
    
    // 处理消息
    this.processMessage(payload);
    
    const endTime = Date.now();
    const duration = endTime - startTime;
    
    // 更新统计信息
    if (!this.messageStats.has(payload.type)) {
      this.messageStats.set(payload.type, { count: 0, totalTime: 0 });
    }
    
    const stats = this.messageStats.get(payload.type);
    stats.count++;
    stats.totalTime += duration;
    
    // 记录性能日志
    if (duration > 1000) { // 超过1秒的慢操作
      this.logger.warn(`Slow message processing: ${payload.type} took ${duration}ms`);
    }
  }

  private processMessage(payload: any) {
    // 消息处理逻辑
  }

  // 获取消息统计信息
  getMessageStats(): any {
    const stats = {};
    for (const [type, data] of this.messageStats.entries()) {
      stats[type] = {
        count: data.count,
        averageTime: data.totalTime / data.count,
        totalTime: data.totalTime,
      };
    }
    return stats;
  }
}

7. 总结

NestJS 的 @WebSocketGateway() 通过集成 Socket.IO 提供了完整的 WebSocket 解决方案:

  1. 连接管理:处理客户端连接和断开连接
  2. 会话状态:维护客户端和用户状态信息
  3. 广播机制:支持全局、房间和私信广播
  4. 房间管理:实现灵活的房间分组功能
  5. 错误处理:提供完善的错误处理机制
  6. 性能监控:支持性能统计和监控

通过合理使用 WebSocket 网关,我们可以构建出:

  1. 实时聊天应用:支持消息广播和私信
  2. 在线游戏:实现实时游戏状态同步
  3. 协作工具:支持实时文档编辑和状态共享
  4. 通知系统:实现实时推送通知
  5. 监控面板:实时显示系统状态和指标

WebSocket 网关的核心优势:

  1. 全双工通信:支持服务器主动推送数据
  2. 低延迟:相比轮询方式延迟更低
  3. 高并发:支持大量并发连接
  4. 灵活广播:支持多种消息广播模式
  5. 状态管理:提供完整的连接和会话管理

至此,我们已经完成了 NestJS 专栏第四阶段的所有内容。在下一篇文章中,我们将进入第五阶段,探讨 Nest CLI 的代码生成原理:nest g service user 发生了什么?