使用GraphQL Subscriptions和Redis PubSub来创建实时聊天服务器

首先

wt4f2-5jcn0.gif

在这篇文章中,我们将主要介绍GraphQL的实现重点。源代码已经上传至GitHub,所以请您也一并查看。如果您查看README.md,您可以立即建立一个服务器,所以也许您可以先试一试。

undefined

架构

architecture_image.png

前端和后端之间使用GraphQL进行通信。
发送消息和创建用户使用普通的GraphQL Mutation,接收消息使用在Websocket上运行的GraphQL Subscriptions。

另外,我們使用Redis的PubSub功能來傳遞訊息。這是為了防止當GraphQL伺服器擴展時,聊天室被分割成不同的部分。

GraphQL Subscriptions是一种实时数据传输协议。

我认为很多人听说过GraphQL。
GraphQL Subscriptions是一种PubSub通信机制,在Websocket上按照GraphQL模式进行数据交换。

与使用原始的WebSocket通信相比,gRPC等其他协议,它具有类型安全的通信能力,并且更易于在浏览器中使用。

订阅服务 | Apollo Client

Redis PubSub 是一种消息传递机制。

Redis被广泛用作内存键值存储系统,并拥有发布订阅功能。
为了实现GraphQL服务器之间的同步,我们利用了发布订阅功能。

我最近才知道Redis的PubSub功能,但大型的平台如LINE LIVE的聊天服务器似乎也在使用Redis的PubSub功能。
LINE LIVE的聊天功能在处理每分钟超过30,000条评论投稿之前不会采用这个功能。

执行

这次我们将使用Golang来实现GraphQL服务器。

定义GraphQL的模式

首先,我们将定义GraphQL的模式。

type Message {
    user: String!
    message: String!
}

type Mutation {
    #チャットを投稿する
    postMessage(user: String!, message: String!): Message 
    #ユーザーを作成する(作成されていないユーザーではメッセージの受信や投稿が出来ない)
    createUser(user: String!): String! 
}

type Query {
    users: [String!]!
}

# この部分でSubscriptionを定義しています。
# 引数を指定してSubscribeをすると、サーバーからPublishされるデータを受け取れます。
type Subscription {
    messagePosted(user: String!): Message!
    userJoined(user: String!): String!
}

从模式中创建接口

实现GraphQL的Resolver(处理请求的部分)从头开始是很困难的。因此,我们可以使用一个工具,将GraphQL的模式输出为Golang的interface。这次我们将使用支持GraphQL Subscriptions的gqlgen工具。

go get -u github.com/99designs/gqlgen/cmd

首先,创建一个用于运行gqlgen的脚本。

package main

import "github.com/99designs/gqlgen/cmd"

func main() {
    cmd.Execute()
}

接下来,执行该脚本。

cd grapqh
go run scripts/gqlgen.go init

然后会生成一些文件。最重要的是resolver.go。
刚生成的文件中只有一个引发panic()的基本实现,所以我们要将其按照接口要求进行实现。

实现Resolver

undefined

生成实体

type Resolver struct {
    redisClient     *redis.Client //redisへアクセスするのに使用
    messageChannels map[string]chan Message
    userChannels    map[string]chan string
    mutex           sync.Mutex
}

func newResolver(redisClient *redis.Client) *Resolver {
    return &Resolver{
        redisClient:     redisClient,
        messageChannels: map[string]chan Message{},
        userChannels:    map[string]chan string{},
        mutex:           sync.Mutex{},
    }
}

因为每个用户的连接都要通过 channels 进行管理,所以我们将在结构体中添加一个 channels 的映射关系。

实施留言发布的功能

type mutationResolver struct{ *Resolver }

func (r *mutationResolver) PostMessage(ctx context.Context, user string, message string) (*Message, error) {
    isLogined, err := r.checkLogin(user)
    if !isLogined {
        return nil, errors.New("This user does not exists")
    }
    // ユーザー情報はAFK(Away From Keyboard)対策で60minで削除されるようにしている。
    // メッセージの投稿を行った場合はExpireまでの時間をリセットする。
    val, err := r.redisClient.SetXX(user, user, 60*time.Minute).Result()
    if val == false {
        return nil, errors.New("This user does not exists")
    }

    // 以下の部分で、[]byteに変換したMessageをredisのPubSubで配信しています。
    m := Message{
        User:    user,
        Message: message,
    }
    mb, err := json.Marshal(m)
    r.redisClient.Publish("room", mb)
    return &m, nil
}

实现订阅服务

type subscriptionResolver struct{ *Resolver }

func (r *subscriptionResolver) MessagePosted(ctx context.Context, user string) (<-chan Message, error) {
    isLogined, err := r.checkLogin(user)
    if !isLogined {
        return nil, errors.New("This user has not been created")
    }

    messageChan := make(chan Message, 1)
    r.mutex.Lock()
    r.messageChannels[user] = messageChan
    r.mutex.Unlock()

    go func() {
        <-ctx.Done()
        r.mutex.Lock()
        delete(r.messageChannels, user)
        r.mutex.Unlock()
        r.redisClient.Del(user)
    }()
    return messageChan, nil
}

需要为每个用户生成MessagePosted频道并返回。
将数据流入该频道后,会向每个用户分发数据,因此我们将在Resolver中保持该频道,以便以后使用。

另外,在goroutine中,我们还描述了当连接断开时的处理方法。我们会删除不再需要的通道和Redis用户数据。

接收由Redis发布的消息

