Skip to content

Node.js 网络编程与协议实现

Node.js 在网络编程方面表现出色，提供了丰富的 API 来处理各种网络协议。从底层的 TCP/UDP 到高层的 HTTP/HTTPS 与 WebSocket，Node.js 都有相应的模块支持。本文将深入探讨 Node.js 的网络编程能力和协议实现机制。

一、TCP 编程

1. TCP 服务器实现

javascript
// TCP 服务器示例
const net = require('net');
const { EventEmitter } = require('events');

/**
 * Event-driven TCP echo server built on `net`.
 *
 * Emitted events:
 *  - 'listening'  `{ port, host }` once the server is bound
 *  - 'connect'    `{ clientId, socket }` for every accepted socket
 *  - 'data'       `{ clientId, data }` for every received chunk (utf8 string)
 *  - 'disconnect' `{ clientId }` when the peer ends the connection
 *  - 'error'      an `Error` for server-level failures, or
 *                 `{ clientId, error }` for per-socket failures
 *  - 'close'      after `close()` has fully stopped the server
 */
class TCPServer extends EventEmitter {
  /**
   * @param {object} [options]
   * @param {number} [options.port=8080] Port to listen on.
   * @param {string} [options.host='localhost'] Interface to bind to.
   */
  constructor(options = {}) {
    super();
    this.port = options.port || 8080;
    this.host = options.host || 'localhost';
    // clientId ("address:port") -> { socket, connectedAt, bytesReceived, bytesSent }
    this.connections = new Map();
    this.server = null;
  }

  /**
   * Creates the underlying `net.Server` and starts listening.
   * @returns {net.Server} The created server.
   */
  start() {
    this.server = net.createServer((socket) => {
      this.handleConnection(socket);
    });

    this.server.listen(this.port, this.host, () => {
      console.log(`TCP 服务器运行在 ${this.host}:${this.port}`);
      this.emit('listening', { port: this.port, host: this.host });
    });

    this.server.on('error', (err) => {
      this.emit('error', err);
    });

    return this.server;
  }

  /**
   * Registers a freshly accepted socket, wires its event handlers and sends
   * the welcome banner.
   * @param {net.Socket} socket Accepted client socket.
   */
  handleConnection(socket) {
    const clientId = `${socket.remoteAddress}:${socket.remotePort}`;
    console.log(`客户端 ${clientId} 已连接`);

    const connection = {
      socket,
      connectedAt: new Date(),
      bytesReceived: 0,
      bytesSent: 0
    };
    this.connections.set(clientId, connection);

    // Only drop the map entry if it still belongs to this socket, so a late
    // event from an old socket cannot evict a newer connection with the same id.
    const forget = () => {
      if (this.connections.get(clientId) === connection) {
        this.connections.delete(clientId);
      }
    };

    socket.setEncoding('utf8');
    socket.setTimeout(30000); // 30 second idle timeout

    socket.on('data', (data) => {
      // `data` is a string because of setEncoding('utf8'); its `.length` is a
      // UTF-16 unit count, so measure the encoded size to really count bytes.
      connection.bytesReceived += Buffer.byteLength(data, 'utf8');

      console.log(`从 ${clientId} 接收到: ${data.trim()}`);
      this.emit('data', { clientId, data });

      // Echo the payload back to the sender.
      this.sendToClient(clientId, `Echo: ${data}`);
    });

    socket.on('end', () => {
      console.log(`客户端 ${clientId} 断开连接`);
      forget();
      this.emit('disconnect', { clientId });
    });

    socket.on('error', (err) => {
      console.error(`客户端 ${clientId} 发生错误:`, err);
      forget();
      // EventEmitter throws when 'error' is emitted without a listener; a
      // single misbehaving client must not take the whole server down.
      if (this.listenerCount('error') > 0) {
        this.emit('error', { clientId, error: err });
      }
    });

    // 'close' always fires last, including for sockets destroyed without a
    // preceding 'end' or 'error', so this guarantees the entry is released.
    socket.on('close', forget);

    socket.on('timeout', () => {
      console.log(`客户端 ${clientId} 连接超时`);
      socket.end('连接超时\r\n');
    });

    socket.write('欢迎连接到 TCP 服务器!\r\n');
    this.emit('connect', { clientId, socket });
  }

  /**
   * Sends one CRLF-terminated message to a connected client.
   * @param {string} clientId Target id as reported by the 'connect' event.
   * @param {string} message Text to send (CRLF is appended).
   * @returns {boolean} `true` when written, `false` if the client is unknown
   *   or its socket is no longer writable.
   */
  sendToClient(clientId, message) {
    const connection = this.connections.get(clientId);
    if (!connection || !connection.socket.writable) {
      return false;
    }
    const line = message + '\r\n';
    connection.socket.write(line);
    connection.bytesSent += Buffer.byteLength(line);
    return true;
  }

  /**
   * Sends a message to every connected client.
   * @param {string} message Text to send.
   * @returns {number} Number of clients the message was written to.
   */
  broadcast(message) {
    let count = 0;
    for (const clientId of this.connections.keys()) {
      if (this.sendToClient(clientId, message)) {
        count++;
      }
    }
    return count;
  }

  /**
   * @param {string} clientId
   * @returns {object|undefined} The connection record, if the client is known.
   */
  getConnectionInfo(clientId) {
    return this.connections.get(clientId);
  }

  /** @returns {string[]} Ids of all currently tracked clients. */
  getAllConnections() {
    return Array.from(this.connections.keys());
  }

