WebSocket

深入理解 WebSocket 协议原理、与 HTTP 的对比、API 详解、实际应用场景,以及重连策略、心跳机制等工程实践。

WebSocket(WebSocket)

一、WebSocket 概述

1.1 为什么需要 WebSocket

HTTP 是请求-响应模型,客户端发起请求,服务器返回响应。这种模式不适合需要服务器主动推送的场景,比如:

  • 实时聊天应用
  • 股票行情更新
  • 在线游戏
  • 协作编辑
  • 进度通知

传统解决方案如轮询(polling)和长轮询(long polling)都有性能或延迟问题。WebSocket 提供了真正的双向通信能力。

1.2 WebSocket 与 HTTP 的对比

特性 HTTP WebSocket
连接方向 客户端发起请求 双向对等
服务器推送 不支持 完全支持
头部开销 大(每个请求携带完整 Header) 小(握手后几乎没有额外头部)
连接次数 每次请求建立新连接(HTTP/1.1 起可通过 Keep-Alive 复用连接) 建立一次连接
数据格式 请求-响应 帧(frame)
适用场景 RESTful API 实时应用

1.3 协议升级过程

WebSocket 使用 HTTP 的 Upgrade 机制建立连接:

GET /ws HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

服务器响应:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

二、WebSocket API

2.1 建立连接

const ws = new WebSocket('ws://example.com/ws');

// The wss:// scheme is the encrypted variant (recommended in production):
// const ws = new WebSocket('wss://example.com/ws');

// Runs when the connection opens; greets the server right away.
ws.addEventListener('open', () => {
  console.log('Connection opened');
  ws.send('Hello, server!');
});

// Logs the payload of every incoming message.
ws.addEventListener('message', ({ data }) => {
  console.log('Message received:', data);
});

// Logs the error event object.
ws.addEventListener('error', (errorEvent) => {
  console.error('Error:', errorEvent);
});

// Logs the close code and reason carried by the close event.
ws.addEventListener('close', ({ code, reason }) => {
  console.log('Connection closed');
  console.log('Code:', code);
  console.log('Reason:', reason);
});

2.2 发送和接收数据

// Plain text
ws.send('Hello, world!');

// Structured data, serialised to a JSON string
ws.send(JSON.stringify({ type: 'message', content: 'Hello!' }));

// Binary data as an ArrayBuffer
const buffer = new ArrayBuffer(8);
ws.send(buffer);

// Binary data as a Blob
const blob = new Blob(['Hello'], { type: 'text/plain' });
ws.send(blob);

// Receiving: event.data may be a string, a Blob or an ArrayBuffer, so the
// handler branches on the runtime type.
ws.addEventListener('message', async ({ data }) => {
  if (typeof data === 'string') {
    console.log('Text message:', data);
    return;
  }

  if (data instanceof Blob) {
    // Blob contents are read asynchronously.
    const text = await data.text();
    console.log('Blob message:', text);
    return;
  }

  if (data instanceof ArrayBuffer) {
    console.log('ArrayBuffer message:', data);
  }
});

2.3 连接状态

const ws = new WebSocket('ws://example.com/ws');

// readyState is a numeric code describing the connection's current phase.
// Right after construction it is normally 0 (CONNECTING).
console.log(ws.readyState);
// 0: CONNECTING
// 1: OPEN
// 2: CLOSING
// 3: CLOSED

// Guard sends on the state: only send while the connection is OPEN.
if (ws.readyState === WebSocket.OPEN) {
  ws.send('Hello!');
}

// State transitions can be observed through the open and close events.
ws.addEventListener('open', () => {
  console.log('Connected');
});

ws.addEventListener('close', () => {
  console.log('Disconnected');
});

2.4 关闭连接

// Client-initiated close with a status code and a human-readable reason.
// Browsers accept only 1000 or a code in the 3000-4999 range in close().
ws.close(1000, 'Normal closure');

// Close status codes (RFC 6455, section 7.4.1):
// 1000: normal closure
// 1001: going away (e.g. the server is shutting down or the page is navigating away)
// 1002: protocol error
// 1003: unsupported data type
// 1005: no status code was present (reserved; never sent in a close frame)
// 1006: abnormal closure without a close frame (reserved; never set this in code)
// 1007: invalid payload data (e.g. non-UTF-8 bytes in a text message)
// 1008: policy violation
// 1009: message too big
// 1010: mandatory extension missing (the client expected an extension the server did not negotiate)
// 1011: internal server error

三、重连策略

3.1 基本重连实现

