学习如何在Go语言中使用WebSocket

在Go语言中使用gorilla/websocket库来实现WebSocket

准备实战

当然学习技术还是用来解决实际问题的,所以在简单地了解了WebSocket的特性之后,就开始着手在项目中搭建一个小型WebSocket服务。

明确需求

项目需求比较明确,一共有两个接口要实现:

  1. 推送正在进行的实验列表数据 :一旦有实验开始或结束,立马推送消息给订阅该消息的前端。

  2. 推送正在进行的实验日志数据 :服务端每30秒轮询,判断前端所订阅的某个实验/某个用户的实验日志数据是否有更新,有更新则推送最新的日志数据。

开始动手

首先是选择了比较成熟的go语言WebSocket包—gorilla/websocket。websocket本质上是把一个http请求升级成长连接。

所以第一步是升级协议 Upgrader就是用来升级协议,同时可以指定缓冲区大小/超时时间/请求头检查等操作:

// WsServer 服务器结构体
type WsServer struct {
    listener net.Listener
    addr     string
    upgrade  *websocket.Upgrader
    APIs     map[string]Routers
    Conns    []*Connection
}

// NewWsServer 新建websocket服务器
func NewWsServer() *WsServer {
    ws := new(WsServer)
    ws.APIs = make(map[string]Routers)
    ws.addr = "0.0.0.0:10215"
    ws.upgrade = &websocket.Upgrader{
        ReadBufferSize:  4096,
        WriteBufferSize: 1024,
        CheckOrigin: func(r *http.Request) bool {
            if r.Method != "GET" {
                fmt.Println("method is not GET")
                return false
            }
            if _, ok := ws.APIs[r.URL.Path]; !ok {
                fmt.Println("path error")
                return false
            }
            return true
        },
    }
    go HandleConns(ws)
    return ws
}

然后启动HTTP服务 这一步骤想必都很熟悉了,无论是使用GIN框架还是原生HTTP服务都不陌生,实现Handler接口ServeHTTP方法,接收到连接后开始处理业务:

// Start 服务器开始运行
func (ws *WsServer) Start() (err error) {
    ws.listener, err = net.Listen("tcp", ws.addr)
    if err != nil {
        fmt.Println("net listen error:", err)
        return
    }
    fmt.Println("[Success] Start WebServer succ")
    err = http.Serve(ws.listener, ws)
    if err != nil {
        fmt.Println("http serve error:", err)
        return
    }

    return nil
}

// ServeHTTP 接收连接,并升级成websocket
func (ws *WsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    if _, ok := ws.APIs[r.URL.Path]; !ok {
        httpCode := http.StatusInternalServerError
        reasePhrase := http.StatusText(httpCode)
        fmt.Println("path error ", reasePhrase)
        http.Error(w, reasePhrase, httpCode)
        return
    }

    conn, err := ws.upgrade.Upgrade(w, r, nil)
    if err != nil {
        fmt.Println("websocket error:", err)
        return
    }
    fmt.Println("websocket client connect :", conn.RemoteAddr())
    go ws.connHandle(conn, r.URL.Path)

}

然后开始处理连接 完成接受消息/处理消息/判断掉线/发送消息等业务:

// ConnHandle 处理连接
func (ws *WsServer) connHandle(conn *websocket.Conn, url string) {
    Conn := &Connection{
        Conn:    conn,
        MsgChan: make(chan []byte, 10),
    }
    defer func() {
        conn.Close()
    }()
    ws.Conns = append(ws.Conns, Conn)
    stopCh := make(chan int)
    go ws.send(Conn, stopCh)
    for {
        //conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(5)))
        _, msg, err := conn.ReadMessage()
        if err != nil {
            close(stopCh)
            // 判断是不是超时
            if netErr, ok := err.(net.Error); ok {
                if netErr.Timeout() {
                    fmt.Printf("ReadMessage timeout remote: %v\n", conn.RemoteAddr())
                    return
                }
            }
            // 其他错误,如果是 1001 和 1000 就不打印日志
            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
                fmt.Printf("ReadMessage other remote:%v error: %v \n", conn.RemoteAddr(), err)
            }
            return
        }
        //检查是否有该路由,有则执行Handle方法
        if handler, ok := ws.APIs[url]; ok {
            if !Conn.IsOn {
                go handler.Handle(Conn, msg)
            }
        }
    }
}

// 发送消息协程
func (ws *WsServer) send(conn *Connection, stopCh chan int) {

    for {
        select {
        case <-stopCh:
            fmt.Println("connect closed")
            return
        case msg := <-conn.MsgChan:
            err := conn.Conn.WriteMessage(conn.Type, msg)
            if err != nil {
                fmt.Println("send msg faild ", err)
                return
            }
        }
    }
}

最后就是实现具体的业务功能 在服务器开始时就启动了一个业务协程,用来检查是否有要推送的消息:

// HandleConns 处理消息推送
func HandleConns(ws *WsServer) {

    ticker := time.NewTicker(time.Second * 30)
    defer ticker.Stop()

    for {
        select {
        //房间信息一旦变动就推送给客户端
        case <-core.WorldMgr.RoomChan:
            rooms := core.WorldMgr.GetaAllRooms()
            data, _ := json.Marshal(serializer.BuildRooms(rooms, true))
            for _, v := range ws.Conns {
                if v.IsOn && v.Type == 1 {
                    v.MsgChan <- data
                }
            }

        //每30秒检查日志信息更新
        case <-ticker.C:
            for _, v := range ws.Conns {
                if v.IsOn && v.Type == 2 {
                    room := core.WorldMgr.GetRoomByID(int32(v.Client.RoomID))

                    //如果实验已中止则通知客户端
                    if room.IsOn == 0 {
                        v.MsgChan <- []byte("实验已结束")
                        continue
                    }

                    //若没有变化则不通知
                    if v.Client.Index == len(room.Experiment.RecordList)-1 {
                        continue
                    }
                    //具体业务省略
                }
            }
        }
    }
}

总结时间

至此基本上已经完成了一个能满足需求的Web服务,当然实现上可能比较简陋,性能和扩展性都还有待提高(比如后期接口数量增加/订阅者数量增加)后续应该增加协程池来完成多客户端的消息推送。总之不能仅仅以完成任务的心态来工作,希望自己能多以架构层面来考虑软件的结构,加油哦~

Licensed under CC BY-NC-SA 4.0
加载中...
感谢Jimmy 隐私政策