阿里云主机折上折
  • 微信号
您当前的位置:网站首页 > WebSocket实现

WebSocket实现

作者:陈川 阅读数:7593人阅读 分类: Node.js

WebSocket简介

WebSocket是一种在单个TCP连接上进行全双工通信的协议。它允许服务端主动向客户端推送数据,解决了HTTP协议无法实现服务器主动推送的问题。WebSocket协议在2011年被IETF定为标准RFC 6455,后被W3C定为标准。

WebSocket与HTTP对比

HTTP协议是无状态的,每次请求都需要建立新的连接,而WebSocket只需要建立一次连接:

  • HTTP是半双工,WebSocket是全双工
  • HTTP每次请求都包含头部信息,WebSocket建立连接后数据传输量小
  • HTTP需要客户端轮询获取新数据,WebSocket支持服务端主动推送
// HTTP轮询示例
setInterval(() => {
  fetch('/api/data')
    .then(res => res.json())
    .then(data => console.log(data))
}, 1000);

// WebSocket示例
const ws = new WebSocket('ws://example.com');
ws.onmessage = (event) => {
  console.log(event.data);
};

Node.js中的WebSocket实现

使用ws模块

ws是Node.js中最流行的WebSocket库,轻量且高效:

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
  console.log('新客户端连接');
  
  ws.on('message', (message) => {
    console.log(`收到消息: ${message}`);
    // 广播给所有客户端
    wss.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(message);
      }
    });
  });

  ws.send('欢迎连接WebSocket服务器');
});

使用Socket.io

Socket.io提供了更高级的功能,如自动重连、房间支持等:

const server = require('http').createServer();
const io = require('socket.io')(server);

io.on('connection', (socket) => {
  console.log('用户连接');
  
  socket.on('chat message', (msg) => {
    io.emit('chat message', msg);
  });
  
  socket.on('disconnect', () => {
    console.log('用户断开连接');
  });
});

server.listen(3000);

WebSocket握手过程

WebSocket连接通过HTTP升级请求建立:

  1. 客户端发送握手请求:
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
  1. 服务端响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

消息帧格式

WebSocket数据以帧的形式传输,包含:

  • FIN: 是否是最后一帧
  • Opcode: 操作码(0x1文本,0x2二进制)
  • Mask: 是否掩码
  • Payload length: 数据长度
  • Masking-key: 掩码键(如果有)
  • Payload data: 实际数据
// 手动解析WebSocket帧(简化版)
function parseFrame(buffer) {
  const firstByte = buffer.readUInt8(0);
  const secondByte = buffer.readUInt8(1);
  
  const opcode = firstByte & 0x0F;
  const isMasked = Boolean((secondByte >>> 7) & 0x01);
  let payloadLength = secondByte & 0x7F;
  
  let offset = 2;
  if (payloadLength === 126) {
    payloadLength = buffer.readUInt16BE(offset);
    offset += 2;
  } else if (payloadLength === 127) {
    payloadLength = buffer.readBigUInt64BE(offset);
    offset += 8;
  }
  
  // 省略掩码处理...
  return {
    opcode,
    payloadLength,
    payloadData: buffer.slice(offset)
  };
}

心跳机制

保持连接活跃的心跳机制实现:

// 服务端心跳
function setupHeartbeat(ws) {
  ws.isAlive = true;
  ws.on('pong', () => { ws.isAlive = true; });
  
  const interval = setInterval(() => {
    if (!ws.isAlive) return ws.terminate();
    
    ws.isAlive = false;
    ws.ping(null, false);
  }, 30000);
  
  ws.on('close', () => clearInterval(interval));
}

// 客户端心跳
const ws = new WebSocket('ws://example.com');
const heartbeatInterval = setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.ping();
  }
}, 25000);

ws.on('close', () => clearInterval(heartbeatInterval));

安全考虑

认证授权

// Token认证示例
const wss = new WebSocket.Server({ noServer: true });

server.on('upgrade', (request, socket, head) => {
  const token = request.url.split('token=')[1];
  
  if (!validateToken(token)) {
    socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
    socket.destroy();
    return;
  }
  
  wss.handleUpgrade(request, socket, head, (ws) => {
    wss.emit('connection', ws, request);
  });
});

消息验证

ws.on('message', (message) => {
  try {
    const data = JSON.parse(message);
    if (!isValid(data)) {
      throw new Error('Invalid message');
    }
    // 处理有效消息
  } catch (err) {
    ws.close(1008, 'Invalid message format');
  }
});

性能优化

消息压缩

const WebSocket = require('ws');
const zlib = require('zlib');