/**
 * WebSocket wrapper that reconnects automatically with exponential backoff.
 *
 * The delay before retry number `n` (0-based) is
 * `reconnectInterval * reconnectDecay ** n`, capped at `maxReconnectInterval`.
 * The attempt counter resets every time a connection opens successfully.
 *
 * Override `onopen`, `onmessage`, `onclose` and `onerror` to receive the
 * events of whichever underlying socket is current.
 */
class ReconnectingWebSocket {
  /**
   * @param {string} url - WebSocket endpoint.
   * @param {object} [options]
   * @param {number} [options.reconnectInterval=1000] - Base retry delay in ms.
   * @param {number} [options.maxReconnectInterval=30000] - Upper bound for the retry delay in ms.
   * @param {number} [options.reconnectDecay=1.5] - Backoff multiplier applied per attempt.
   * @param {number} [options.maxReconnectAttempts=Infinity] - Retry limit; 0 disables retries.
   */
  constructor(url, options = {}) {
    this.url = url;
    this.reconnectInterval = options.reconnectInterval || 1000;
    this.maxReconnectInterval = options.maxReconnectInterval || 30000;
    this.reconnectDecay = options.reconnectDecay || 1.5;
    // `??` rather than `||` so an explicit 0 ("never retry") is honoured.
    this.maxReconnectAttempts = options.maxReconnectAttempts ?? Infinity;
    this.readyState = WebSocket.CONNECTING;

    this.ws = null;
    this.reconnectAttempts = 0;
    this.reconnectTimer = null;
    this.shouldReconnect = true;

    this.connect();
  }

  /** Opens a new underlying socket and wires its events to this wrapper. */
  connect() {
    // A retry puts the wrapper back into the connecting phase.
    this.readyState = WebSocket.CONNECTING;

    let socket;
    try {
      socket = new WebSocket(this.url);
    } catch (error) {
      // The constructor failed synchronously; treat it like a failed attempt.
      this.readyState = WebSocket.CLOSED;
      this.scheduleReconnect();
      return;
    }
    this.ws = socket;

    socket.addEventListener('open', (event) => {
      this.readyState = WebSocket.OPEN;
      this.reconnectAttempts = 0;
      this.onopen(event);
    });

    socket.addEventListener('message', (event) => {
      this.onmessage(event);
    });

    socket.addEventListener('close', (event) => {
      this.readyState = WebSocket.CLOSED;

      if (this.shouldReconnect) {
        this.scheduleReconnect();
      }

      this.onclose(event);
    });

    socket.addEventListener('error', (event) => {
      this.onerror(event);
    });
  }

  /** Schedules the next connection attempt unless the retry limit is reached. */
  scheduleReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.log('Max reconnect attempts reached');
      return;
    }

    const backoff = this.reconnectInterval * Math.pow(this.reconnectDecay, this.reconnectAttempts);
    const delay = Math.min(backoff, this.maxReconnectInterval);
    console.log(`Reconnecting in ${delay}ms...`);

    // Never keep two retries in flight.
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.reconnectAttempts++;
      this.connect();
    }, delay);
  }

  /**
   * Sends data if the connection is open; otherwise logs an error and drops it.
   * @param {string|ArrayBuffer|Blob} data
   */
  send(data) {
    if (this.ws && this.readyState === WebSocket.OPEN) {
      this.ws.send(data);
    } else {
      console.error('WebSocket is not open');
    }
  }

  /**
   * Closes the connection permanently and cancels any pending retry.
   * @param {number} [code] - Close code forwarded to the underlying socket.
   * @param {string} [reason] - Close reason forwarded to the underlying socket.
   */
  close(code, reason) {
    this.shouldReconnect = false;

    // Without this a retry scheduled before close() would still reconnect.
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    // `ws` is still null when every construction attempt so far has thrown.
    if (this.ws) {
      this.ws.close(code, reason);
    } else {
      this.readyState = WebSocket.CLOSED;
    }
  }

  // Overridable callbacks (no-ops by default).
  onopen(event) {}
  onmessage(event) {}
  onclose(event) {}
  onerror(event) {}
}

3.2 使用示例

// Create the wrapper with a 1000 ms base retry interval and at most 10 retries.
const ws = new ReconnectingWebSocket('wss://example.com/ws', {
  reconnectInterval: 1000,
  maxReconnectAttempts: 10
});

// Assign application handlers over the wrapper's overridable callbacks.
ws.onopen = () => console.log('Connected');
ws.onmessage = (event) => console.log('Message:', event.data);
ws.onclose = (event) => console.log('Closed:', event.code, event.reason);
ws.onerror = (event) => console.error('Error:', event);