  /**
   * Says goodbye to every client, stops accepting connections and emits
   * 'close' once the server has shut down. No-op if never started.
   */
  close() {
    if (!this.server) {
      return;
    }

    for (const connection of this.connections.values()) {
      connection.socket.end('服务器关闭\r\n');
    }

    this.server.close(() => {
      console.log('TCP 服务器已关闭');
      this.emit('close');
    });
  }
}

// Example usage of the TCP server: log lifecycle events and shut down cleanly.
const server = new TCPServer({ port: 8080, host: 'localhost' });

server.on('listening', (info) => {
  console.log(`服务器开始监听: ${info.host}:${info.port}`);
});

server.on('connect', (info) => {
  console.log(`新连接: ${info.clientId}`);
});

server.on('data', (info) => {
  console.log(`处理数据: ${info.data}`);
});

server.on('disconnect', (info) => {
  console.log(`连接断开: ${info.clientId}`);
});

// TCPServer re-emits listen failures and socket failures as 'error'. An
// EventEmitter with no 'error' listener throws on emit, so without this
// handler one reset connection would crash the whole process.
server.on('error', (err) => {
  console.error('服务器错误:', err);
});

server.start();

// Graceful shutdown. Calling process.exit() right after close() would abandon
// the pending farewell writes and the close callback, so exit only once the
// server reports 'close'. A second Ctrl-C terminates immediately because the
// handler is registered with once().
process.once('SIGINT', () => {
  console.log('正在关闭服务器...');
  server.once('close', () => process.exit(0));
  // Safety net for clients that never finish closing; unref() keeps this
  // timer from holding the process open when shutdown completes normally.
  setTimeout(() => process.exit(1), 5000).unref();
  server.close();
});

2. TCP 客户端实现

javascript
// TCP 客户端示例
const net = require('net');
const { EventEmitter } = require('events');

/**
 * TCP client with automatic, linearly backed-off reconnection.
 *
 * Emitted events: 'connect', 'data' (utf8 string), 'disconnect',
 * 'error' (Error) and 'reconnectFailed' once the retry budget is spent.
 */
class TCPClient extends EventEmitter {
  /**
   * @param {object} [options]
   * @param {string} [options.host='localhost'] Server host.
   * @param {number} [options.port=8080] Server port.
   * @param {number} [options.maxReconnectAttempts=5] Retry budget; 0 disables
   *   automatic reconnection.
   * @param {number} [options.reconnectDelay=1000] Base delay in ms; attempt N
   *   waits `reconnectDelay * N`.
   */
  constructor(options = {}) {
    super();
    this.host = options.host || 'localhost';
    this.port = options.port || 8080;
    this.socket = null;
    this.connected = false;
    this.reconnectAttempts = 0;
    // `|| 5` would turn an explicit 0 ("never reconnect") into 5.
    const budget = options.maxReconnectAttempts;
    this.maxReconnectAttempts = Number.isInteger(budget) && budget >= 0 ? budget : 5;
    this.reconnectDelay = options.reconnectDelay || 1000;
    // True after disconnect() until the next connect(); suppresses reconnects.
    this.manualDisconnect = false;
    // Handle of the pending reconnect timer, or null when none is scheduled.
    this.reconnectTimer = null;
  }

  /**
   * Opens the connection.
   * @returns {Promise<net.Socket>} Resolves with the socket once connected;
   *   rejects with the socket error if the connection fails.
   */
  connect() {
    this.manualDisconnect = false;

    return new Promise((resolve, reject) => {
      const socket = net.createConnection(this.port, this.host, () => {
        this.connected = true;
        this.reconnectAttempts = 0;
        console.log(`已连接到 ${this.host}:${this.port}`);
        this.emit('connect');
        resolve(socket);
      });
      this.socket = socket;

      socket.setEncoding('utf8');

      socket.on('data', (data) => {
        console.log(`接收到数据: ${data.trim()}`);
        this.emit('data', data);
      });

      socket.on('end', () => {
        this.connected = false;
        console.log('连接已断开');
        this.emit('disconnect');
        this.attemptReconnect();
      });

      socket.on('error', (err) => {
        this.connected = false;
        console.error('连接错误:', err);
        // Emitting 'error' without a listener throws; the rejected promise
        // already reports the failure to callers who did not subscribe.
        if (this.listenerCount('error') > 0) {
          this.emit('error', err);
        }
        reject(err);
        this.attemptReconnect();
      });
    });
  }

  /**
   * Schedules one reconnect attempt. Does nothing after an explicit
   * `disconnect()` or while an attempt is already pending; emits
   * 'reconnectFailed' when the retry budget is exhausted.
   */
  attemptReconnect() {
    if (this.manualDisconnect || this.reconnectTimer !== null) {
      return;
    }

    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.log('达到最大重连次数,停止重连');
      this.emit('reconnectFailed');
      return;
    }

