目录

  1. 什么是WebSocket

  1. WebSocket在Go中的使用

  1. 兜底策略Outbox模式

  1. 问题排查与解决实录

  1. 核心代码与最佳实践


什么是WebSocket

定义

WebSocket是一种在单个TCP连接上进行全双工通信的协议。它使得客户端和服务器之间可以建立持久连接,双方都可以随时发送数据,而不需要像HTTP那样每次通信都需要重新建立连接。

与HTTP的对比

特性

HTTP

WebSocket

连接方式

短连接,每次请求新建连接

长连接,一次握手后保持连接

通信模式

单向:客户端请求,服务端响应

双向:双方随时发送

实时性

差,需要轮询

好,服务器可主动推送

头部开销

大,每次请求都带完整头部

小,首次握手后数据帧轻量

适用场景

REST API、静态资源

实时聊天、股票行情、在线游戏

WebSocket握手过程

客户端请求(HTTP Upgrade):
GET /ws HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Version: 13

服务端响应(101 Switching Protocols):
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=

关闭状态码

  • 1000 - 正常关闭

  • 1001 - 终端离开(如浏览器导航到其他页面)

  • 1006 - 异常关闭(连接意外中断,未发送Close帧)

  • 1008 - 策略违反

  • 1011 - 服务器遇到异常


WebSocket在Go中的使用

1. 常用库

最流行的是 gorilla/websocket

import "github.com/gorilla/websocket"

2. 基本服务端实现

package main

import (
    "log"
    "net/http"
    "github.com/gorilla/websocket"
)

// 配置Upgrader
var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    CheckOrigin: func(r *http.Request) bool {
        // 允许所有跨域,生产环境应该限制
        return true
    },
}

func handleWebSocket(w http.ResponseWriter, r *http.Request) {
    // 升级HTTP连接为WebSocket
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Printf("upgrade failed: %v", err)
        return
    }
    defer conn.Close()

    // 读取消息循环
    for {
        messageType, message, err := conn.ReadMessage()
        if err != nil {
            log.Printf("read error: %v", err)
            break
        }
        log.Printf("received: %s", message)
        
        // 回写消息
        if err := conn.WriteMessage(messageType, message); err != nil {
            log.Printf("write error: %v", err)
            break
        }
    }
}

func main() {
    http.HandleFunc("/ws", handleWebSocket)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

3. 连接管理(Hub模式)

实际项目中需要管理多个连接:

type Hub struct {
    mu    sync.RWMutex
    users map[string]map[*websocket.Conn]struct{} // userID -> connections
}

func (h *Hub) Add(userID string, conn *websocket.Conn) {
    h.mu.Lock()
    defer h.mu.Unlock()
    if _, ok := h.users[userID]; !ok {
        h.users[userID] = make(map[*websocket.Conn]struct{})
    }
    h.users[userID][conn] = struct{}{}
}

func (h *Hub) Remove(userID string, conn *websocket.Conn) {
    h.mu.Lock()
    defer h.mu.Unlock()
    if conns, ok := h.users[userID]; ok {
        delete(conns, conn)
        if len(conns) == 0 {
            delete(h.users, userID)
        }
    }
}

func (h *Hub) Publish(userID string, data []byte) error {
    h.mu.RLock()
    conns := h.users[userID]
    h.mu.RUnlock()
    
    for conn := range conns {
        if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
            h.Remove(userID, conn)
            conn.Close()
        }
    }
}

4. 心跳保活机制

防止NAT超时和检测死连接:

func (h *Handler) RealtimeWS(c *gin.Context) {
    conn, _ := upgrader.Upgrade(c.Writer, c.Request, nil)
    defer conn.Close()
    
    // 设置读超时和Pong处理器
    conn.SetReadDeadline(time.Now().Add(60 * time.Second))
    conn.SetPongHandler(func(string) error {
        conn.SetReadDeadline(time.Now().Add(60 * time.Second))
        return nil
    })
    
    // 启动心跳ticker
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    
    go func() {
        for range ticker.C {
            conn.WriteMessage(websocket.PingMessage, nil)
        }
    }()
    
    // 读取循环...
}

5. 关键注意事项

  1. 并发安全:WebSocket连接不是并发安全的,需要加锁

  1. 写超时:写入时必须设置超时,防止阻塞

  1. 优雅关闭:服务端关闭时发送Close帧,不要直接断连

  1. 连接泄漏:确保defer close,防止句柄泄漏


兜底策略Outbox模式

为什么需要兜底

WebSocket实时推送存在以下风险:

  1. 用户离线:推送时用户没有WebSocket连接

  1. 推送失败:网络抖动导致推送失败

  1. 服务重启:WebSocket连接丢失

Outbox模式原理

┌─────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│ 业务操作 │───>│ Outbox表 │───>│ 定时任务 │───>│ 推送服务 │
└─────────┘    └──────────┘    └──────────┘    └────┬─────┘
                                                     │
                             ┌───────────────────────┘
                             ▼
                      ┌──────────────┐
                      │ WebSocket推送 │──── 成功 ────> 删除记录
                      └──────────────┘
                             │
                             └──── 失败 ────> 重试/标记死信

数据库表设计