四、心跳机制

4.1 心跳检测连接状态

/**
 * WebSocket wrapper that detects dead connections with an application-level
 * heartbeat: it sends the text 'ping' every `pingInterval` ms and expects the
 * text 'pong' back within `pongTimeout` ms. If no pong arrives in time the
 * socket is closed and a fresh connection is opened.
 *
 * Messages other than 'pong' are forwarded to the overridable `onmessage`.
 */
class HeartbeatWebSocket {
  /**
   * @param {string} url - WebSocket endpoint.
   * @param {object} [options]
   * @param {number} [options.pingInterval=30000] - Delay between pings in ms.
   * @param {number} [options.pongTimeout=10000] - How long to wait for a pong in ms.
   */
  constructor(url, options = {}) {
    this.url = url;
    this.pingInterval = options.pingInterval || 30000;
    this.pongTimeout = options.pongTimeout || 10000;

    this.ws = null;
    this.pingTimer = null;
    this.pongTimer = null;
    // Set only by the pong timeout so that just that close triggers a reconnect.
    this.reconnectOnClose = false;

    this.connect();
  }

  /** Opens the underlying socket and wires the heartbeat to its lifecycle. */
  connect() {
    const socket = new WebSocket(this.url);
    this.ws = socket;

    socket.addEventListener('open', () => {
      this.startHeartbeat();
    });

    socket.addEventListener('message', (event) => {
      if (event.data === 'pong') {
        this.onPong();
      } else {
        this.onmessage(event);
      }
    });

    socket.addEventListener('close', () => {
      // Ignore a late close from a socket that has already been replaced.
      if (this.ws !== socket) {
        return;
      }
      this.stopHeartbeat();

      if (this.reconnectOnClose) {
        this.reconnectOnClose = false;
        this.connect();
      }
    });
  }

  /** Starts (or restarts) the periodic ping. */
  startHeartbeat() {
    this.stopHeartbeat();

    this.pingTimer = setInterval(() => {
      this.send('ping');
      // Keep the deadline of an unanswered ping; re-arming here would let a
      // silent peer postpone the timeout forever when pongTimeout > pingInterval.
      if (this.pongTimer === null) {
        this.resetPongTimer();
      }
    }, this.pingInterval);
  }

  /** Stops the periodic ping and any pending pong deadline. */
  stopHeartbeat() {
    if (this.pingTimer) {
      clearInterval(this.pingTimer);
      this.pingTimer = null;
    }
    this.stopPongTimer();
  }

  /** (Re)arms the pong deadline; on expiry the socket is closed and replaced. */
  resetPongTimer() {
    this.stopPongTimer();
    this.pongTimer = setTimeout(() => {
      this.pongTimer = null;
      console.log('Pong timeout, reconnecting...');
      this.reconnectOnClose = true;
      this.ws.close();
    }, this.pongTimeout);
  }

  /** Cancels the pending pong deadline, if any. */
  stopPongTimer() {
    if (this.pongTimer) {
      clearTimeout(this.pongTimer);
      this.pongTimer = null;
    }
  }

  /** Called when a 'pong' arrives: the connection is alive, so drop the deadline. */
  onPong() {
    // Re-arming here would close a healthy connection pongTimeout ms after
    // every successful pong.
    this.stopPongTimer();
  }

  /**
   * Sends data if the underlying socket is open; otherwise drops it silently.
   * @param {string|ArrayBuffer|Blob} data
   */
  send(data) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(data);
    }
  }

  /** Stops the heartbeat and closes the connection without reconnecting. */
  close() {
    this.reconnectOnClose = false;
    this.stopHeartbeat();
    if (this.ws) {
      this.ws.close();
    }
  }

  // Overridable callback for non-heartbeat messages (no-op by default).
  onmessage(event) {}
}

五、消息协议设计

5.1 JSON 消息格式

// Message type identifiers carried in the `type` field of every JSON message.
// Frozen so the shared enum cannot be modified by accident at runtime.
const MessageType = Object.freeze({
  CHAT: 'chat',
  NOTIFICATION: 'notification',
  PRESENCE: 'presence',
  ERROR: 'error',
  PING: 'ping',
  PONG: 'pong'
});

/**
 * Sends one JSON message of the form { type, payload, timestamp }.
 *
 * @param {{ send: Function }} ws - Socket (or compatible object) to send on.
 * @param {string} type - Message type identifier.
 * @param {*} payload - JSON-serialisable message body.
 */
