第一次使用Golang加上WebSocket

首先

这次我打算使用Golang和WebSocket来创建一个简单的消息传送服务,顺带作为备忘录和信息共享的初次文章。

由于没有太深奥的话题,所以我还是个Golanger新手。但是我想说的是,我每天都因为渴望用Golang编写WebSocket服务器而无法入眠,所以这句话适用于那些处于相同情况的人。

样本代码 mǎ)

发送消息后,该服务将从服务器返回一条消息。这是一个链接地址:https://github.com/nekozuki-dev/go-websocket-sample。

通信数据格式

首先我们来确定客户端/服务器之间的通信格式。
这次我们将采用易于处理且代码简洁的JSON格式。
在id中放入消息的识别ID,在body中放入数据主体,以便在接收时知道应该调用哪个处理函数。

{ "id":"", "body":"" }

在进行正式的开发时,考虑将流化和分块化结合起来,可以使用字节数组,在前4个字节中放入用于识别的ID和数据大小。

MSGIDBODY SIZEBODY BYTES2 bytes2 bytesn bytes識別IDデータ本体の長さデータ本体

此外,如果最开始使用JSON…但之后想要更加专注!的话,将负责编码的部分进行抽象化会更容易实现切换,这是一个不错的选择。

软件包架构

我尝试使用以下方式来构建本次开发的服务的包结构。
尽量使其扁平化,但在开发中到大型项目时,最好使用某种体系结构。
由于Golang对循环引用很敏感,所以我们要注意依赖关系的单向性,即external → app。
这样在阅读时,顺序为main → external → app,更易于阅读,真是令人愉快的事情。

.
├── app
│   ├── conn.go
│   ├── handlers.go
│   ├── packet.go
│   └── user.go
├── external
│   ├── wsservice
│   │   ├── conn.go
│   │   └── listener.go
│   └── router.go
└── main.go

讲解

我将从每个源代码中提取片段并进行解释。

wsservice.Listenner (WebSocket 服务监听器)

管理连接/断开连接的客户端,并调用回调函数的角色由HogeAsync担任。作为个人规则,需要进行互斥处理的变量被包含在HogeAsync中并嵌入到Hoge中,以便更容易进行确认。

type AcceptHandler func(Conn)
type CloseHandler func(Conn)

type Listener interface {
    Run()
    RegisterAcceptHandler(AcceptHandler)
    RegisterCloseHandler(CloseHandler)
}

在Run函数中,开始接受来自客户端的连接。
当有连接到来时,通过http.HandleFunc(“/ws”, lis.handleConnection)调用handleConnection函数。

func (lis *listener) Run() {
    http.HandleFunc("/ws", lis.handleConnection)

    servAddr := fmt.Sprintf(":%d", lis.port)
    fmt.Println("BeginListener", servAddr)
    if err := http.ListenAndServe(servAddr, nil); err != nil {
        panic(err)
    }
}
func (lis *listener) handleConnection(w http.ResponseWriter, r *http.Request) {
    ws, err := lis.upgrader.Upgrade(w, r, nil)
    if err != nil {
        fmt.Println("Error", err.Error())
        return
    }
    defer lis.closeConnection(ws)

    addr := ws.RemoteAddr().String()
    fmt.Println("NewConnection", addr)

    c := NewConn(ws)
    lis.m.Lock()
    lis.conns[ws] = c
    lis.m.Unlock()

    if lis.acceptHandler != nil {
        lis.acceptHandler(c)
    }
}

wsservice连接

WebSocket.Conn的封装器,负责进行消息的收发和WebSocket状态的管理。

type Conn interface {
    Run(readCh chan []byte, closeCh chan bool)
    Write([]byte)
    Close()
}

启动一个可发送和接收消息的 Goroutine,并等待其被断开连接。
通过使用 sync.WaitGroup,确保所有的 Goroutine 结束后再退出函数。

func (c *conn) Run(readCh chan []byte, closeCh chan bool) {
    c.wg = &sync.WaitGroup{}
    c.writeCh = make(chan []byte)

    // Wait Read Goroutine
    errCh := make(chan error)
    c.wg.Add(1)
    go c.waitRead(readCh, errCh)

    // Wait Write Groutine
    c.wg.Add(1)
    go c.waitWrite()

    // Process
    for {
        select {
        case <-errCh:
            close(c.writeCh)
            c.wg.Wait()

            close(closeCh)
            return
        }
    }
}

当发生错误时,通过Read/Write操作,我们会关闭Socket。

通过将值发送到WriteChanel,可以使Write操作在接收到值时运行,以防止处理过程被阻塞。

func (c *conn) waitWrite() {
    defer c.wg.Done()

    fmt.Println("Begin WaitWrite Goroutine.")
    for bytes := range c.writeCh {
        if err := c.ws.WriteMessage(websocket.TextMessage, bytes); err != nil {
            fmt.Println("Error", err)
            break
        }
    }
    c.Close()
    fmt.Println("End WaitWrite Goroutine.")
}

func (c *conn) waitRead(readCh chan []byte, errCh chan error) {
    defer c.wg.Done()

    fmt.Println("Begin WaitRead Goroutine.")
    for {
        _, readBytes, err := c.ws.ReadMessage()
        if err != nil {
            errCh <- err
            break
        }
        readCh <- readBytes
    }
    c.Close()
    fmt.Println("End WaitRead Goroutine.")
}

外部路由器 qì)

这个应用是用户的入口,并且在连接时还负责创建用户。