CREATE TABLE notify_outbox (
    event_id VARCHAR(64) PRIMARY KEY,      -- 事件唯一ID
    user_id VARCHAR(64) NOT NULL,          -- 目标用户
    event_type VARCHAR(64) NOT NULL,       -- 事件类型
    payload JSONB NOT NULL,                -- 消息内容
    status VARCHAR(20) DEFAULT 'pending',  -- pending/sent/failed/dead
    retry_count INT DEFAULT 0,             -- 重试次数
    next_retry_at TIMESTAMP,               -- 下次重试时间
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

Worker实现

func (w *OutboxWorker) Start(ctx context.Context) {
    ticker := time.NewTicker(3 * time.Second)
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            w.processBatch(ctx)
        }
    }
}

func (w *OutboxWorker) processBatch(ctx context.Context) {
    // 1. 拉取待处理任务
    jobs := w.repo.QueryPending(100)
    
    for _, job := range jobs {
        // 2. 尝试WebSocket推送
        delivered, err := w.hub.Publish(job.UserID, job.Payload)
        
        if delivered > 0 {
            // 成功:标记为已发送
            w.repo.MarkSent(job.EventID)
        } else {
            // 失败:指数退避重试
            w.retryWithBackoff(job)
        }
    }
}

指数退避策略

func backoffDuration(retryCount int) time.Duration {
    d := time.Second * time.Duration(1 << retryCount)
    if d > 10 * time.Minute {
        return 10 * time.Minute
    }
    return d
}
// retry=0: 1s
// retry=1: 2s
// retry=2: 4s
// retry=3: 8s
// ...

问题排查与解决实录

问题现象

用户A解绑情侣关系后,用户B的WebSocket立即断开,报错1006(异常关闭),且收到重复消息。

排查过程

第一步:查看日志

[Unbind] realtime push success: target_user=u_B event=evt_001 delivered=0
[WS] read error: user_id=u_B err=read tcp ... use of closed network connection

关键发现:

  1. delivered=0 - 消息没有成功送达

  1. use of closed network connection - 连接被关闭

第二步:分析代码

发现 Publish 方法写入时没有设置写超时:

// 问题代码
conn.WriteMessage(websocket.TextMessage, body)  // 可能无限阻塞

第三步:验证假设

  1. 解绑触发推送

  1. WriteMessage 阻塞等待(客户端没及时读取)

  1. 客户端(Apifox)超时后断开

  1. 服务端写入时发现连接已关闭 → 报错1006

解决方案

修复1:添加写超时

func (h *Hub) Publish(userID string, event RealtimeEvent) (int, error) {
    for _, conn := range conns {
        // 设置写超时,防止阻塞
        _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
        
        if err := conn.WriteJSON(event); err != nil {
            h.Remove(userID, conn)  // 只移除,不关闭
            continue
        }
    }
}

修复2:避免重复推送

原始逻辑问题:实时推送和Outbox兜底都发了消息

// 修复前:两者都执行
realtimeHub.Publish(...)  // 发送第1次
outboxRepo.Enqueue(...)   // Outbox再发第2次

// 修复后:只有实时失败才走兜底
delivered, _ := realtimeHub.Publish(...)
if delivered == 0 {
    outboxRepo.Enqueue(...)  // 兜底
}

修复后验证

[WS] connection added to hub: user_id=u_B
[WS] connected ack sent: user_id=u_B
[Unbind] realtime push start: couple_id=xxx target_user=u_B
[Unbind] realtime push success: target_user=u_B delivered=1

✅ 只收到1条消息
✅ WebSocket连接保持


核心代码与最佳实践

完整WebSocket Handler(生产级)

func (h *Handler) RealtimeWS(c *gin.Context) {
    claims := authClaimsFromContext(c)
    
    conn, err := h.upgrader.Upgrade(c.Writer, c.Request, nil)
    if err != nil {
        return
    }
    
    h.hub.Add(claims.UserID, conn)
    defer func() {
        h.hub.Remove(claims.UserID, conn)
        conn.Close()
    }()
    
    // 配置
    conn.SetReadLimit(1024)
    conn.SetReadDeadline(time.Now().Add(90 * time.Second))
    conn.SetPongHandler(func(string) error {
        conn.SetReadDeadline(time.Now().Add(90 * time.Second))
        return nil
    })
    
    // 发送连接确认
    conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
    conn.WriteJSON(RealtimeEvent{EventType: "CONNECTED"})
    
    // 心跳
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    
    readDone := make(chan struct{})
    go func() {
        defer close(readDone)
        for {
            if _, _, err := conn.ReadMessage(); err != nil {
                return
            }
        }
    }()
    
    for {
        select {
        case <-readDone:
            return
        case <-ticker.C:
            conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}

推送兜底策略

func (h *Handler) Unbind(c *gin.Context) {
    // 1. 执行解绑
    relation, err := h.coupleRepo.Unbind(ctx, claims.UserID)
    
    // 2. 尝试实时推送
    delivered := 0
    if h.realtimeHub != nil {
        delivered, _ = h.realtimeHub.Publish(relation.PartnerUser, event)
    }
    
    // 3. 实时失败才入Outbox兜底
    if delivered == 0 && h.outboxRepo != nil {
        h.outboxRepo.Enqueue(ctx, event)
    }
}

总结

关键经验

  1. WebSocket必须设置写超时,否则会导致连接异常关闭

  1. 实时推送和兜底策略要互斥,避免重复通知

  1. 完善的日志是排查问题的关键,每个关键节点都要记录

  1. 使用delivered字段判断推送结果,而不是仅看error

常用调试命令

# 使用wscat测试WebSocket
npm install -g wscat
wscat -c ws://localhost:8080/ws -H "Authorization: Bearer TOKEN"

# 浏览器控制台测试
const ws = new WebSocket('ws://localhost:8080/ws');
ws.onmessage = e => console.log(e.data);