    this.reconnectAttempts++;
    console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})...`);

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect().catch((err) => {
        console.error('重连失败:', err);
      });
    }, this.reconnectDelay * this.reconnectAttempts);
  }

  /**
   * Sends one CRLF-terminated message.
   * @param {string} message Text to send.
   * @returns {boolean} `true` if written, `false` when not connected.
   */
  send(message) {
    if (this.connected && this.socket && this.socket.writable) {
      this.socket.write(message + '\r\n');
      return true;
    }
    return false;
  }

  /**
   * Closes the connection on purpose. Cancels any pending reconnect and
   * prevents the resulting 'end' event from starting a new one.
   */
  disconnect() {
    this.manualDisconnect = true;
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.socket) {
      this.socket.end();
      this.connected = false;
    }
  }
}

// Example usage of the TCP client: greet the server on connect and log
// everything that comes back.
const client = new TCPClient({ port: 8080, host: 'localhost' });

client
  .on('connect', () => {
    console.log('客户端已连接');
    // Send a first message as soon as the connection is up.
    client.send('Hello Server!');
  })
  .on('data', (chunk) => {
    console.log('客户端收到:', chunk.trim());
  })
  .on('disconnect', () => {
    console.log('客户端断开连接');
  })
  .on('error', (failure) => {
    console.error('客户端错误:', failure);
  });

// Kick off the connection; a failed first attempt is reported here.
client.connect().catch((reason) => {
  console.error('连接失败:', reason);
});

二、UDP 编程

1. UDP 服务器实现

javascript
// UDP 服务器示例
const dgram = require('dgram');

/**
 * UDP echo server that remembers every peer it has heard from so it can
 * broadcast to them later.
 *
 * Note: `clients` is never pruned; entries accumulate for as long as the
 * server runs.
 */
class UDPServer {
  /**
   * @param {object} [options]
   * @param {number} [options.port=8081] Port to bind.
   * @param {string} [options.host='localhost'] Interface to bind.
   */
  constructor(options = {}) {
    this.port = options.port || 8081;
    this.host = options.host || 'localhost';
    this.socket = null;
    // "address:port" -> { address, port, lastSeen }
    this.clients = new Map();
  }

  /** Creates the udp4 socket, wires its handlers and binds it. */
  start() {
    const socket = dgram.createSocket('udp4');
    this.socket = socket;

    socket.on('message', (msg, rinfo) => {
      this.handleMessage(msg, rinfo);
    });

    socket.on('listening', () => {
      const address = socket.address();
      console.log(`UDP 服务器监听 ${address.address}:${address.port}`);
    });

    socket.on('error', (err) => {
      console.error('UDP 服务器错误:', err);
      // Go through close() so the instance forgets the dead socket; a later
      // close() call would otherwise throw on the already-closed socket.
      this.close();
    });

    socket.bind(this.port, this.host);
  }

  /**
   * Records the sender and echoes the datagram back to it.
   * @param {Buffer} msg Received datagram.
   * @param {{address: string, port: number}} rinfo Sender address info.
   */
  handleMessage(msg, rinfo) {
    const clientId = `${rinfo.address}:${rinfo.port}`;
    console.log(`从 ${clientId} 接收到: ${msg}`);

    this.clients.set(clientId, {
      address: rinfo.address,
      port: rinfo.port,
      lastSeen: new Date()
    });

    if (!this.socket) {
      return;
    }

    const response = Buffer.from(`Echo: ${msg}`);
    this.socket.send(response, rinfo.port, rinfo.address, (err) => {
      if (err) {
        console.error('发送回复失败:', err);
      }
    });
  }

  /**
   * Sends a message to every peer seen so far. No-op when the server is not
   * running.
   * @param {string} message Text to send.
   */
  broadcast(message) {
    if (!this.socket) {
      return;
    }

    const msg = Buffer.from(message);
    for (const [clientId, client] of this.clients) {
      this.socket.send(msg, client.port, client.address, (err) => {
        if (err) {
          console.error(`向 ${clientId} 广播失败:`, err);
        }
      });
    }
  }

  /** Closes the socket. Safe to call repeatedly or before start(). */
  close() {
    const socket = this.socket;
    if (!socket) {
      return;
    }
    // Clear the reference first so re-entrant or repeated calls are no-ops.
    this.socket = null;
    socket.close(() => {
      console.log('UDP 服务器已关闭');
    });
  }
}

// Example usage of the UDP server.
const udpServer = new UDPServer({ port: 8081, host: 'localhost' });
udpServer.start();

// Graceful shutdown. The socket's close callback runs asynchronously, so
// calling process.exit() right after close() would skip it. Let the event loop
// drain instead; the unref'd timer only forces an exit if something else is
// still holding the process open.
process.once('SIGINT', () => {
  udpServer.close();
  process.exitCode = 0;
  setTimeout(() => process.exit(0), 3000).unref();
});

2. UDP 客户端实现

javascript
// UDP 客户端示例
const dgram = require('dgram');

/**
 * Minimal UDP client that sends datagrams to one fixed server and logs every
 * reply it receives.
 */
class UDPClient {
  /**
   * @param {object} [options]
   * @param {string} [options.host='localhost'] Server host.
   * @param {number} [options.port=8081] Server port.
   */
  constructor(options = {}) {
    this.host = options.host || 'localhost';
    this.port = options.port || 8081;
    this.socket = null;
  }

  /**
   * Creates the udp4 socket and attaches the logging handlers. Calling it
   * again while a socket is open returns that socket instead of leaking it.
   * @returns {dgram.Socket} The client socket.
   */
  connect() {
    if (this.socket) {
      return this.socket;
    }

    this.socket = dgram.createSocket('udp4');

    this.socket.on('message', (msg) => {
      console.log(`从服务器接收到: ${msg}`);
    });

    this.socket.on('error', (err) => {
      console.error('UDP 客户端错误:', err);
    });

    return this.socket;
  }

  /**
   * Sends one datagram to the configured server.
   * @param {string} message Text to send.
   * @throws {Error} If `connect()` has not been called or the client was closed.
   */
  send(message) {
    if (!this.socket) {
      throw new Error('UDP 客户端未连接,请先调用 connect()');
    }

    const msg = Buffer.from(message);
    this.socket.send(msg, this.port, this.host, (err) => {
      if (err) {
        console.error('发送消息失败:', err);
      }
    });
  }

  /** Closes the socket. Safe to call repeatedly or before connect(). */
  close() {
    const socket = this.socket;
    if (!socket) {
      return;
    }
    // Forget the socket first: closing an already-closed dgram socket throws.
    this.socket = null;
    socket.close();
  }
}

// Example usage of the UDP client: open the socket, fire one datagram at the
// server, then release the socket five seconds later.
const udpClient = new UDPClient({ port: 8081, host: 'localhost' });

udpClient.connect();
udpClient.send('Hello UDP Server!');

setTimeout(() => udpClient.close(), 5 * 1000);

三、HTTP 协议实现

1. HTTP 服务器实现

javascript
// HTTP 服务器示例
const http = require('http');
const url = require('url');
const querystring = require('querystring');

/**
 * Tiny JSON-oriented HTTP server with exact-match routing and middleware.
 *
 * Handlers and middleware receive a context object:
 * `{ req, res, url, params, query, body, status, headers }`. They describe
 * the response by assigning `ctx.status`, `ctx.body` and `ctx.headers`.
 */
class HTTPServer {
  /**
   * @param {object} [options]
   * @param {number} [options.port=3000] Port to listen on.
   * @param {string} [options.host='localhost'] Interface to bind to.
   */
  constructor(options = {}) {
    this.port = options.port || 3000;
    this.host = options.host || 'localhost';
    // "METHOD:/path" -> handler(ctx)
    this.routes = new Map();
    this.middlewares = [];
    this.server = null;
  }

  /**
   * Registers a middleware `(ctx, next) => any`. Awaiting `next()` runs the
   * rest of the chain (later middleware, then the route handler), so code
   * placed after it executes once the response has been prepared.
   * @param {Function} middleware
   */
  use(middleware) {
    this.middlewares.push(middleware);
  }

  /**
   * Registers a handler for an exact method + path pair.
   * @param {string} method HTTP method (case-insensitive).
   * @param {string} path Exact pathname to match.
   * @param {Function} handler `(ctx) => any`, may be async.
   */
  route(method, path, handler) {
    const key = `${method.toUpperCase()}:${path}`;
    this.routes.set(key, handler);
  }

  /** Shorthand for `route('GET', path, handler)`. */
  get(path, handler) {
    this.route('GET', path, handler);
  }

  /** Shorthand for `route('POST', path, handler)`. */
  post(path, handler) {
    this.route('POST', path, handler);
  }

  /** Shorthand for `route('PUT', path, handler)`. */
  put(path, handler) {
    this.route('PUT', path, handler);
  }

  /** Shorthand for `route('DELETE', path, handler)`. */
  delete(path, handler) {
    this.route('DELETE', path, handler);
  }

  /**
   * Creates the `http.Server` and starts listening.
   * @returns {http.Server} The created server.
   */
  start() {
    this.server = http.createServer((req, res) => {
      this.handleRequest(req, res);
    });

    this.server.listen(this.port, this.host, () => {
      console.log(`HTTP 服务器运行在 http://${this.host}:${this.port}`);
    });

    this.server.on('error', (err) => {
      console.error('HTTP 服务器错误:', err);
    });

    return this.server;
  }

  /**
   * Full request pipeline: parse the body, run middleware around the route
   * handler and write the response. Never rejects; failures become a 400
   * (unparseable body) or 500 response.
   * @param {http.IncomingMessage} req
   * @param {http.ServerResponse} res
   * @returns {Promise<void>}
   */
  async handleRequest(req, res) {
    const parsedUrl = url.parse(req.url, true);
    const pathname = parsedUrl.pathname;
    const method = req.method;

    res.setHeader('Content-Type', 'application/json');
    res.setHeader('X-Powered-By', 'Custom HTTP Server');

    const ctx = {
      req,
      res,
      url: parsedUrl,
      params: {},
      query: parsedUrl.query,
      body: null,
      status: 200,
      headers: {}
    };

    try {
      // Only methods that normally carry a payload get their body parsed.
      if (['POST', 'PUT', 'PATCH'].includes(method)) {
        ctx.body = await this.parseBody(req);
      }

      // Innermost step of the chain: dispatch to the route, or answer 404.
      const dispatch = async () => {
        const handler = this.routes.get(`${method}:${pathname}`);
        if (handler) {
          await handler(ctx);
        } else {
          ctx.status = 404;
          ctx.body = { error: 'Not Found' };
        }
      };

      await this.runMiddlewares(ctx, 0, dispatch);
    } catch (err) {
      console.error('处理请求时出错:', err);
      if (err && err.statusCode === 400) {
        ctx.status = 400;
        ctx.body = { error: 'Bad Request' };
      } else {
        ctx.status = 500;
        ctx.body = { error: 'Internal Server Error' };
      }
    }

    this.sendResponse(ctx);
  }

  /**
   * Internal helper: runs middleware `index` with a working `next()`.
   * `next()` starts the remainder of the chain at most once. If a middleware
   * returns without calling it, the remainder still runs afterwards, so
   * middleware that ignores `next` keeps behaving as a plain pre-hook.
   * @param {object} ctx Request context.
   * @param {number} index Position in `this.middlewares`.
   * @param {Function} terminal Async function executed after the last middleware.
   * @returns {Promise<void>}
   */
  async runMiddlewares(ctx, index, terminal) {
    if (index >= this.middlewares.length) {
      await terminal();
      return;
    }

    let downstream = null;
    const next = () => {
      if (downstream === null) {
        downstream = this.runMiddlewares(ctx, index + 1, terminal);
      }
      return downstream;
    };

    await this.middlewares[index](ctx, next);
    await next();
  }

  /**
   * Reads and decodes the request body.
   * JSON and urlencoded payloads are parsed into objects (media-type
   * parameters such as `; charset=utf-8` are ignored when matching); anything
   * else resolves to the raw utf8 string.
   * @param {http.IncomingMessage} req
   * @returns {Promise<*>} Rejects with an error carrying `statusCode = 400`
   *   when the payload cannot be parsed.
   */
  parseBody(req) {
    return new Promise((resolve, reject) => {
      // Collect raw chunks and decode once at the end: decoding per chunk
      // corrupts multi-byte characters that straddle a chunk boundary.
      const chunks = [];
      req.on('data', (chunk) => {
        chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
      });

      req.on('end', () => {
        const body = Buffer.concat(chunks).toString('utf8');
        const mediaType = String(req.headers['content-type'] || '')
          .split(';')[0]
          .trim()
          .toLowerCase();

        try {
          if (mediaType === 'application/json') {
            resolve(JSON.parse(body));
          } else if (mediaType === 'application/x-www-form-urlencoded') {
            resolve(querystring.parse(body));
          } else {
            resolve(body);
          }
        } catch (err) {
          // A malformed payload is the client's fault, not a server failure.
          err.statusCode = 400;
          reject(err);
        }
      });

      req.on('error', reject);
    });
  }

  /**
   * Writes status, custom headers and body from the context to the response.
   * Objects are serialised as pretty-printed JSON; other values via toString().
   * @param {object} ctx Request context.
   */
  sendResponse(ctx) {
    const { res, status, body, headers } = ctx;

    res.statusCode = status;

    Object.keys(headers).forEach((key) => {
      res.setHeader(key, headers[key]);
    });

    if (body === null || body === undefined) {
      res.end();
    } else if (typeof body === 'object') {
      res.end(JSON.stringify(body, null, 2));
    } else {
      res.end(body.toString());
    }
  }

  /** Stops accepting connections. No-op if the server was never started. */
  close() {
    if (this.server) {
      this.server.close(() => {
        console.log('HTTP 服务器已关闭');
      });
    }
  }
}

