系列文章:Go 聊天室实战系列(4/6)

项目地址:https://gitee.com/gaogaohang/go-land_-learning

技术栈:Go 1.23 + Gin + PostgreSQL + Redis + Vue3


一、WebSocket 基础

1.1 WebSocket vs HTTP

| 特性 | HTTP | WebSocket |
| --- | --- | --- |
| 连接方式 | 短连接(请求 - 响应) | 长连接(持久化) |
| 通信方向 | 单向(客户端发起) | 双向(全双工) |
| 头部开销 | 大(每次请求) | 小(握手后每帧 2–14 字节) |
| 实时性 | 低(需轮询) | 高(服务端推送) |
| 适用场景 | REST API、页面请求 | 聊天、通知、实时数据 |

1.2 WebSocket 握手流程

客户端                              服务端
  │                                   │
  │  ───────── GET /ws ──────────►   │
  │     Upgrade: websocket            │
  │     Connection: Upgrade           │
  │     Sec-WebSocket-Key: xxx        │
  │                                   │
  │  ◄── 101 Switching Protocols ──   │
  │     Upgrade: websocket            │
  │     Connection: Upgrade           │
  │     Sec-WebSocket-Accept: yyy     │
  │                                   │
  │  ════════ WebSocket 连接建立 ═════ │
  │                                   │
  │  ◄─────── [消息推送] ─────────►   │
  │         (双向通信)                │

二、连接管理器设计

2.1 核心结构

// internal/websocket/client.go
​
package websocket
​
import (
    "context"
    "encoding/json"
    "time"

    "github.com/gorilla/websocket"
)
​
// Client is one WebSocket connection belonging to a single user.
//
// NOTE(review): done is unexported, yet the handler in package api initializes
// it in a composite literal; that only compiles from inside this package, so a
// constructor is probably needed — confirm against the real project.
type Client struct {
    Hub        *Hub                // Hub this client registers with and unregisters from
    Conn       *websocket.Conn     // underlying WebSocket connection
    UserID     int64               // ID of the user who owns the connection
    Send       chan []byte         // outbound queue drained by writePump
    Heartbeat  *time.Ticker        // ticker created by startHeartbeat
    done       chan struct{}       // stop signal; startHeartbeat returns once it receives from it
}
​
// Hub holds the connected clients and the channels serviced by Run.
type Hub struct {
    Clients    map[int64]*Client   // connected clients keyed by user ID
    Register   chan *Client        // clients to add to Clients
    Unregister chan *Client        // clients to remove from Clients
    Broadcast  chan *BroadcastMsg  // messages to fan out to the connected clients
}
​
// BroadcastMsg is a message the Hub fans out to connected clients.
type BroadcastMsg struct {
    // Type labels the kind of broadcast.
    Type string `json:"type"`
    // Data is the payload; any value may be stored here.
    Data any `json:"data"`
    // Exclude is the user ID to skip during fan-out; values <= 0 skip nobody.
    Exclude int64 `json:"exclude"`
}
​
// NewHub 创建 Hub
func NewHub() *Hub {
    return &Hub{
        Clients:    make(map[int64]*Client),
        Register:   make(chan *Client),
        Unregister: make(chan *Client),
        Broadcast:  make(chan *BroadcastMsg, 256),
    }
}
​
// Run is the Hub's event loop. It handles one registration, unregistration or
// broadcast at a time and never returns, so it is meant to run in its own
// goroutine.
func (h *Hub) Run() {
    for {
        select {
        case client := <-h.Register:
            // A user who reconnects replaces the previous connection. Closing
            // the old Send channel makes the old writePump send a close frame
            // and exit instead of lingering unreachable.
            if old, ok := h.Clients[client.UserID]; ok && old != client {
                close(old.Send)
            }
            h.Clients[client.UserID] = client

        case client := <-h.Unregister:
            // Remove the entry only if it still points at this exact
            // connection. Otherwise a late unregister from a replaced
            // connection would evict the user's new client, and a client whose
            // Send was already closed would be closed twice.
            if current, ok := h.Clients[client.UserID]; ok && current == client {
                delete(h.Clients, client.UserID)
                close(client.Send)
            }

        case msg := <-h.Broadcast:
            // Send is a chan []byte, so encode the message once and share the
            // bytes with every recipient.
            payload, err := json.Marshal(msg)
            if err != nil {
                logger.Errorf("广播消息序列化失败:%v", err)
                continue
            }
            for userID, client := range h.Clients {
                if msg.Exclude > 0 && userID == msg.Exclude {
                    continue // skip the excluded user
                }
                select {
                case client.Send <- payload:
                default:
                    // The client's queue is full: drop the connection rather
                    // than block the whole Hub.
                    close(client.Send)
                    delete(h.Clients, userID)
                }
            }
        }
    }
}