function sendMessage(ws, type, payload) {
  // The timestamp is the send time in epoch milliseconds.
  const timestamp = Date.now();
  const serialized = JSON.stringify({ type, payload, timestamp });
  ws.send(serialized);
}

// Receiving: parse the JSON message and route it by its `type` field.
// Unknown types are ignored. Anything thrown inside the try block, including
// errors raised by the handlers themselves, is logged with the parse-failure text.
ws.addEventListener('message', (event) => {
  try {
    const { type, payload } = JSON.parse(event.data);

    if (type === MessageType.CHAT) {
      handleChat(payload);
    } else if (type === MessageType.NOTIFICATION) {
      handleNotification(payload);
    } else if (type === MessageType.ERROR) {
      handleError(payload);
    } else if (type === MessageType.PING) {
      // Answer a ping with a pong.
      ws.send(JSON.stringify({ type: MessageType.PONG }));
    }
  } catch (error) {
    console.error('Failed to parse message:', error);
  }
});

5.2 二进制协议设计

// Fixed 8-byte header, all fields big-endian (the DataView default):
// | 1 byte  | 4 bytes               | 2 bytes    | 1 byte |
// | version | timestamp (unix secs) | payloadLen | type   |

const PROTOCOL_VERSION = 1;

/**
 * Encodes a message as the 8-byte header followed by the UTF-8 bytes of the
 * JSON-serialised payload.
 *
 * @param {number} type - Message type, stored as an unsigned byte.
 * @param {*} payload - JSON-serialisable message body.
 * @returns {ArrayBuffer} The encoded message.
 * @throws {RangeError} If the serialised payload exceeds 65535 bytes, the
 *   largest length the 2-byte header field can represent.
 */
function encodeMessage(type, payload) {
  const HEADER_LENGTH = 8;
  const MAX_PAYLOAD_LENGTH = 0xffff;

  const payloadBytes = new TextEncoder().encode(JSON.stringify(payload));
  if (payloadBytes.length > MAX_PAYLOAD_LENGTH) {
    // setUint16 would silently wrap the length and corrupt the message.
    throw new RangeError(
      `Payload is ${payloadBytes.length} bytes; the maximum is ${MAX_PAYLOAD_LENGTH}`
    );
  }

  const buffer = new ArrayBuffer(HEADER_LENGTH + payloadBytes.length);
  const view = new DataView(buffer);

  // Version
  view.setUint8(0, PROTOCOL_VERSION);

  // Timestamp in whole seconds: epoch milliseconds do not fit in 32 bits and
  // would be truncated to their low 4 bytes.
  view.setUint32(1, Math.floor(Date.now() / 1000));

  // Payload length in bytes
  view.setUint16(5, payloadBytes.length);

  // Message type
  view.setUint8(7, type);

  // Payload bytes directly after the header
  new Uint8Array(buffer, HEADER_LENGTH).set(payloadBytes);

  return buffer;
}

/**
 * Decodes a binary message: an 8-byte header (version at byte 0, timestamp at
 * bytes 1-4, payload length at bytes 5-6, type at byte 7) followed by a
 * JSON payload of the stated length.
 *
 * @param {ArrayBuffer} buffer - The encoded message.
 * @returns {{ version: number, timestamp: number, type: number, payload: * }}
 */
function decodeMessage(buffer) {
  const header = new DataView(buffer);

  // Header fields, read in layout order.
  const version = header.getUint8(0);
  const timestamp = header.getUint32(1);
  const bodyLength = header.getUint16(5);
  const type = header.getUint8(7);

  // The body starts right after the header and spans bodyLength bytes.
  const bodyText = new TextDecoder().decode(new Uint8Array(buffer, 8, bodyLength));

  return { version, timestamp, type, payload: JSON.parse(bodyText) };
}

六、安全考虑

6.1 验证与认证

// Passing the token as a URL query parameter is insecure:
// const ws = new WebSocket(`wss://example.com/ws?token=${token}`);

// Better: send an authentication message as soon as the connection opens.
const ws = new WebSocket('wss://example.com/ws');

ws.addEventListener('open', () => {
  const authRequest = { type: 'auth', token: 'your-jwt-token' };
  ws.send(JSON.stringify(authRequest));
});

// React to the server's verdict; a rejected token ends the connection.
ws.addEventListener('message', (event) => {
  const { type } = JSON.parse(event.data);

  switch (type) {
    case 'auth_success':
      console.log('Authenticated');
      break;
    case 'auth_error':
      console.error('Authentication failed');
      ws.close();
      break;
  }
});

6.2 跨域 WebSocket