const wss = new WebSocket.Server({
  port: 8080,
  perMessageDeflate: {
    zlibDeflateOptions: {
      chunkSize: 1024,
      memLevel: 7,
      level: 3
    },
    zlibInflateOptions: {
      chunkSize: 10 * 1024
    },
    threshold: 1024 // 仅压缩大于此字节的消息
  }
});

连接限制

const wss = new WebSocket.Server({ port: 8080 });
const connections = new Set();

wss.on('connection', (ws) => {
  if (connections.size >= 1000) {
    ws.close(1013, 'Server too busy');
    return;
  }
  
  connections.add(ws);
  ws.on('close', () => connections.delete(ws));
});

实际应用场景

实时聊天应用

// 服务端
wss.on('connection', (ws) => {
  ws.userData = { id: generateId() };
  
  ws.on('message', (message) => {
    const data = JSON.parse(message);
    
    switch(data.type) {
      case 'join':
        ws.userData.room = data.room;
        break;
      case 'message':
        broadcastToRoom(ws.userData.room, {
          type: 'message',
          user: ws.userData.id,
          text: data.text,
          timestamp: Date.now()
        });
        break;
    }
  });
});

function broadcastToRoom(room, message) {
  wss.clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN && 
        client.userData.room === room) {
      client.send(JSON.stringify(message));
    }
  });
}

实时数据监控

// 服务端推送传感器数据
const sensorData = {
  temperature: 0,
  humidity: 0
};

setInterval(() => {
  // 模拟数据变化
  sensorData.temperature = 20 + 5 * Math.random();
  sensorData.humidity = 50 + 10 * Math.random();
  sensorData.timestamp = Date.now();
  
  wss.clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
      client.send(JSON.stringify({
        type: 'sensorUpdate',
        data: sensorData
      }));
    }
  });
}, 1000);

错误处理与调试

常见错误代码

  • 1006: 异常关闭
  • 1001: 端点离开
  • 1002: 协议错误
  • 1008: 策略违规
  • 1011: 服务器错误
ws.on('close', (code, reason) => {
  console.log(`连接关闭,代码: ${code}, 原因: ${reason}`);
  if (code === 1006) {
    console.log('可能是网络问题导致连接异常断开');
  }
});

ws.on('error', (error) => {
  console.error('WebSocket错误:', error);
});

调试工具

使用浏览器开发者工具查看WebSocket流量:

  1. Chrome开发者工具 → Network → WS
  2. 点击WebSocket连接查看消息帧
  3. 可以过滤和查看每条消息内容

浏览器兼容性

现代浏览器都支持WebSocket API,但需要考虑降级方案:

function connectWebSocket() {
  if ('WebSocket' in window) {
    return new WebSocket('ws://example.com');
  } else if ('MozWebSocket' in window) {
    return new MozWebSocket('ws://example.com');
  } else {
    // 降级为长轮询
    setupPolling();
    return null;
  }
}

function setupPolling() {
  // 实现HTTP长轮询
  let timestamp = 0;
  
  function poll() {
    fetch(`/poll?since=${timestamp}`)
      .then(res => res.json())
      .then(data => {
        timestamp = data.timestamp;
        processMessages(data.messages);
        poll();
      });
  }
  
  poll();
}

WebSocket扩展

自定义协议

// 定义简单协议
// 格式: type|payload
const PROTOCOL = {
  MESSAGE: 'msg',
  NOTIFICATION: 'noti',
  COMMAND: 'cmd'
};

// 发送消息
function sendMessage(ws, type, payload) {
  ws.send(`${type}|${JSON.stringify(payload)}`);
}

// 接收处理
ws.on('message', (data) => {
  const [type, payload] = data.split('|');
  const parsedPayload = JSON.parse(payload);
  
  switch(type) {
    case PROTOCOL.MESSAGE:
      handleMessage(parsedPayload);
      break;
    case PROTOCOL.NOTIFICATION:
      handleNotification(parsedPayload);
      break;
  }
});

二进制数据传输

// 发送二进制数据
const buffer = new ArrayBuffer(8);
const view = new Uint8Array(buffer);
for (let i = 0; i < 8; i++) view[i] = i;

ws.onopen = () => {
  ws.send(buffer);
};

// 接收二进制数据
ws.onmessage = (event) => {
  if (event.data instanceof ArrayBuffer) {
    const view = new Uint8Array(event.data);
    console.log('收到二进制数据:', view);
  }
};

本站部分内容来自互联网,一切版权均归源网站或源作者所有。

如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn

上一篇:TCP/UDP编程

下一篇:网络代理实现

前端川

前端川,陈川的代码茶馆🍵,专治各种不服的Bug退散符💻,日常贩卖秃头警告级的开发心得🛠️,附赠一行代码笑十年的摸鱼宝典🐟,偶尔掉落咖啡杯里泡开的像素级浪漫☕。‌