2.2 客户端管理

// internal/websocket/client.go
​
// Timing and size limits applied to every client connection.
const (
    // writeWait is the deadline given to each write on the connection.
    writeWait = 10 * time.Second
    // pongWait is the read deadline; it is pushed forward whenever a pong arrives.
    pongWait = 60 * time.Second
    // pingPeriod is how often writePump sends a ping: nine tenths of pongWait.
    pingPeriod = pongWait / 10 * 9
    // maxMessageSize is the read limit for one incoming message (512 KiB).
    maxMessageSize = 512 << 10
)
​
// Start launches the client's three background goroutines: the read loop, the
// write loop and the heartbeat loop. It returns immediately.
func (c *Client) Start() {
    loops := []func(){c.readPump, c.writePump, c.startHeartbeat}
    for _, loop := range loops {
        go loop()
    }
}
​
// readPump reads messages from the connection until a read fails and passes
// each one to Hub.HandleMessage.
//
// On exit it signals the heartbeat goroutine to stop, unregisters the client
// from the Hub and closes the connection. It is the only place that closes
// done, and it runs once per client, so the channel is closed at most once.
func (c *Client) readPump() {
    defer func() {
        // Without this, startHeartbeat never observes done and leaks one
        // goroutine and one ticker per connection.
        if c.done != nil {
            close(c.done)
        }
        c.Hub.Unregister <- c
        c.Conn.Close()
    }()

    c.Conn.SetReadLimit(maxMessageSize)
    c.Conn.SetReadDeadline(time.Now().Add(pongWait))
    // Every pong from the peer pushes the read deadline forward.
    c.Conn.SetPongHandler(func(string) error {
        c.Conn.SetReadDeadline(time.Now().Add(pongWait))
        return nil
    })

    for {
        _, message, err := c.Conn.ReadMessage()
        if err != nil {
            // Going-away and abnormal closures are expected when a client
            // leaves; only other close errors are worth logging.
            if websocket.IsUnexpectedCloseError(err,
                websocket.CloseGoingAway,
                websocket.CloseAbnormalClosure) {
                logger.Errorf("WebSocket 错误:%v", err)
            }
            break
        }

        c.Hub.HandleMessage(c, message)
    }
}
​
// writePump drains the Send queue onto the connection and sends a ping every
// pingPeriod. It returns, closing the connection, when a write fails or when
// the Send channel is closed.
//
// Each queued message goes out as its own text frame. The front-end snippets
// in this article parse every received frame with a single JSON.parse call, so
// joining several JSON documents with '\n' in one frame would make that parse
// fail.
func (c *Client) writePump() {
    ticker := time.NewTicker(pingPeriod)
    defer func() {
        ticker.Stop()
        c.Conn.Close()
    }()

    // writeFrame sends one payload as a text frame under a fresh write deadline.
    writeFrame := func(payload []byte) error {
        c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
        return c.Conn.WriteMessage(websocket.TextMessage, payload)
    }

    for {
        select {
        case message, ok := <-c.Send:
            if !ok {
                // The Hub closed the channel: send a close frame and stop.
                c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
                c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }
            if err := writeFrame(message); err != nil {
                return
            }

            // Flush what is already queued before going back to select, so a
            // burst is not delayed behind the ping ticker.
            for pending := len(c.Send); pending > 0; pending-- {
                queued, open := <-c.Send
                if !open {
                    break // the next select iteration sends the close frame
                }
                if err := writeFrame(queued); err != nil {
                    return
                }
            }

        case <-ticker.C:
            c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
            if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}
​
// startHeartbeat calls redis.SetOnline for this user every 30 seconds until the
// done channel yields. The ticker is stored in c.Heartbeat and stopped on
// return.
func (c *Client) startHeartbeat() {
    ticker := time.NewTicker(30 * time.Second)
    c.Heartbeat = ticker
    defer ticker.Stop()

    for {
        select {
        case <-c.done:
            return

        case <-ticker.C:
            // Renew the user's online status in Redis.
            redis.SetOnline(context.Background(), c.UserID)
        }
    }
}

三、消息路由

3.1 消息类型定义

// internal/websocket/message.go
​
package websocket
​
// MessageType names the kind of a Message; it is the value of the JSON "type" field.
type MessageType string

// Message types declared by this package.
const (
    // MsgTypeHeartbeat is a heartbeat.
    MsgTypeHeartbeat MessageType = "heartbeat"
    // MsgTypeText is a text message.
    MsgTypeText MessageType = "text"
    // MsgTypeImage is an image message.
    MsgTypeImage MessageType = "image"
    // MsgTypeSystem is a system message.
    MsgTypeSystem MessageType = "system"
    // MsgTypeAck is an acknowledgement.
    MsgTypeAck MessageType = "ack"
    // MsgTypeOnline is a user-online notification.
    MsgTypeOnline MessageType = "online"
    // MsgTypeOffline is a user-offline notification.
    MsgTypeOffline MessageType = "offline"
)
​
// Message is the JSON envelope exchanged over the WebSocket connection.
// ToID and SessionID are omitted from the JSON when zero; the other fields are
// always emitted.
type Message struct {
    Type      MessageType `json:"type"`
    FromID    int64       `json:"from_id"`
    ToID      int64       `json:"to_id,omitempty"`
    SessionID int64       `json:"session_id,omitempty"`
    Content   string      `json:"content"`
    Timestamp int64       `json:"timestamp"`
    ClientSeq int64       `json:"client_seq"` // client sequence number
    ServerSeq int64       `json:"server_seq"` // server sequence number
}
​
// HandleMessage decodes one raw frame received from client and dispatches it
// by message type.
//
// FromID and Timestamp are always overwritten with server-side values, so
// whatever the client put in those fields is ignored. Frames that are not
// valid JSON, and frames with a type that has no handler here, are logged and
// dropped.
//
// NOTE(review): this runs on the caller's goroutine (readPump), and the
// helpers it calls read h.Clients while Run mutates it on another goroutine —
// confirm how access to the map is synchronized.
// NOTE(review): any connected client can send a "system" message; confirm that
// handleSystemMessage checks the sender's privileges.
func (h *Hub) HandleMessage(client *Client, data []byte) {
    var msg Message
    if err := json.Unmarshal(data, &msg); err != nil {
        logger.Errorf("消息解析失败:%v", err)
        return
    }

    msg.FromID = client.UserID
    msg.Timestamp = time.Now().UnixMilli()

    switch msg.Type {
    case MsgTypeHeartbeat:
        // Answer a heartbeat with an ack.
        h.sendToClient(client.UserID, Message{
            Type: MsgTypeAck,
        })

    case MsgTypeText:
        // Private (one-to-one) message.
        h.handlePrivateMessage(&msg)

    case MsgTypeSystem:
        h.handleSystemMessage(&msg)

    default:
        // Make unsupported types visible instead of dropping them silently.
        logger.Warnf("未支持的消息类型:%q(用户 %d)", msg.Type, client.UserID)
    }
}
​
// handlePrivateMessage 处理私聊消息
func (h *Hub) handlePrivateMessage(msg *Message) {
    // 1. 检查接收者是否在线
    if targetClient, ok := h.Clients[msg.ToID]; ok {
        // 在线:直接推送
        h.sendToClient(msg.ToID, *msg)
    } else {
        // 离线:存储到数据库
        storeOfflineMessage(msg)
    }
    
    // 2. 发送确认
    h.sendToClient(msg.FromID, Message{
        Type:      MsgTypeAck,
        ServerSeq: generateServerSeq(),
    })
}
​
// sendToClient JSON-encodes msg and queues it on the Send channel of the
// client registered for userID.
//
// It does nothing if the user has no entry in h.Clients. The send never
// blocks: if the client's queue is full the message is dropped and a warning
// is logged. An encoding failure is logged and nothing is queued.
//
// NOTE(review): Run closes Send channels on another goroutine; a send that
// races with that close would panic — confirm how this is synchronized.
func (h *Hub) sendToClient(userID int64, msg Message) {
    client, ok := h.Clients[userID]
    if !ok {
        return
    }

    data, err := json.Marshal(msg)
    if err != nil {
        logger.Errorf("消息序列化失败:%v", err)
        return
    }

    select {
    case client.Send <- data:
    default:
        // Send queue is full.
        logger.Warnf("用户 %d 发送队列已满", userID)
    }
}

3.2 WebSocket 路由

// internal/api/ws.go
​
package api
​
import (
    "go-chat/internal/middleware"
    "go-chat/internal/websocket"
    "net/http"
​
    "github.com/gin-gonic/gin"
    "github.com/gorilla/websocket"
)
​
var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    CheckOrigin: func(r *http.Request) bool {
        return true // 允许所有来源(生产环境需限制)
    },
}
​
// WSConnect WebSocket 连接
// GET /ws
func WSConnect(c *gin.Context) {
    userID, exists := c.Get("userID")
    if !exists {
        c.JSON(http.StatusUnauthorized, gin.H{"error": "未认证"})
        return
    }
    
    // 升级为 WebSocket 连接
    conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
    if err != nil {
        logger.Errorf("WebSocket 升级失败:%v", err)
        return
    }
    
    // 创建客户端
    client := &websocket.Client{
        Hub:    websocket.GlobalHub,
        Conn:   conn,
        UserID: userID.(int64),
        Send:   make(chan []byte, 256),
        done:   make(chan struct{}),
    }
    
    // 注册到 Hub
    client.Hub.Register <- client
    
    // 启动客户端
    client.Start()
    
    // 发送上线通知
    client.Hub.Broadcast <- &websocket.BroadcastMsg{
        Type: "online",
        Data: gin.H{"user_id": client.UserID},
        Exclude: client.UserID,
    }
}

四、断线重连

4.1 客户端重连逻辑

// web/src/utils/websocket.ts
​
/**
 * WebSocketClient wraps a browser WebSocket and reconnects automatically with
 * capped exponential backoff.
 *
 * Assign `onMessage` to receive every frame that parses as JSON. Calling
 * `disconnect()` closes the socket and suppresses further reconnect attempts
 * until `connect()` is called again.
 */
class WebSocketClient {
  private ws: WebSocket | null = null
  private reconnectCount = 0
  private maxReconnect = 5
  private reconnectDelay = 3000 // base delay: 3 seconds
  private maxReconnectDelay = 30000 // upper bound for a single wait
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null
  private manuallyClosed = false

  /** Optional callback invoked with each successfully parsed message. */
  onMessage?: (msg: any) => void

  /** Opens a new connection, authenticating with the current token. */
  connect() {
    this.manuallyClosed = false
    const token = getToken()
    // Encode the token so characters such as '+', '&' or '=' cannot corrupt
    // the query string.
    const wsUrl = `ws://localhost:8080/ws?token=${encodeURIComponent(token ?? '')}`

    const socket = new WebSocket(wsUrl)
    this.ws = socket

    socket.onopen = () => {
      console.log('WebSocket 已连接')
      this.reconnectCount = 0
    }

    socket.onmessage = (event) => {
      let msg: any
      try {
        msg = JSON.parse(event.data)
      } catch (error) {
        // One malformed frame must not throw out of the event handler.
        console.error('WebSocket 消息解析失败:', error)
        return
      }
      this.handleMessage(msg)
    }

    socket.onclose = () => {
      console.log('WebSocket 已关闭')
      // Ignore close events from a socket that has since been replaced.
      if (this.ws !== socket) return
      this.attemptReconnect()
    }

    socket.onerror = (error) => {
      console.error('WebSocket 错误:', error)
    }
  }

  /** Schedules the next reconnect unless the limit is reached or the close was manual. */
  attemptReconnect() {
    if (this.manuallyClosed) return

    if (this.reconnectCount >= this.maxReconnect) {
      console.error('重连次数已达上限')
      return
    }

    this.reconnectCount++
    console.log(`尝试重连 (${this.reconnectCount}/${this.maxReconnect})`)

    // Exponential backoff: 3s, 6s, 12s, 24s, ... capped at maxReconnectDelay.
    const delay = Math.min(
      this.reconnectDelay * 2 ** (this.reconnectCount - 1),
      this.maxReconnectDelay
    )

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      this.connect()
    }, delay)
  }

  /** Serializes message as JSON and sends it if the socket is open. */
  send(message: any) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message))
    } else {
      console.warn('WebSocket 未连接,消息发送失败')
    }
  }

  /** Closes the socket and cancels any pending reconnect. */
  disconnect() {
    this.manuallyClosed = true // block reconnects
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
    this.ws?.close()
  }

  /** Forwards a parsed message to the registered callback, if any. */
  private handleMessage(msg: any) {
    this.onMessage?.(msg)
  }
}