// Example usage of the HTTP server.
const server = new HTTPServer({ port: 3000, host: 'localhost' });

// Request logging / timing middleware.
server.use(async (ctx, next) => {
  console.log(`${ctx.req.method} ${ctx.url.pathname}`);
  const start = Date.now();
  await next();
  const ms = Date.now() - start;
  console.log(`请求处理耗时: ${ms}ms`);
});

// Routes
server.get('/', (ctx) => {
  ctx.body = { message: 'Hello World!', timestamp: new Date() };
});

server.get('/users', (ctx) => {
  ctx.body = [
    { id: 1, name: 'Alice' },
    { id: 2, name: 'Bob' }
  ];
});

server.post('/users', (ctx) => {
  const payload = ctx.body;

  // The parsed body is only an object for JSON or form payloads; a missing,
  // plain-text or `null` body cannot be extended with an id.
  if (payload === null || typeof payload !== 'object' || Array.isArray(payload)) {
    ctx.status = 400;
    ctx.body = { error: 'Invalid user payload' };
    return;
  }

  // Copy instead of mutating the parsed request body.
  const newUser = { ...payload, id: Date.now() };
  ctx.status = 201;
  ctx.body = { message: 'User created', user: newUser };
});

server.start();

// Graceful shutdown. Calling process.exit() right after close() would cut off
// in-flight responses and skip the close callback. Let the event loop drain
// instead; the unref'd timer forces an exit if keep-alive connections linger.
process.once('SIGINT', () => {
  server.close();
  process.exitCode = 0;
  setTimeout(() => process.exit(0), 5000).unref();
});