type Router interface {
    Run(port int)
}
func (r *router) Run(port int) {
    wsListener := wsservice.NewListener(port)
    wsListener.RegisterAcceptHandler(r.OnAccept)
    wsListener.RegisterCloseHandler(r.OnClose)
    wsListener.Run()
}

func (r *router) OnAccept(c wsservice.Conn) {
    fmt.Println("OnAccept")
    u := app.NewUser(c)
    u.Run()
}

func (r *router) OnClose(c wsservice.Conn) {
    fmt.Println("OnClose")
}

应用程序的用户

这里会描述与客户端的通信以及接收时的处理。
在内部,我们有将wsservice.Conn接口化为app.Conn,并在实际通信时使用它。
通过不直接使用wsservice.Conn,将依赖关系设置为单向的外部 -> app。

type User interface {
    Run()
    Write(msgid uint16, body interface{})
}

当向wsservice.Conn传递的用于接收的信道收到数值后,调用了doHandler。
需要注意的是,如果在select内没有设置default,此处的处理将被阻塞,因此如果希望在for循环内执行各种处理等,务必要加上default。

func (u *user) Run() {
    u.msgHandlers.Register(1, u.handleMessage)

    readCh := make(chan []byte)
    closeCh := make(chan bool)

    go u.conn.Run(readCh, closeCh)

    for {
        select {
        case bytes := <-readCh:
            u.doHandler(bytes)

        case <-closeCh:
            fmt.Println("CloseUser")
            return
        default:
        }
    }
}

在数据发送时,我们使用json.Marshal将`struct`转换为字节数组,然后将其传递给`wsservice.Conn`的`Write`函数。

func (u *user) Write(msgid uint16, body interface{}) {
    packet := &Packet{
        ID:   msgid,
        Body: body,
    }
    bytes, err := json.Marshal(packet)
    if err != nil {
        u.conn.Close()
        return
    }
    u.conn.Write(bytes)
}

如果接收到数据,将字节数组转换为数据包,并提取出识别ID和数据主体。
然后,从消息处理程序中获取回调并进行调用。
handleMessage函数将提供的数据主体转换为可用的数据包。

这次使用了下述的包。
mapstructure:github.com/mitchellh/mapstructure

func (u *user) doHandler(bytes []byte) error {
    packet := &Packet{}
    if err := json.Unmarshal(bytes, packet); err != nil {
        return err
    }

    handler := u.msgHandlers.Get(packet.ID)
    if handler != nil {
        handler(packet.Body)
    }
    return nil
}

func (u *user) handleMessage(body interface{}) {
    req := &MessagePacket{}
    if err := mapstructure.Decode(body, req); err != nil {
        fmt.Println(err.Error())
        return
    }
    fmt.Println(req.Msg)

    res := &MessagePacket{
        Msg: "ばななをあげる",
    }
    u.Write(1, res)
}

应用程序处理器

以中国哥特字为键的映射将被用来管理接收数据的识别ID和回调函数的注册。

type MessageHandleFunc func(interface{})

type MessageHandlers interface {
    Get(msgid uint16) MessageHandleFunc
    Register(msgid uint16, handler MessageHandleFunc)
    Unregister(msgid uint16)
}

func NewMessageHandlers() MessageHandlers {
    return &messageHandlers{
        handlers: make(map[uint16]MessageHandleFunc),
    }
}

type messageHandlers struct {
    handlers map[uint16]MessageHandleFunc
}

func (m *messageHandlers) Get(msgid uint16) MessageHandleFunc {
    return m.handlers[msgid]
}

func (m *messageHandlers) Register(msgid uint16, handler MessageHandleFunc) {
    m.handlers[msgid] = handler
}

func (m *messageHandlers) Unregister(msgid uint16) {
    delete(m.handlers, msgid)
}

应用程序的数据包

我正在进行传输和接收数据的结构定义。
由于这次使用JSON进行交互,我将指定JSON键并将其属性与之匹配。

type (
    Packet struct {
        ID   uint16      `json:"id"`
        Body interface{} `json:"body"`
    }

    MessagePacket struct {
        Msg string `json:"msg"`
    }
)

主要.go

创建路由器,并仅接受指定端口的连接。

func main() {
    router := external.NewRouter()
    router.Run(9080)
}

客户端

给别人香蕉,自己也能得到香蕉。

<html lang="ja">
    <head>
        <meta charset="UTF-8">
        <title>WebSocketSample</title>
    </head>
    <script>
        var sock = new WebSocket('ws://127.0.0.1:9080/ws');

        var send = function(msgid, body) {
            var packet = {
                'id': msgid,
                'body': body
            };
            var json = JSON.stringify(packet)
            sock.send(json)
        };

        sock.addEventListener('open', function(e) {
            console.log('Connect success.')
            document.getElementById('banana').addEventListener('click',function(e) {
                var msg = {
                    'msg': 'ばななをあげる'
                };
                send(1, msg)
            });
        });

        sock.addEventListener('close', function(e) {
            console.log('Connect close.')
        });

        sock.addEventListener('message', function(e) {
            var json = JSON.parse(e.data)
            var msgid = json.id;
            var body = json.body;
            if (msgid == 1) {
                console.log(body.msg);
            }
        });
    </script>
    <body>
        <input type="button" id="banana" value="バナナを送る" />
    </body>
</html>

结束了

我希望Golanger能够遍布全世界。那么,再见。

广告
将在 10 秒后关闭
bannerAds