4.2 消息队列缓存

// internal/service/offline_message.go
​
package service
​
import (
    "go-chat/internal/model"
    "go-chat/pkg/logger"
    "encoding/json"
)
​
// OfflineMessageService stores messages addressed to users who are not
// connected, using a Redis list per recipient and a database table.
type OfflineMessageService struct {
    redis *redis.Client
    db    *gorm.DB
}
​
// StoreOffline 存储离线消息
func (s *OfflineMessageService) StoreOffline(msg *websocket.Message) error {
    key := fmt.Sprintf("offline_msg:%d", msg.ToID)
    
    data, _ := json.Marshal(msg)
    
    // Redis 缓存(最新 100 条)
    s.redis.LPush(key, data)
    s.redis.LTrim(key, 0, 99)
    
    // 持久化到数据库
    dbMsg := &model.Message{
        SessionID: msg.SessionID,
        SenderID:  msg.FromID,
        ReceiverID: msg.ToID,
        Content:   msg.Content,
        Type:      int(msg.Type),
        Status:    0, // 未读
    }
    
    return s.db.Create(dbMsg).Error
}
​
// GetOfflineMessages returns the offline messages cached in Redis for userID
// and clears that cache.
//
// The read and the delete run in one MULTI/EXEC transaction, so a message
// pushed between them cannot be deleted without being returned. Entries that
// fail to decode are logged and skipped. The result is empty, not nil, when
// nothing is cached.
func (s *OfflineMessageService) GetOfflineMessages(userID int64) ([]*websocket.Message, error) {
    key := fmt.Sprintf("offline_msg:%d", userID)

    pipe := s.redis.TxPipeline()
    listCmd := pipe.LRange(key, 0, -1)
    pipe.Del(key)
    if _, err := pipe.Exec(); err != nil {
        return nil, err
    }
    data := listCmd.Val()

    messages := make([]*websocket.Message, 0, len(data))
    for _, item := range data {
        var msg websocket.Message
        if err := json.Unmarshal([]byte(item), &msg); err != nil {
            logger.Errorf("离线消息解析失败:%v", err)
            continue
        }
        messages = append(messages, &msg)
    }

    return messages, nil
}