2. HTTP 客户端实现

javascript
// HTTP 客户端示例
const http = require('http');
const https = require('https');
const { URL } = require('url');

/**
 * Promise-based HTTP/HTTPS client with timeout and linear-backoff retries.
 *
 * Note: retries apply to every method, including non-idempotent ones such as
 * POST; pass `retryAttempts: 0` when a request must not be repeated.
 */
class HTTPClient {
  /**
   * @param {object} [options]
   * @param {number} [options.timeout=5000] Socket idle timeout in ms.
   * @param {number} [options.retryAttempts=3] Retries after the first try;
   *   0 disables retrying.
   * @param {number} [options.retryDelay=1000] Base backoff in ms; retry N
   *   waits `retryDelay * N`.
   */
  constructor(options = {}) {
    this.timeout = options.timeout || 5000;
    // `|| 3` would turn an explicit 0 ("do not retry") into 3.
    const attempts = options.retryAttempts;
    this.retryAttempts = typeof attempts === 'number' && attempts >= 0 ? attempts : 3;
    const delay = options.retryDelay;
    this.retryDelay = typeof delay === 'number' && delay >= 0 ? delay : 1000;
  }

  /**
   * Performs a request.
   * @param {string} url Absolute http(s) URL.
   * @param {object} [options]
   * @param {string} [options.method='GET']
   * @param {object} [options.headers]
   * @param {*} [options.body] String/Buffer sent as-is, other objects as JSON.
   * @returns {Promise<{statusCode: number, headers: object, body: *}>}
   */
  async request(url, options = {}) {
    const parsedUrl = new URL(url);
    const isHttps = parsedUrl.protocol === 'https:';
    const client = isHttps ? https : http;

    const requestOptions = {
      hostname: parsedUrl.hostname,
      port: parsedUrl.port || (isHttps ? 443 : 80),
      path: parsedUrl.pathname + parsedUrl.search,
      method: options.method || 'GET',
      headers: options.headers || {},
      timeout: this.timeout
    };

    return this.makeRequest(client, requestOptions, options.body, 0);
  }

