第一次使用Golang加上WebSocket
首先
这次我打算使用Golang和WebSocket来创建一个简单的消息传送服务,顺带作为备忘录和信息共享的初次文章。
由于没有太深奥的话题,所以我还是个Golanger新手。但是我想说的是,我每天都因为渴望用Golang编写WebSocket服务器而无法入眠,所以这句话适用于那些处于相同情况的人。
样本代码 mǎ)
发送消息后,该服务将从服务器返回一条消息。这是一个链接地址:https://github.com/nekozuki-dev/go-websocket-sample。
通信数据格式
首先我们来确定客户端/服务器之间的通信格式。
这次我们将采用易于处理且代码简洁的JSON格式。
在id中放入消息的识别ID,在body中放入数据主体,以便在接收时知道应该调用哪个处理函数。
{ "id":"", "body":"" }
在进行正式的开发时,考虑将流化和分块化结合起来,可以使用字节数组,在前4个字节中放入用于识别的ID和数据大小。
此外,如果最开始使用JSON…但之后想要更加专注!的话,将负责编码的部分进行抽象化会更容易实现切换,这是一个不错的选择。
软件包架构
我尝试使用以下方式来构建本次开发的服务的包结构。
尽量使其扁平化,但在开发中到大型项目时,最好使用某种体系结构。
由于Golang对循环引用很敏感,所以我们要注意依赖关系的单向性,即external → app。
这样在阅读时,顺序为main → external → app,更易于阅读,真是令人愉快的事情。
.
├── app
│ ├── conn.go
│ ├── handlers.go
│ ├── packet.go
│ └── user.go
├── external
│ ├── wsservice
│ │ ├── conn.go
│ │ └── listener.go
│ └── router.go
└── main.go
讲解
我将从每个源代码中提取片段并进行解释。
wsservice.Listenner (WebSocket 服务监听器)
管理连接/断开连接的客户端,并调用回调函数的角色由HogeAsync担任。作为个人规则,需要进行互斥处理的变量被包含在HogeAsync中并嵌入到Hoge中,以便更容易进行确认。
type AcceptHandler func(Conn)
type CloseHandler func(Conn)
type Listener interface {
Run()
RegisterAcceptHandler(AcceptHandler)
RegisterCloseHandler(CloseHandler)
}
在Run函数中,开始接受来自客户端的连接。
当有连接到来时,通过http.HandleFunc(“/ws”, lis.handleConnection)调用handleConnection函数。
func (lis *listener) Run() {
http.HandleFunc("/ws", lis.handleConnection)
servAddr := fmt.Sprintf(":%d", lis.port)
fmt.Println("BeginListener", servAddr)
if err := http.ListenAndServe(servAddr, nil); err != nil {
panic(err)
}
}
func (lis *listener) handleConnection(w http.ResponseWriter, r *http.Request) {
ws, err := lis.upgrader.Upgrade(w, r, nil)
if err != nil {
fmt.Println("Error", err.Error())
return
}
defer lis.closeConnection(ws)
addr := ws.RemoteAddr().String()
fmt.Println("NewConnection", addr)
c := NewConn(ws)
lis.m.Lock()
lis.conns[ws] = c
lis.m.Unlock()
if lis.acceptHandler != nil {
lis.acceptHandler(c)
}
}
wsservice连接
WebSocket.Conn的封装器,负责进行消息的收发和WebSocket状态的管理。
type Conn interface {
Run(readCh chan []byte, closeCh chan bool)
Write([]byte)
Close()
}
启动一个可发送和接收消息的 Goroutine,并等待其被断开连接。
通过使用 sync.WaitGroup,确保所有的 Goroutine 结束后再退出函数。
func (c *conn) Run(readCh chan []byte, closeCh chan bool) {
c.wg = &sync.WaitGroup{}
c.writeCh = make(chan []byte)
// Wait Read Goroutine
errCh := make(chan error)
c.wg.Add(1)
go c.waitRead(readCh, errCh)
// Wait Write Groutine
c.wg.Add(1)
go c.waitWrite()
// Process
for {
select {
case <-errCh:
close(c.writeCh)
c.wg.Wait()
close(closeCh)
return
}
}
}
当发生错误时,通过Read/Write操作,我们会关闭Socket。
通过将值发送到WriteChanel,可以使Write操作在接收到值时运行,以防止处理过程被阻塞。
func (c *conn) waitWrite() {
defer c.wg.Done()
fmt.Println("Begin WaitWrite Goroutine.")
for bytes := range c.writeCh {
if err := c.ws.WriteMessage(websocket.TextMessage, bytes); err != nil {
fmt.Println("Error", err)
break
}
}
c.Close()
fmt.Println("End WaitWrite Goroutine.")
}
func (c *conn) waitRead(readCh chan []byte, errCh chan error) {
defer c.wg.Done()
fmt.Println("Begin WaitRead Goroutine.")
for {
_, readBytes, err := c.ws.ReadMessage()
if err != nil {
errCh <- err
break
}
readCh <- readBytes
}
c.Close()
fmt.Println("End WaitRead Goroutine.")
}
外部路由器 qì)
这个应用是用户的入口,并且在连接时还负责创建用户。
type Router interface {
Run(port int)
}
func (r *router) Run(port int) {
wsListener := wsservice.NewListener(port)
wsListener.RegisterAcceptHandler(r.OnAccept)
wsListener.RegisterCloseHandler(r.OnClose)
wsListener.Run()
}
func (r *router) OnAccept(c wsservice.Conn) {
fmt.Println("OnAccept")
u := app.NewUser(c)
u.Run()
}
func (r *router) OnClose(c wsservice.Conn) {
fmt.Println("OnClose")
}
应用程序的用户
这里会描述与客户端的通信以及接收时的处理。
在内部,我们有将wsservice.Conn接口化为app.Conn,并在实际通信时使用它。
通过不直接使用wsservice.Conn,将依赖关系设置为单向的外部 -> app。
type User interface {
Run()
Write(msgid uint16, body interface{})
}
当向wsservice.Conn传递的用于接收的信道收到数值后,调用了doHandler。
需要注意的是,如果在select内没有设置default,此处的处理将被阻塞,因此如果希望在for循环内执行各种处理等,务必要加上default。
func (u *user) Run() {
u.msgHandlers.Register(1, u.handleMessage)
readCh := make(chan []byte)
closeCh := make(chan bool)
go u.conn.Run(readCh, closeCh)
for {
select {
case bytes := <-readCh:
u.doHandler(bytes)
case <-closeCh:
fmt.Println("CloseUser")
return
default:
}
}
}
在数据发送时,我们使用json.Marshal将`struct`转换为字节数组,然后将其传递给`wsservice.Conn`的`Write`函数。
func (u *user) Write(msgid uint16, body interface{}) {
packet := &Packet{
ID: msgid,
Body: body,
}
bytes, err := json.Marshal(packet)
if err != nil {
u.conn.Close()
return
}
u.conn.Write(bytes)
}
如果接收到数据,将字节数组转换为数据包,并提取出识别ID和数据主体。
然后,从消息处理程序中获取回调并进行调用。
handleMessage函数将提供的数据主体转换为可用的数据包。
这次使用了下述的包。
mapstructure:github.com/mitchellh/mapstructure
func (u *user) doHandler(bytes []byte) error {
packet := &Packet{}
if err := json.Unmarshal(bytes, packet); err != nil {
return err
}
handler := u.msgHandlers.Get(packet.ID)
if handler != nil {
handler(packet.Body)
}
return nil
}
func (u *user) handleMessage(body interface{}) {
req := &MessagePacket{}
if err := mapstructure.Decode(body, req); err != nil {
fmt.Println(err.Error())
return
}
fmt.Println(req.Msg)
res := &MessagePacket{
Msg: "ばななをあげる",
}
u.Write(1, res)
}
应用程序处理器
以中国哥特字为键的映射将被用来管理接收数据的识别ID和回调函数的注册。
type MessageHandleFunc func(interface{})
type MessageHandlers interface {
Get(msgid uint16) MessageHandleFunc
Register(msgid uint16, handler MessageHandleFunc)
Unregister(msgid uint16)
}
func NewMessageHandlers() MessageHandlers {
return &messageHandlers{
handlers: make(map[uint16]MessageHandleFunc),
}
}
type messageHandlers struct {
handlers map[uint16]MessageHandleFunc
}
func (m *messageHandlers) Get(msgid uint16) MessageHandleFunc {
return m.handlers[msgid]
}
func (m *messageHandlers) Register(msgid uint16, handler MessageHandleFunc) {
m.handlers[msgid] = handler
}
func (m *messageHandlers) Unregister(msgid uint16) {
delete(m.handlers, msgid)
}
应用程序的数据包
我正在进行传输和接收数据的结构定义。
由于这次使用JSON进行交互,我将指定JSON键并将其属性与之匹配。
type (
Packet struct {
ID uint16 `json:"id"`
Body interface{} `json:"body"`
}
MessagePacket struct {
Msg string `json:"msg"`
}
)
主要.go
创建路由器,并仅接受指定端口的连接。
func main() {
router := external.NewRouter()
router.Run(9080)
}
客户端
给别人香蕉,自己也能得到香蕉。
<html lang="ja">
<head>
<meta charset="UTF-8">
<title>WebSocketSample</title>
</head>
<script>
var sock = new WebSocket('ws://127.0.0.1:9080/ws');
var send = function(msgid, body) {
var packet = {
'id': msgid,
'body': body
};
var json = JSON.stringify(packet)
sock.send(json)
};
sock.addEventListener('open', function(e) {
console.log('Connect success.')
document.getElementById('banana').addEventListener('click',function(e) {
var msg = {
'msg': 'ばななをあげる'
};
send(1, msg)
});
});
sock.addEventListener('close', function(e) {
console.log('Connect close.')
});
sock.addEventListener('message', function(e) {
var json = JSON.parse(e.data)
var msgid = json.id;
var body = json.body;
if (msgid == 1) {
console.log(body.msg);
}
});
</script>
<body>
<input type="button" id="banana" value="バナナを送る" />
</body>
</html>
结束了
我希望Golanger能够遍布全世界。那么,再见。