五、前端对接

5.1 Vue3 WebSocket Hook

// web/src/hooks/useWebSocket.ts
​
import { ref, onMounted, onUnmounted } from 'vue'
​
/**
 * useWebSocket connects when the component mounts, reconnects 3 seconds after
 * an unexpected close, and disconnects for good when the component unmounts.
 *
 * Returns:
 *  - connected:  ref that is true while the socket is open
 *  - messages:   ref holding every parsed message received so far
 *  - send:       sends `{ type, ...data }` as JSON if the socket is open
 *  - disconnect: closes the socket and stops reconnecting
 */
export function useWebSocket() {
  const connected = ref(false)
  const messages = ref<any[]>([])
  let ws: WebSocket | null = null
  let reconnectTimer: ReturnType<typeof setTimeout> | null = null
  // Set by disconnect() so the close it triggers does not schedule a reconnect.
  let closedByUser = false

  const connect = () => {
    reconnectTimer = null
    const token = localStorage.getItem('token')
    // Encode the token so it cannot break the query string.
    ws = new WebSocket(`ws://localhost:8080/ws?token=${encodeURIComponent(token ?? '')}`)

    ws.onopen = () => {
      connected.value = true
      console.log('✅ WebSocket 已连接')
    }

    ws.onmessage = (event) => {
      let msg: any
      try {
        msg = JSON.parse(event.data)
      } catch (error) {
        // Skip malformed frames instead of throwing inside the handler.
        console.error('WebSocket 消息解析失败:', error)
        return
      }
      messages.value.push(msg)
      handleMessage(msg)
    }

    ws.onclose = () => {
      connected.value = false
      console.log('❌ WebSocket 已断开')
      // Reconnect only after unexpected closes. Without this check the close
      // fired by disconnect() on unmount would keep reconnecting forever.
      if (!closedByUser) {
        reconnectTimer = setTimeout(connect, 3000)
      }
    }

    ws.onerror = (error) => {
      console.error('WebSocket 错误:', error)
    }
  }

  const send = (type: string, data: any) => {
    if (ws && ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ type, ...data }))
    }
  }

  const disconnect = () => {
    closedByUser = true
    if (reconnectTimer !== null) {
      clearTimeout(reconnectTimer)
      reconnectTimer = null
    }
    ws?.close()
  }

  onMounted(() => {
    connect()
  })

  onUnmounted(() => {
    disconnect()
  })

  return {
    connected,
    messages,
    send,
    disconnect
  }
}

