系列文章:Go 聊天室实战系列(4/6)
项目地址:https://gitee.com/gaogaohang/go-land_-learning
技术栈:Go 1.23 + Gin + PostgreSQL + Redis + Vue3
一、WebSocket 基础
1.1 WebSocket vs HTTP
1.2 WebSocket 握手流程
客户端 服务端
│ │
│ ───────── GET /ws ──────────► │
│ Upgrade: websocket │
│ Connection: Upgrade │
│ Sec-WebSocket-Key: xxx │
│ │
│ ◄──────── 101 Switching ────── │
│ Protocols: websocket │
│ Sec-WebSocket-Accept: yyy │
│ │
│ ════════ WebSocket 连接建立 ═════ │
│ │
│ ◄─────── [消息推送] ─────────► │
│ (双向通信) │二、连接管理器设计
2.1 核心结构
// internal/websocket/client.go
package websocket
import (
"time"
"github.com/gorilla/websocket"
)
// Client WebSocket 客户端连接
type Client struct {
Hub *Hub // 所属 Hub
Conn *websocket.Conn // WebSocket 连接
UserID int64 // 用户 ID
Send chan []byte // 发送队列
Heartbeat *time.Ticker // 心跳定时器
done chan struct{} // 结束信号
}
// Hub 连接管理中心
type Hub struct {
Clients map[int64]*Client // 在线用户 map
Register chan *Client // 注册通道
Unregister chan *Client // 注销通道
Broadcast chan *BroadcastMsg // 广播通道
}
// BroadcastMsg 广播消息
type BroadcastMsg struct {
Type string `json:"type"`
Data interface{} `json:"data"`
Exclude int64 `json:"exclude"` // 排除的用户 ID
}
// NewHub 创建 Hub
func NewHub() *Hub {
return &Hub{
Clients: make(map[int64]*Client),
Register: make(chan *Client),
Unregister: make(chan *Client),
Broadcast: make(chan *BroadcastMsg, 256),
}
}
// Run 启动 Hub
func (h *Hub) Run() {
for {
select {
case client := <-h.Register:
// 注册新连接
h.Clients[client.UserID] = client
case client := <-h.Unregister:
// 注销连接
if _, ok := h.Clients[client.UserID]; ok {
delete(h.Clients, client.UserID)
close(client.Send)
}
case msg := <-h.Broadcast:
// 广播消息
for userID, client := range h.Clients {
if msg.Exclude > 0 && userID == msg.Exclude {
continue // 排除指定用户
}
select {
case client.Send <- msg.Data:
default:
// 发送失败,关闭连接
close(client.Send)
delete(h.Clients, userID)
}
}
}
}
}2.2 客户端管理
// internal/websocket/client.go
const (
writeWait = 10 * time.Second // 写入超时
pongWait = 60 * time.Second // 读取超时
pingPeriod = (pongWait * 9) / 10 // 心跳间隔
maxMessageSize = 512 * 1024 // 最大消息 512KB
)
// Start 启动客户端
func (c *Client) Start() {
go c.readPump()
go c.writePump()
go c.startHeartbeat()
}
// readPump 读取消息
func (c *Client) readPump() {
defer func() {
c.Hub.Unregister <- c
c.Conn.Close()
}()
c.Conn.SetReadLimit(maxMessageSize)
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
c.Conn.SetPongHandler(func(string) error {
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
_, message, err := c.Conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure) {
logger.Errorf("WebSocket 错误:%v", err)
}
break
}
// 处理消息
c.Hub.HandleMessage(c, message)
}
}
// writePump 写入消息
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.Conn.Close()
}()
for {
select {
case message, ok := <-c.Send:
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// Hub 关闭了通道
c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.Conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)
// 批量发送
n := len(c.Send)
for i := 0; i < n; i++ {
w.Write([]byte{'\n'})
w.Write(<-c.Send)
}
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// startHeartbeat 心跳
func (c *Client) startHeartbeat() {
c.Heartbeat = time.NewTicker(30 * time.Second)
defer c.Heartbeat.Stop()
for {
select {
case <-c.Heartbeat.C:
// Redis 续期在线状态
redis.SetOnline(context.Background(), c.UserID)
case <-c.done:
return
}
}
}三、消息路由
3.1 消息类型定义
// internal/websocket/message.go
package websocket
// MessageType 消息类型
type MessageType string
const (
MsgTypeHeartbeat MessageType = "heartbeat" // 心跳
MsgTypeText MessageType = "text" // 文本消息
MsgTypeImage MessageType = "image" // 图片
MsgTypeSystem MessageType = "system" // 系统消息
MsgTypeAck MessageType = "ack" // 确认
MsgTypeOnline MessageType = "online" // 上线通知
MsgTypeOffline MessageType = "offline" // 下线通知
)
// Message 消息结构
type Message struct {
Type MessageType `json:"type"`
FromID int64 `json:"from_id"`
ToID int64 `json:"to_id,omitempty"`
SessionID int64 `json:"session_id,omitempty"`
Content string `json:"content"`
Timestamp int64 `json:"timestamp"`
ClientSeq int64 `json:"client_seq"` // 客户端序列号
ServerSeq int64 `json:"server_seq"` // 服务端序列号
}
// HandleMessage 处理消息
func (h *Hub) HandleMessage(client *Client, data []byte) {
var msg Message
if err := json.Unmarshal(data, &msg); err != nil {
logger.Errorf("消息解析失败:%v", err)
return
}
msg.FromID = client.UserID
msg.Timestamp = time.Now().UnixMilli()
switch msg.Type {
case MsgTypeHeartbeat:
// 心跳响应
h.sendToClient(client.UserID, Message{
Type: MsgTypeAck,
})
case MsgTypeText:
// 私聊消息
h.handlePrivateMessage(&msg)
case MsgTypeSystem:
// 系统消息
h.handleSystemMessage(&msg)
}
}
// handlePrivateMessage 处理私聊消息
func (h *Hub) handlePrivateMessage(msg *Message) {
// 1. 检查接收者是否在线
if targetClient, ok := h.Clients[msg.ToID]; ok {
// 在线:直接推送
h.sendToClient(msg.ToID, *msg)
} else {
// 离线:存储到数据库
storeOfflineMessage(msg)
}
// 2. 发送确认
h.sendToClient(msg.FromID, Message{
Type: MsgTypeAck,
ServerSeq: generateServerSeq(),
})
}
// sendToClient 发送消息给指定用户
func (h *Hub) sendToClient(userID int64, msg Message) {
client, ok := h.Clients[userID]
if !ok {
return
}
data, _ := json.Marshal(msg)
select {
case client.Send <- data:
default:
// 发送队列已满
logger.Warnf("用户 %d 发送队列已满", userID)
}
}3.2 WebSocket 路由
// internal/api/ws.go
package api
import (
"go-chat/internal/middleware"
"go-chat/internal/websocket"
"net/http"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true // 允许所有来源(生产环境需限制)
},
}
// WSConnect WebSocket 连接
// GET /ws
func WSConnect(c *gin.Context) {
userID, exists := c.Get("userID")
if !exists {
c.JSON(http.StatusUnauthorized, gin.H{"error": "未认证"})
return
}
// 升级为 WebSocket 连接
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
logger.Errorf("WebSocket 升级失败:%v", err)
return
}
// 创建客户端
client := &websocket.Client{
Hub: websocket.GlobalHub,
Conn: conn,
UserID: userID.(int64),
Send: make(chan []byte, 256),
done: make(chan struct{}),
}
// 注册到 Hub
client.Hub.Register <- client
// 启动客户端
client.Start()
// 发送上线通知
client.Hub.Broadcast <- &websocket.BroadcastMsg{
Type: "online",
Data: gin.H{"user_id": client.UserID},
Exclude: client.UserID,
}
}四、断线重连
4.1 客户端重连逻辑
// web/src/utils/websocket.ts
class WebSocketClient {
private ws: WebSocket | null = null
private reconnectCount = 0
private maxReconnect = 5
private reconnectDelay = 3000 // 3 秒
connect() {
const token = getToken()
const wsUrl = `ws://localhost:8080/ws?token=${token}`
this.ws = new WebSocket(wsUrl)
this.ws.onopen = () => {
console.log('WebSocket 已连接')
this.reconnectCount = 0
}
this.ws.onmessage = (event) => {
const msg = JSON.parse(event.data)
this.handleMessage(msg)
}
this.ws.onclose = () => {
console.log('WebSocket 已关闭')
this.attemptReconnect()
}
this.ws.onerror = (error) => {
console.error('WebSocket 错误:', error)
}
}
attemptReconnect() {
if (this.reconnectCount >= this.maxReconnect) {
console.error('重连次数已达上限')
return
}
this.reconnectCount++
console.log(`尝试重连 (${this.reconnectCount}/${this.maxReconnect})`)
setTimeout(() => {
this.connect()
}, this.reconnectDelay * this.reconnectCount) // 指数退避
}
send(message: any) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message))
} else {
console.warn('WebSocket 未连接,消息发送失败')
}
}
disconnect() {
this.reconnectCount = this.maxReconnect // 阻止重连
this.ws?.close()
}
}4.2 消息队列缓存
// internal/service/offline_message.go
package service
import (
"go-chat/internal/model"
"go-chat/pkg/logger"
"encoding/json"
)
// OfflineMessageService 离线消息服务
type OfflineMessageService struct {
redis *redis.Client
db *gorm.DB
}
// StoreOffline 存储离线消息
func (s *OfflineMessageService) StoreOffline(msg *websocket.Message) error {
key := fmt.Sprintf("offline_msg:%d", msg.ToID)
data, _ := json.Marshal(msg)
// Redis 缓存(最新 100 条)
s.redis.LPush(key, data)
s.redis.LTrim(key, 0, 99)
// 持久化到数据库
dbMsg := &model.Message{
SessionID: msg.SessionID,
SenderID: msg.FromID,
ReceiverID: msg.ToID,
Content: msg.Content,
Type: int(msg.Type),
Status: 0, // 未读
}
return s.db.Create(dbMsg).Error
}
// GetOfflineMessages 获取离线消息
func (s *OfflineMessageService) GetOfflineMessages(userID int64) ([]*websocket.Message, error) {
key := fmt.Sprintf("offline_msg:%d", userID)
data, err := s.redis.LRange(key, 0, -1).Result()
if err != nil {
return nil, err
}
messages := make([]*websocket.Message, 0, len(data))
for _, item := range data {
var msg websocket.Message
if err := json.Unmarshal([]byte(item), &msg); err != nil {
logger.Errorf("离线消息解析失败:%v", err)
continue
}
messages = append(messages, &msg)
}
// 清除已读取的离线消息
s.redis.Del(key)
return messages, nil
}五、前端对接
5.1 Vue3 WebSocket Hook
// web/src/hooks/useWebSocket.ts
import { ref, onMounted, onUnmounted } from 'vue'
export function useWebSocket() {
const connected = ref(false)
const messages = ref<any[]>([])
let ws: WebSocket | null = null
const connect = () => {
const token = localStorage.getItem('token')
ws = new WebSocket(`ws://localhost:8080/ws?token=${token}`)
ws.onopen = () => {
connected.value = true
console.log('✅ WebSocket 已连接')
}
ws.onmessage = (event) => {
const msg = JSON.parse(event.data)
messages.value.push(msg)
handleMessage(msg)
}
ws.onclose = () => {
connected.value = false
console.log('❌ WebSocket 已断开')
// 自动重连
setTimeout(connect, 3000)
}
ws.onerror = (error) => {
console.error('WebSocket 错误:', error)
}
}
const send = (type: string, data: any) => {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type, ...data }))
}
}
const disconnect = () => {
ws?.close()
}
onMounted(() => {
connect()
})
onUnmounted(() => {
disconnect()
})
return {
connected,
messages,
send,
disconnect
}
}5.2 聊天页面
<!-- web/src/views/Chat.vue -->
<template>
<div class="chat-container">
<!-- 消息列表 -->
<div class="message-list">
<div v-for="msg in messages" :key="msg.server_seq"
:class="['message', msg.from_id === userID ? 'self' : 'other']">
<div class="content">{{ msg.content }}</div>
<div class="time">{{ formatTime(msg.timestamp) }}</div>
</div>
</div>
<!-- 输入框 -->
<div class="input-area">
<el-input
v-model="inputText"
@keyup.enter="sendMessage"
placeholder="输入消息..."
/>
<el-button type="primary" @click="sendMessage">发送</el-button>
</div>
</div>
</template>
<script setup lang="ts">
import { ref } from 'vue'
import { useWebSocket } from '@/hooks/useWebSocket'
const { connected, send } = useWebSocket()
const inputText = ref('')
const sendMessage = () => {
if (!inputText.value.trim()) return
send('text', {
to_id: targetUserId,
content: inputText.value,
client_seq: Date.now()
})
inputText.value = ''
}
</script>六、性能优化
6.1 连接池
// 限制最大连接数
const maxConnections = 10000
func (h *Hub) RegisterClient(client *Client) {
if len(h.Clients) >= maxConnections {
logger.Warn("连接数已达上限")
client.Conn.Close()
return
}
h.Clients[client.UserID] = client
}6.2 消息压缩
// 启用压缩
upgrader.EnableCompression = true
// 大消息压缩
func compressMessage(data []byte) []byte {
if len(data) > 1024 { // 大于 1KB 才压缩
var buf bytes.Buffer
w := gzip.NewWriter(&buf)
w.Write(data)
w.Close()
return buf.Bytes()
}
return data
}七、总结
本篇完成了:
✅ WebSocket 连接管理
✅ 消息路由与分发
✅ 断线重连机制
✅ 离线消息存储
✅ 前后端对接
下一篇预告:私聊与群聊功能 📨