  /**
   * Executes one attempt and schedules the next one on failure.
   * @param {object} client Module exposing `request()` (`http` or `https`).
   * @param {object} options Options passed to `client.request`.
   * @param {*} body Optional request body.
   * @param {number} attempt Zero-based attempt counter.
   * @returns {Promise<{statusCode: number, headers: object, body: *}>}
   */
  makeRequest(client, options, body, attempt) {
    return new Promise((resolve, reject) => {
      // One attempt must produce exactly one outcome. Destroying the request
      // after a timeout also raises 'error'; without this latch both handlers
      // would start their own retry chain.
      let settled = false;

      const failOrRetry = (err, label) => {
        if (settled) {
          return;
        }
        settled = true;

        if (attempt < this.retryAttempts) {
          console.log(`${label},正在重试 (${attempt + 1}/${this.retryAttempts})...`);
          setTimeout(() => {
            this.makeRequest(client, options, body, attempt + 1).then(resolve, reject);
          }, this.retryDelay * (attempt + 1));
        } else {
          reject(err);
        }
      };

      const req = client.request(options, (res) => {
        // Decode once at the end: converting each chunk separately corrupts
        // multi-byte characters split across chunk boundaries.
        const chunks = [];

        res.on('data', (chunk) => {
          chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
        });

        res.on('end', () => {
          if (settled) {
            return;
          }
          settled = true;

          try {
            const text = Buffer.concat(chunks).toString('utf8');
            resolve({
              statusCode: res.statusCode,
              headers: res.headers,
              body: this.parseResponseBody(text, res.headers['content-type'])
            });
          } catch (err) {
            reject(err);
          }
        });

        // A connection dropped mid-response surfaces on the response stream.
        res.on('error', (err) => failOrRetry(err, '请求失败'));
      });

      req.on('error', (err) => failOrRetry(err, '请求失败'));

      req.on('timeout', () => {
        const err = new Error('请求超时');
        failOrRetry(err, '请求超时');
        // Tear the socket down; the resulting 'error' event is ignored
        // because this attempt is already settled.
        req.destroy(err);
      });

      if (body) {
        if (typeof body === 'string' || Buffer.isBuffer(body)) {
          req.write(body);
        } else if (typeof body === 'object') {
          req.write(JSON.stringify(body));
        } else {
          req.write(body.toString());
        }
      }

      req.end();
    });
  }

  /**
   * Decodes a response body according to its content type.
   * @param {string} body Raw response text.
   * @param {string} [contentType] Value of the Content-Type header.
   * @returns {*} Parsed JSON for JSON responses, otherwise the raw text.
   * @throws {SyntaxError} If a JSON response is malformed.
   */
  parseResponseBody(body, contentType) {
    if (contentType && contentType.includes('application/json')) {
      return JSON.parse(body);
    }
    return body;
  }

  /** Sends a request whose body is JSON unless the caller overrides Content-Type. */
  sendJson(method, url, body, options) {
    const headers = {
      'Content-Type': 'application/json',
      ...options.headers
    };

    return this.request(url, { ...options, method, headers, body });
  }

  /** GET `url`. */
  async get(url, options = {}) {
    return this.request(url, { ...options, method: 'GET' });
  }

  /** POST `body` (JSON by default) to `url`. */
  async post(url, body, options = {}) {
    return this.sendJson('POST', url, body, options);
  }

  /** PUT `body` (JSON by default) to `url`. */
  async put(url, body, options = {}) {
    return this.sendJson('PUT', url, body, options);
  }

  /** DELETE `url`. */
  async delete(url, options = {}) {
    return this.request(url, { ...options, method: 'DELETE' });
  }
}

// Example usage of the HTTP client.
const client = new HTTPClient({ retryAttempts: 3, timeout: 5000 });

/**
 * Demo: list the users, then create one, logging both responses.
 * Any request failure is logged rather than propagated.
 */
async function example() {
  const usersEndpoint = 'http://localhost:3000/users';
  const charlie = {
    name: 'Charlie',
    email: 'charlie@example.com'
  };

  try {
    // GET request
    const listed = await client.get(usersEndpoint);
    console.log('GET 响应:', listed);

    // POST request
    const created = await client.post(usersEndpoint, charlie);
    console.log('POST 响应:', created);
  } catch (failure) {
    console.error('请求失败:', failure);
  }
}

// Left disabled on purpose; uncomment to run the demo.
// example();

四、WebSocket 实现

1. WebSocket 服务器

javascript
// 简单的 WebSocket 服务器实现
const http = require('http');
const crypto = require('crypto');

/**
 * Minimal WebSocket (RFC 6455) broadcast server on top of `http`.
 *
 * Every data frame received from one client is relayed as a text frame to all
 * other clients. Close and ping frames are answered; pong frames are ignored.
 * Simplifications: fragmented messages are not reassembled (each frame is
 * relayed on its own), extensions are not negotiated, and unmasked client
 * frames are tolerated.
 */