5.2 聊天页面

<!-- web/src/views/Chat.vue -->
<template>
  <div class="chat-container">
    <!-- Message list. server_seq is 0 on messages that were never assigned
         one, so fall back to the list index to keep v-for keys unique. -->
    <div class="message-list">
      <div v-for="(msg, index) in messages"
           :key="msg.server_seq ? 'seq-' + msg.server_seq : 'idx-' + index"
           :class="['message', msg.from_id === userID ? 'self' : 'other']">
        <div class="content">{{ msg.content }}</div>
        <div class="time">{{ formatTime(msg.timestamp) }}</div>
      </div>
    </div>
    
    <!-- Input area -->
    <div class="input-area">
      <el-input 
        v-model="inputText"
        @keyup.enter="sendMessage"
        placeholder="输入消息..."
      />
      <el-button type="primary" @click="sendMessage">发送</el-button>
    </div>
  </div>
</template>
​
<script setup lang="ts">
import { ref } from 'vue'
import { useWebSocket } from '@/hooks/useWebSocket'

// messages feeds the template's v-for; connected gates sending.
const { connected, messages, send } = useWebSocket()
const inputText = ref('')

// NOTE(review): `userID` (used by the template) and `targetUserId` (used
// below) are not declared in this snippet. They presumably come from the user
// store or route params — declare them before using this component.