// Client side
const ws = new WebSocket('wss://api.example.com/ws');

// NOTE: WebSocket handshakes are not governed by CORS, so
// Access-Control-Allow-Origin response headers have no effect here.
// The browser sends an Origin request header with the handshake; the server
// must check it against an allow-list (e.g. https://your-domain.com) and
// reject the upgrade for unexpected origins.

七、性能优化

7.1 消息分片

WebSocket 协议在帧层面支持消息分片,但浏览器的 WebSocket API 并不暴露这一能力;需要发送大消息时,可以在应用层自行分片:

// Messages longer than this are split into fragments. The limit is compared
// against String#length, so it counts UTF-16 code units rather than bytes.
const MESSAGE_THRESHOLD = 64 * 1024; // 64 Ki code units

/**
 * Sends `data` on `ws`, splitting it into application-level fragments when it
 * is longer than MESSAGE_THRESHOLD.
 *
 * Strings are sent as-is; any other value is JSON-serialised first. Each
 * fragment is a JSON message `{ type: 'fragment', data, isLast, offset }`,
 * where `offset` is the chunk's start position in the full message.
 *
 * @param {{ send: Function }} ws - Socket (or compatible object) to send on.
 * @param {*} data - A string or a JSON-serialisable value.
 * @throws {TypeError} If `data` cannot be serialised to JSON text
 *   (e.g. `undefined` or a function).
 */
function sendLargeMessage(ws, data) {
  const message = typeof data === 'string' ? data : JSON.stringify(data);

  // JSON.stringify returns undefined for values such as undefined or functions.
  if (typeof message !== 'string') {
    throw new TypeError('sendLargeMessage: data is not JSON-serializable');
  }

  if (message.length <= MESSAGE_THRESHOLD) {
    ws.send(message);
    return;
  }

  for (let offset = 0; offset < message.length; offset += MESSAGE_THRESHOLD) {
    const chunk = message.slice(offset, offset + MESSAGE_THRESHOLD);
    const isLast = offset + chunk.length >= message.length;

    ws.send(JSON.stringify({
      type: 'fragment',
      data: chunk,
      isLast,
      offset
    }));
  }
}

7.2 连接池

/**
 * Fixed-size pool of WebSocket connections to a single URL.
 *
 * `available` holds idle connections and `inUse` holds the ones handed out by
 * `acquire()`. When the pool is empty `acquire()` opens an extra connection;
 * such surplus connections are closed again on `release()` so the idle list
 * never grows beyond `poolSize`.
 */
class WebSocketPool {
  /**
   * @param {string} url - WebSocket endpoint shared by all connections.
   * @param {number} [poolSize=5] - Number of connections opened up front.
   */
  constructor(url, poolSize = 5) {
    this.url = url;
    this.poolSize = poolSize;
    this.available = [];
    this.inUse = new Set();

    // Open the initial connections.
    for (let i = 0; i < poolSize; i++) {
      this.available.push(this.createConnection());
    }
  }

  /**
   * Opens a connection that removes itself from the pool when it closes.
   * @returns {WebSocket}
   */
  createConnection() {
    const ws = new WebSocket(this.url);

    ws.addEventListener('close', () => {
      // An idle connection that closes is replaced by a fresh one.
      const index = this.available.indexOf(ws);
      if (index > -1) {
        this.available.splice(index, 1);
        this.available.push(this.createConnection());
      }
      this.inUse.delete(ws);
    });

    return ws;
  }

  /**
   * Hands out a connection, opening a new one when none is idle.
   * The returned socket may still be connecting; check `readyState` before sending.
   * @returns {WebSocket}
   */
  acquire() {
    const ws = this.available.length > 0
      ? this.available.pop()
      : this.createConnection();
    // Track surplus connections too, so every handed-out socket is accounted for.
    this.inUse.add(ws);
    return ws;
  }

  /**
   * Returns a connection to the pool.
   * @param {WebSocket} ws - A connection previously obtained from `acquire()`.
   */
  release(ws) {
    this.inUse.delete(ws);

    // Releasing the same socket twice must not list it twice.
    if (this.available.includes(ws)) {
      return;
    }

    const isAlive =
      ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING;

    // The pool is already full: drop the surplus connection instead of growing.
    if (this.available.length >= this.poolSize) {
      if (isAlive) {
        ws.close();
      }
      return;
    }

    // Dead connections are replaced; live ones (including ones still
    // connecting, like the initial pool members) are reused.
    this.available.push(isAlive ? ws : this.createConnection());
  }
}

参考资料

延展阅读