class WebSocketServer {
  /**
   * @param {object} [options]
   * @param {number} [options.port=8080] Port to listen on.
   * @param {number} [options.maxPayloadLength=1048576] Largest accepted frame
   *   payload in bytes; a client announcing more is disconnected.
   */
  constructor(options = {}) {
    this.port = options.port || 8080;
    this.maxPayloadLength = options.maxPayloadLength || 1024 * 1024;
    this.clients = new Set();
    this.server = null;
  }

  /** Starts the HTTP server and listens for upgrade requests. */
  start() {
    this.server = http.createServer((req, res) => {
      res.writeHead(200, { 'Content-Type': 'text/plain' });
      res.end('WebSocket server is running here');
    });

    this.server.on('upgrade', (req, socket, head) => {
      this.handleUpgrade(req, socket, head);
    });

    this.server.listen(this.port, () => {
      console.log(`WebSocket 服务器运行在端口 ${this.port}`);
    });
  }

  /**
   * Performs the opening handshake and hands the socket over to
   * `handleConnection`. Destroys the socket if the request is not a valid
   * WebSocket upgrade.
   * @param {http.IncomingMessage} req Upgrade request.
   * @param {net.Socket} socket Underlying connection.
   * @param {Buffer} head Bytes already read past the request headers.
   */
  handleUpgrade(req, socket, head) {
    // Header values are case-insensitive; some clients send "WebSocket".
    const upgrade = String(req.headers.upgrade || '').toLowerCase();
    if (upgrade !== 'websocket') {
      socket.destroy();
      return;
    }

    const key = req.headers['sec-websocket-key'];
    if (!key) {
      socket.destroy();
      return;
    }

    const acceptKey = this.generateAcceptKey(key);

    // The two trailing empty entries produce the blank line ending the headers.
    const responseLines = [
      'HTTP/1.1 101 Switching Protocols',
      'Upgrade: websocket',
      'Connection: Upgrade',
      `Sec-WebSocket-Accept: ${acceptKey}`,
      '',
      ''
    ];

    socket.write(responseLines.join('\r\n'));

    this.handleConnection(socket, req, head);
  }

  /**
   * Computes the Sec-WebSocket-Accept value for a client key.
   * @param {string} key Value of the Sec-WebSocket-Key header.
   * @returns {string} Base64 SHA-1 of the key plus the protocol GUID.
   */
  generateAcceptKey(key) {
    const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
    return crypto.createHash('sha1').update(key + GUID).digest('base64');
  }

  /**
   * Tracks an upgraded socket and processes its incoming frames. Bytes are
   * buffered per connection, so frames split across TCP chunks, or several
   * frames arriving in one chunk, are handled correctly.
   * @param {net.Socket} socket Upgraded connection.
   * @param {http.IncomingMessage} req Upgrade request (used for the peer address).
   * @param {Buffer} [head] Frame bytes that arrived together with the handshake.
   */
  handleConnection(socket, req, head) {
    const clientId = `${req.socket.remoteAddress}:${req.socket.remotePort}`;
    console.log(`WebSocket 客户端 ${clientId} 已连接`);

    this.clients.add(socket);

    let pending = Buffer.alloc(0);

    const onData = (chunk) => {
      pending = pending.length === 0 ? chunk : Buffer.concat([pending, chunk]);

      for (;;) {
        let frame;
        try {
          frame = this.readFrame(pending);
        } catch (err) {
          // The stream cannot be resynchronised after a bad frame header.
          console.error('解析 WebSocket 帧时出错:', err);
          pending = Buffer.alloc(0);
          this.clients.delete(socket);
          socket.destroy();
          return;
        }

        if (frame === null) {
          return; // wait for more bytes
        }

        pending = pending.subarray(frame.length);

        try {
          this.handleFrame(socket, clientId, frame);
        } catch (err) {
          console.error('解析 WebSocket 帧时出错:', err);
        }
      }
    };

    socket.on('data', onData);

    socket.on('close', () => {
      console.log(`WebSocket 客户端 ${clientId} 已断开`);
      this.clients.delete(socket);
    });

    socket.on('error', (err) => {
      console.error(`WebSocket 客户端 ${clientId} 错误:`, err);
      this.clients.delete(socket);
    });

    if (head && head.length > 0) {
      onData(head);
    }
  }

  /**
   * Acts on one decoded frame: answers control frames, relays data frames.
   * @param {net.Socket} socket Sender.
   * @param {string} clientId Sender id used in log output.
   * @param {{opcode: number, payload: Buffer}} frame Decoded frame.
   */
  handleFrame(socket, clientId, frame) {
    switch (frame.opcode) {
      case 0x8: { // close: echo the status code (if any) and finish
        const status = frame.payload.length >= 2
          ? frame.payload.subarray(0, 2)
          : Buffer.alloc(0);
        this.clients.delete(socket);
        socket.end(this.buildFrame(0x8, status));
        return;
      }
      case 0x9: // ping: reply with a pong carrying the same payload
        socket.write(this.buildFrame(0xA, frame.payload));
        return;
      case 0xA: // pong: nothing to do
        return;
      default: {
        const message = frame.payload.toString();
        console.log(`从 ${clientId} 接收到:`, message);
        this.broadcast(message, socket);
      }
    }
  }