// formatTime renders a millisecond Unix timestamp as a local time string.
const formatTime = (timestamp: number) => new Date(timestamp).toLocaleTimeString()

const sendMessage = () => {
  if (!inputText.value.trim()) return

  // send() silently drops the message while the socket is closed, so keep the
  // text in the input instead of clearing it.
  if (!connected.value) {
    console.warn('WebSocket 未连接,消息未发送')
    return
  }

  send('text', {
    to_id: targetUserId,
    content: inputText.value,
    client_seq: Date.now()
  })

  inputText.value = ''
}
</script>

六、性能优化

6.1 连接数限制

// 限制最大连接数
const maxConnections = 10000
​
func (h *Hub) RegisterClient(client *Client) {
    if len(h.Clients) >= maxConnections {
        logger.Warn("连接数已达上限")
        client.Conn.Close()
        return
    }
    h.Clients[client.UserID] = client
}

6.2 消息压缩

// Ask the upgrader to negotiate per-message compression with clients.
// NOTE(review): Go does not allow an assignment statement at package level;
// run this inside a function (for example at startup) or set the field in the
// upgrader's composite literal.
upgrader.EnableCompression = true
​
// compressMessage gzip-compresses data when it is larger than 1 KiB and
// returns smaller inputs unchanged.
//
// If compression fails, the original data is returned rather than a truncated
// gzip stream. The result carries no explicit flag, so a receiver has to detect
// compression itself, for example from the gzip magic bytes 0x1f 0x8b.
//
// NOTE(review): if per-message compression is also enabled on the upgrader,
// large payloads are compressed twice — confirm that both are wanted.
func compressMessage(data []byte) []byte {
    const threshold = 1024 // only payloads larger than 1 KiB are compressed

    if len(data) <= threshold {
        return data
    }

    var buf bytes.Buffer
    w := gzip.NewWriter(&buf)
    if _, err := w.Write(data); err != nil {
        return data
    }
    // Close flushes the remaining compressed bytes and writes the gzip footer.
    if err := w.Close(); err != nil {
        return data
    }
    return buf.Bytes()
}

七、总结

本篇完成了:

  • ✅ WebSocket 连接管理

  • ✅ 消息路由与分发

  • ✅ 断线重连机制

  • ✅ 离线消息存储

  • ✅ 前后端对接

下一篇预告:私聊与群聊功能 📨