准备实战
当然学习技术还是用来解决实际问题的,所以在简单地了解了WebSocket的特性之后,就开始着手在项目中搭建一个小型WebSocket服务。
明确需求
项目需求比较明确,一共有两个接口要实现:
-
推送正在进行的实验列表数据 :一旦有实验开始或结束,立马推送消息给订阅该消息的前端。
-
推送正在进行的实验日志数据 :服务端每30秒轮询,判断前端所订阅的某个实验/某个用户的实验日志数据是否有更新,有更新则推送最新的日志数据。
开始动手
首先是选择了比较成熟的go语言WebSocket包—gorilla/websocket。websocket本质上是把一个http请求升级成长连接。
所以第一步是升级协议 Upgrader就是用来升级协议,同时可以指定缓冲区大小/超时时间/请求头检查等操作:
// WsServer 服务器结构体
type WsServer struct {
listener net.Listener
addr string
upgrade *websocket.Upgrader
APIs map[string]Routers
Conns []*Connection
}
// NewWsServer 新建websocket服务器
func NewWsServer() *WsServer {
ws := new(WsServer)
ws.APIs = make(map[string]Routers)
ws.addr = "0.0.0.0:10215"
ws.upgrade = &websocket.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
if r.Method != "GET" {
fmt.Println("method is not GET")
return false
}
if _, ok := ws.APIs[r.URL.Path]; !ok {
fmt.Println("path error")
return false
}
return true
},
}
go HandleConns(ws)
return ws
}
然后启动HTTP服务 这一步骤想必都很熟悉了,无论是使用GIN框架还是原生HTTP服务都不陌生,实现Handler接口ServeHTTP方法,接收到连接后开始处理业务:
// Start 服务器开始运行
func (ws *WsServer) Start() (err error) {
ws.listener, err = net.Listen("tcp", ws.addr)
if err != nil {
fmt.Println("net listen error:", err)
return
}
fmt.Println("[Success] Start WebServer succ")
err = http.Serve(ws.listener, ws)
if err != nil {
fmt.Println("http serve error:", err)
return
}
return nil
}
// ServeHTTP 接收连接,并升级成websocket
func (ws *WsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if _, ok := ws.APIs[r.URL.Path]; !ok {
httpCode := http.StatusInternalServerError
reasePhrase := http.StatusText(httpCode)
fmt.Println("path error ", reasePhrase)
http.Error(w, reasePhrase, httpCode)
return
}
conn, err := ws.upgrade.Upgrade(w, r, nil)
if err != nil {
fmt.Println("websocket error:", err)
return
}
fmt.Println("websocket client connect :", conn.RemoteAddr())
go ws.connHandle(conn, r.URL.Path)
}
然后开始处理连接 完成接受消息/处理消息/判断掉线/发送消息等业务:
// ConnHandle 处理连接
func (ws *WsServer) connHandle(conn *websocket.Conn, url string) {
Conn := &Connection{
Conn: conn,
MsgChan: make(chan []byte, 10),
}
defer func() {
conn.Close()
}()
ws.Conns = append(ws.Conns, Conn)
stopCh := make(chan int)
go ws.send(Conn, stopCh)
for {
//conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(5)))
_, msg, err := conn.ReadMessage()
if err != nil {
close(stopCh)
// 判断是不是超时
if netErr, ok := err.(net.Error); ok {
if netErr.Timeout() {
fmt.Printf("ReadMessage timeout remote: %v\n", conn.RemoteAddr())
return
}
}
// 其他错误,如果是 1001 和 1000 就不打印日志
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
fmt.Printf("ReadMessage other remote:%v error: %v \n", conn.RemoteAddr(), err)
}
return
}
//检查是否有该路由,有则执行Handle方法
if handler, ok := ws.APIs[url]; ok {
if !Conn.IsOn {
go handler.Handle(Conn, msg)
}
}
}
}
// 发送消息协程
func (ws *WsServer) send(conn *Connection, stopCh chan int) {
for {
select {
case <-stopCh:
fmt.Println("connect closed")
return
case msg := <-conn.MsgChan:
err := conn.Conn.WriteMessage(conn.Type, msg)
if err != nil {
fmt.Println("send msg faild ", err)
return
}
}
}
}
最后就是实现具体的业务功能 在服务器开始时就启动了一个业务协程,用来检查是否有要推送的消息:
// HandleConns 处理消息推送
func HandleConns(ws *WsServer) {
ticker := time.NewTicker(time.Second * 30)
defer ticker.Stop()
for {
select {
//房间信息一旦变动就推送给客户端
case <-core.WorldMgr.RoomChan:
rooms := core.WorldMgr.GetaAllRooms()
data, _ := json.Marshal(serializer.BuildRooms(rooms, true))
for _, v := range ws.Conns {
if v.IsOn && v.Type == 1 {
v.MsgChan <- data
}
}
//每30秒检查日志信息更新
case <-ticker.C:
for _, v := range ws.Conns {
if v.IsOn && v.Type == 2 {
room := core.WorldMgr.GetRoomByID(int32(v.Client.RoomID))
//如果实验已中止则通知客户端
if room.IsOn == 0 {
v.MsgChan <- []byte("实验已结束")
continue
}
//若没有变化则不通知
if v.Client.Index == len(room.Experiment.RecordList)-1 {
continue
}
//具体业务省略
}
}
}
}
}
总结时间
至此基本上已经完成了一个能满足需求的Web服务,当然实现上可能比较简陋,性能和扩展性都还有待提高(比如后期接口数量增加/订阅者数量增加)后续应该增加协程池来完成多客户端的消息推送。总之不能仅仅以完成任务的心态来工作,希望自己能多以架构层面来考虑软件的结构,加油哦~