  /**
   * Decodes the first frame in `buffer` without modifying it.
   * @param {Buffer} buffer Bytes received so far.
   * @returns {{fin: boolean, opcode: number, payload: Buffer, length: number}|null}
   *   The frame (`length` = bytes consumed), or `null` if it is incomplete.
   * @throws {Error} If the announced payload exceeds `maxPayloadLength`.
   */
  readFrame(buffer) {
    if (buffer.length < 2) {
      return null;
    }

    const fin = (buffer[0] & 0x80) === 0x80;
    const opcode = buffer[0] & 0x0F;
    const masked = (buffer[1] & 0x80) === 0x80;
    let payloadLength = buffer[1] & 0x7F;
    let offset = 2;

    if (payloadLength === 126) {
      if (buffer.length < 4) {
        return null;
      }
      payloadLength = buffer.readUInt16BE(2);
      offset = 4;
    } else if (payloadLength === 127) {
      if (buffer.length < 10) {
        return null;
      }
      // 64-bit big-endian length, read as two 32-bit halves.
      payloadLength = buffer.readUInt32BE(2) * 0x100000000 + buffer.readUInt32BE(6);
      offset = 10;
    }

    if (payloadLength > this.maxPayloadLength) {
      throw new Error(`WebSocket frame payload too large: ${payloadLength}`);
    }

    let maskKey = null;
    if (masked) {
      if (buffer.length < offset + 4) {
        return null;
      }
      maskKey = buffer.subarray(offset, offset + 4);
      offset += 4;
    }

    const end = offset + payloadLength;
    if (buffer.length < end) {
      return null;
    }

    // Copy before unmasking so the caller's buffer is left untouched.
    const payload = Buffer.from(buffer.subarray(offset, end));
    if (masked) {
      for (let i = 0; i < payload.length; i++) {
        payload[i] ^= maskKey[i % 4];
      }
    }

    return { fin, opcode, payload, length: end };
  }

  /**
   * Decodes a buffer holding one complete frame and returns its payload text.
   * @param {Buffer} buffer A complete WebSocket frame.
   * @returns {string} The unmasked payload decoded as utf8.
   * @throws {Error} If the frame is incomplete or too large.
   */
  parseWebSocketFrame(buffer) {
    const frame = this.readFrame(buffer);
    if (frame === null) {
      throw new Error('Incomplete WebSocket frame');
    }
    return frame.payload.toString();
  }

  /**
   * Serialises one unmasked, final (FIN = 1) frame.
   * @param {number} opcode Frame opcode (0x1 text, 0x8 close, 0xA pong, ...).
   * @param {Buffer} payload Frame payload.
   * @returns {Buffer} Header followed by the payload.
   */
  buildFrame(opcode, payload) {
    const length = payload.length;
    let header;

    if (length <= 125) {
      header = Buffer.alloc(2);
      header[1] = length;
    } else if (length <= 65535) {
      header = Buffer.alloc(4);
      header[1] = 126;
      header.writeUInt16BE(length, 2);
    } else {
      header = Buffer.alloc(10);
      header[1] = 127;
      header.writeUInt32BE(Math.floor(length / 0x100000000), 2);
      header.writeUInt32BE(length >>> 0, 6);
    }

    header[0] = 0x80 | opcode;
    return Buffer.concat([header, payload]);
  }

  /**
   * Builds a text frame for `message`.
   * @param {string} message Text to send.
   * @returns {Buffer} The encoded frame.
   */
  createWebSocketFrame(message) {
    return this.buildFrame(0x1, Buffer.from(message));
  }

  /**
   * Sends a text message to every open client.
   * @param {string} message Text to send.
   * @param {net.Socket|null} [excludeSocket=null] Client to skip (usually the sender).
   */
  broadcast(message, excludeSocket = null) {
    const frame = this.createWebSocketFrame(message);
    for (const client of this.clients) {
      if (client !== excludeSocket && client.readyState === 'open') {
        client.write(frame);
      }
    }
  }

  /**
   * Sends a "going away" close frame to every client, ends their sockets and
   * stops the HTTP server. No-op if the server was never started.
   */
  close() {
    if (!this.server) {
      return;
    }

    // Upgraded sockets stay open otherwise and keep the server from finishing
    // its shutdown. 0x03E9 = status code 1001 (going away).
    const goingAway = this.buildFrame(0x8, Buffer.from([0x03, 0xE9]));
    for (const client of this.clients) {
      client.end(goingAway);
    }
    this.clients.clear();

    this.server.close(() => {
      console.log('WebSocket 服务器已关闭');
    });
  }
}

// Example usage of the WebSocket server.
const wsServer = new WebSocketServer({ port: 8080 });
wsServer.start();

// Graceful shutdown. Calling process.exit() right after close() would skip the
// close callback and drop unsent data. Let the event loop drain instead; the
// unref'd timer forces an exit if client connections keep the process alive.
process.once('SIGINT', () => {
  wsServer.close();
  process.exitCode = 0;
  setTimeout(() => process.exit(0), 5000).unref();
});

五、总结

Node.js 提供了强大的网络编程能力,支持多种网络协议:

  1. TCP:可靠的面向连接的协议,适用于需要保证数据完整性的应用
  2. UDP:无连接的协议,适用于实时性要求高但可以容忍少量数据丢失的应用
  3. HTTP/HTTPS:应用层协议,是 Web 开发的基础
  4. WebSocket:全双工通信协议,适用于实时通信应用

通过深入理解这些协议的实现机制,我们可以:

  • 构建高性能的网络应用
  • 选择合适的协议满足不同业务需求
  • 实现自定义协议处理器
  • 优化网络通信性能

在实际开发中，应根据具体需求选择合适的协议和实现方式，同时注意处理异常情况、实现重连机制、优化性能等，以确保网络应用的稳定性和可靠性。