目录
什么是WebSocket
定义
WebSocket是一种在单个TCP连接上进行全双工通信的协议。它使得客户端和服务器之间可以建立持久连接,双方都可以随时发送数据,而不需要像HTTP那样每次通信都需要重新建立连接。
与HTTP的对比
WebSocket握手过程
客户端请求(HTTP Upgrade):
GET /ws HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Version: 13
服务端响应(101 Switching Protocols):
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=关闭状态码
1000- 正常关闭
1001- 终端离开(如浏览器导航到其他页面)
1006- 异常关闭(连接意外中断,未发送Close帧)
1008- 策略违反
1011- 服务器遇到异常
WebSocket在Go中的使用
1. 常用库
最流行的是 gorilla/websocket:
import "github.com/gorilla/websocket"2. 基本服务端实现
package main
import (
"log"
"net/http"
"github.com/gorilla/websocket"
)
// 配置Upgrader
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// 允许所有跨域,生产环境应该限制
return true
},
}
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
// 升级HTTP连接为WebSocket
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("upgrade failed: %v", err)
return
}
defer conn.Close()
// 读取消息循环
for {
messageType, message, err := conn.ReadMessage()
if err != nil {
log.Printf("read error: %v", err)
break
}
log.Printf("received: %s", message)
// 回写消息
if err := conn.WriteMessage(messageType, message); err != nil {
log.Printf("write error: %v", err)
break
}
}
}
func main() {
http.HandleFunc("/ws", handleWebSocket)
log.Fatal(http.ListenAndServe(":8080", nil))
}3. 连接管理(Hub模式)
实际项目中需要管理多个连接:
type Hub struct {
mu sync.RWMutex
users map[string]map[*websocket.Conn]struct{} // userID -> connections
}
func (h *Hub) Add(userID string, conn *websocket.Conn) {
h.mu.Lock()
defer h.mu.Unlock()
if _, ok := h.users[userID]; !ok {
h.users[userID] = make(map[*websocket.Conn]struct{})
}
h.users[userID][conn] = struct{}{}
}
func (h *Hub) Remove(userID string, conn *websocket.Conn) {
h.mu.Lock()
defer h.mu.Unlock()
if conns, ok := h.users[userID]; ok {
delete(conns, conn)
if len(conns) == 0 {
delete(h.users, userID)
}
}
}
func (h *Hub) Publish(userID string, data []byte) error {
h.mu.RLock()
conns := h.users[userID]
h.mu.RUnlock()
for conn := range conns {
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
h.Remove(userID, conn)
conn.Close()
}
}
}4. 心跳保活机制
防止NAT超时和检测死连接:
func (h *Handler) RealtimeWS(c *gin.Context) {
conn, _ := upgrader.Upgrade(c.Writer, c.Request, nil)
defer conn.Close()
// 设置读超时和Pong处理器
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
// 启动心跳ticker
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
go func() {
for range ticker.C {
conn.WriteMessage(websocket.PingMessage, nil)
}
}()
// 读取循环...
}5. 关键注意事项
并发安全:WebSocket连接不是并发安全的,需要加锁
写超时:写入时必须设置超时,防止阻塞
优雅关闭:服务端关闭时发送Close帧,不要直接断连
连接泄漏:确保defer close,防止句柄泄漏
兜底策略Outbox模式
为什么需要兜底
WebSocket实时推送存在以下风险:
用户离线:推送时用户没有WebSocket连接
推送失败:网络抖动导致推送失败
服务重启:WebSocket连接丢失
Outbox模式原理
┌─────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ 业务操作 │───>│ Outbox表 │───>│ 定时任务 │───>│ 推送服务 │
└─────────┘ └──────────┘ └──────────┘ └────┬─────┘
│
┌───────────────────────┘
▼
┌──────────────┐
│ WebSocket推送 │──── 成功 ────> 删除记录
└──────────────┘
│
└──── 失败 ────> 重试/标记死信数据库表设计
CREATE TABLE notify_outbox (
event_id VARCHAR(64) PRIMARY KEY, -- 事件唯一ID
user_id VARCHAR(64) NOT NULL, -- 目标用户
event_type VARCHAR(64) NOT NULL, -- 事件类型
payload JSONB NOT NULL, -- 消息内容
status VARCHAR(20) DEFAULT 'pending', -- pending/sent/failed/dead
retry_count INT DEFAULT 0, -- 重试次数
next_retry_at TIMESTAMP, -- 下次重试时间
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);Worker实现
func (w *OutboxWorker) Start(ctx context.Context) {
ticker := time.NewTicker(3 * time.Second)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
w.processBatch(ctx)
}
}
}
func (w *OutboxWorker) processBatch(ctx context.Context) {
// 1. 拉取待处理任务
jobs := w.repo.QueryPending(100)
for _, job := range jobs {
// 2. 尝试WebSocket推送
delivered, err := w.hub.Publish(job.UserID, job.Payload)
if delivered > 0 {
// 成功:标记为已发送
w.repo.MarkSent(job.EventID)
} else {
// 失败:指数退避重试
w.retryWithBackoff(job)
}
}
}指数退避策略
func backoffDuration(retryCount int) time.Duration {
d := time.Second * time.Duration(1 << retryCount)
if d > 10 * time.Minute {
return 10 * time.Minute
}
return d
}
// retry=0: 1s
// retry=1: 2s
// retry=2: 4s
// retry=3: 8s
// ...问题排查与解决实录
问题现象
用户A解绑情侣关系后,用户B的WebSocket立即断开,报错1006(异常关闭),且收到重复消息。
排查过程
第一步:查看日志
[Unbind] realtime push success: target_user=u_B event=evt_001 delivered=0
[WS] read error: user_id=u_B err=read tcp ... use of closed network connection关键发现:
delivered=0- 消息没有成功送达
use of closed network connection- 连接被关闭
第二步:分析代码
发现 Publish 方法写入时没有设置写超时:
// 问题代码
conn.WriteMessage(websocket.TextMessage, body) // 可能无限阻塞第三步:验证假设
解绑触发推送
WriteMessage阻塞等待(客户端没及时读取)
客户端(Apifox)超时后断开
服务端写入时发现连接已关闭 → 报错1006
解决方案
修复1:添加写超时
func (h *Hub) Publish(userID string, event RealtimeEvent) (int, error) {
for _, conn := range conns {
// 设置写超时,防止阻塞
_ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
if err := conn.WriteJSON(event); err != nil {
h.Remove(userID, conn) // 只移除,不关闭
continue
}
}
}修复2:避免重复推送
原始逻辑问题:实时推送和Outbox兜底都发了消息
// 修复前:两者都执行
realtimeHub.Publish(...) // 发送第1次
outboxRepo.Enqueue(...) // Outbox再发第2次
// 修复后:只有实时失败才走兜底
delivered, _ := realtimeHub.Publish(...)
if delivered == 0 {
outboxRepo.Enqueue(...) // 兜底
}修复后验证
[WS] connection added to hub: user_id=u_B
[WS] connected ack sent: user_id=u_B
[Unbind] realtime push start: couple_id=xxx target_user=u_B
[Unbind] realtime push success: target_user=u_B delivered=1✅ 只收到1条消息
✅ WebSocket连接保持
核心代码与最佳实践
完整WebSocket Handler(生产级)
func (h *Handler) RealtimeWS(c *gin.Context) {
claims := authClaimsFromContext(c)
conn, err := h.upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
return
}
h.hub.Add(claims.UserID, conn)
defer func() {
h.hub.Remove(claims.UserID, conn)
conn.Close()
}()
// 配置
conn.SetReadLimit(1024)
conn.SetReadDeadline(time.Now().Add(90 * time.Second))
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(90 * time.Second))
return nil
})
// 发送连接确认
conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
conn.WriteJSON(RealtimeEvent{EventType: "CONNECTED"})
// 心跳
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
readDone := make(chan struct{})
go func() {
defer close(readDone)
for {
if _, _, err := conn.ReadMessage(); err != nil {
return
}
}
}()
for {
select {
case <-readDone:
return
case <-ticker.C:
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}推送兜底策略
func (h *Handler) Unbind(c *gin.Context) {
// 1. 执行解绑
relation, err := h.coupleRepo.Unbind(ctx, claims.UserID)
// 2. 尝试实时推送
delivered := 0
if h.realtimeHub != nil {
delivered, _ = h.realtimeHub.Publish(relation.PartnerUser, event)
}
// 3. 实时失败才入Outbox兜底
if delivered == 0 && h.outboxRepo != nil {
h.outboxRepo.Enqueue(ctx, event)
}
}总结
关键经验
WebSocket必须设置写超时,否则会导致连接异常关闭
实时推送和兜底策略要互斥,避免重复通知
完善的日志是排查问题的关键,每个关键节点都要记录
使用delivered字段判断推送结果,而不是仅看error
常用调试命令
# 使用wscat测试WebSocket
npm install -g wscat
wscat -c ws://localhost:8080/ws -H "Authorization: Bearer TOKEN"
# 浏览器控制台测试
const ws = new WebSocket('ws://localhost:8080/ws');
ws.onmessage = e => console.log(e.data);