func (r *Resolver) startSubscribingRedis() {
    go func() {
        pubsub := r.redisClient.Subscribe("room")
        defer pubsub.Close()

        for {
            msgi, err := pubsub.Receive()
            switch msg := msgi.(type) {
            case *redis.Message:
                // Convert recieved string to Message.
                m := Message{}
                if err := json.Unmarshal([]byte(msg.Payload), &m); err != nil {
                    log.Println(err)
                    continue
                }
                // Notify new message.
                r.mutex.Lock()
                for _, ch := range r.messageChannels {
                    ch <- m
                }
                r.mutex.Unlock()
            default:
            }
        }
    }()
}

如果从Redis接收到的消息是Message类型,就将其转换为结构体并流入通道中。
在流入通道之后,库会帮助客户端发送数据(非常方便?)。

创建服务器

当运行 scripts/gqlgen.go init 命令时,将自动创建 graphql/server/server.go 文件,但这次我们已将其删除并重新编写到另一个位置。

因为我希望将”package main”放在项目根目录下。

type GraphQLServer struct {
    redisClient *redis.Client
}

// NewGraphQLServer returns GraphQL server.
func NewGraphQLServer(redisClient *redis.Client) *GraphQLServer {
    return &GraphQLServer{
        redisClient: redisClient,
    }
}

// Serve starts GraphQL server.
func (s *GraphQLServer) Serve(route string, port int) error {
    mux := http.NewServeMux()
    mux.Handle(
        route,
        handler.GraphQL(graphql.NewExecutableSchema(graphql.NewGraphQLConfig(s.redisClient)),
            handler.WebsocketUpgrader(websocket.Upgrader{
                CheckOrigin: func(r *http.Request) bool {
                    return true
                },
            }),
        ),
    )
    mux.Handle("/", handler.Playground("GraphQL playground", route))
    handler := cors.AllowAll().Handler(mux)
    return http.ListenAndServe(fmt.Sprintf(":%d", port), handler)
}

type config struct {
    RedisURL string `envconfig:"REDIS_URL"`
    Port     int    `envconfig:"PORT"`
}

func main() {
    var config config
    err := envconfig.Process("", &config)

    client, err := infrastructure.NewRedisClient(config.RedisURL)
    defer client.Close()

    s := infrastructure.NewGraphQLServer(client)
    log.Fatal(s.Serve("/query", config.Port))
}

创建每个结构体并启动服务器。

使用docker-compose启动GraphQL服务器和Redis服务器。

最终,使用Docker来启动服务器。

FROM golang:1.11.4

WORKDIR /go/src/github.com/p1ass/graphql-redis-realtime-chat
COPY . .
ENV GO111MODULE=on

RUN go get github.com/pilu/fresh
version: '3'
volumes:
    unsync:
services:
    api:
        build: .
        volumes:
            - ./:/go/src/github.com/p1ass/graphql-redis-realtime-chat
            - unsync:/go/src/github.com/p1ass/graphql-redis-realtime-chat/frontend
        ports:
            - '8080:8080'
        depends_on:
            - redis

        command: fresh

        environment:
            REDIS_URL: 'redis:6379'
            PORT: '8080'
    redis:
        image: redis:latest
        ports:
            - '6379:6379'

我们使用fresh实现了在热重载环境中进行开发的能力。此外,为了防止挂载无用的目录,我们创建了一个名为unsync的虚拟卷。在此之后,一旦启动就完成了。

docker-compose up

在浏览器中打开http://localhost:【PORT】,GraphQL Playground将会开启,可用于尝试Query、Mutation和Subscription。

额外内容:创建Nuxt.js的客户端。

在GraphQL Playground上可以尝试包括订阅在内的所有内容,但为了使其更像一个Web应用程序,我使用Nuxt.js创建了一个客户端。

我们参考了以下文章来实现。
使用 GraphQL 和 Nuxt.js 创建聊天载体。

实际上,几乎所有实施方法都相同,但还有一个稍有不同的地方需要介绍一下。

简单订阅

简单订阅 | Vue Apollo

GraphQL的订阅(Subscription)经常用于与另一个查询(Query)交换相同类型的数据。
但是,这次的postMessage并没有与任何查询相关联。
在这种情况下,我们将不使用常规的subscribeToMore来实现,而是使用$subscribe来实现。

import SMessagePosted from '@/apollo/subscriptions/messagePosted.gql'

apollo: {
    // Queryに紐付いているSubscription
    users: {
        query: QUsers,
        subscribeToMore: {
            document: SUserJoined,
            variables() {
                return {
                    user: this.user
                }
            },
            updateQuery: (prev, { subscriptionData }) => {
                // do something
            }
        }
    },

    // Queryに紐付いていないSubscription
    $subscribe: {
        messagePosted: {
            query: SMessagePosted,
            variables() {
                return {
                    user: this.user
                }
            },
            result(res) {
                this.messages.unshift(res.data.messagePosted)
            }
        }
    }
}

总结

    • GraphQL Subscriptionsを使うと、型に沿ったリアルタイム通信ができる

 

    • Redis PubSubを使うと複数台サーバーの時にデータの同期ができる

 

    Queryと紐付かないSubscriptionを作成するときはクライアントの実装に注意しよう(vue-apolloの場合)

请参考

    • Real-time Chat with GraphQL Subscriptions in Go

 

    • GoとRedisにおける簡単なチャットアプリケーション

 

    • Redis の Pub/Sub を使って Node.js + WebSocket のスケールアウトを実現する方法

 

    • Apollo inside of NuxtJS

 

    • GraphQL と Nuxt.js でチャットを作る

 

    DockerでVolumeをマウントするとき一部を除外する方法
广告
将在 10 秒后关闭
bannerAds