使用GraphQL Subscriptions和Redis PubSub来创建实时聊天服务器
首先
在这篇文章中,我们将主要介绍GraphQL的实现重点。源代码已经上传至GitHub,所以请您也一并查看。如果您查看README.md,您可以立即建立一个服务器,所以也许您可以先试一试。
架构
前端和后端之间使用GraphQL进行通信。
发送消息和创建用户使用普通的GraphQL Mutation,接收消息使用在Websocket上运行的GraphQL Subscriptions。
另外,我們使用Redis的PubSub功能來傳遞訊息。這是為了防止當GraphQL伺服器擴展時,聊天室被分割成不同的部分。
GraphQL Subscriptions是一种实时数据传输协议。
我认为很多人听说过GraphQL。
GraphQL Subscriptions是一种PubSub通信机制,在Websocket上按照GraphQL模式进行数据交换。
与使用原始的WebSocket通信相比,gRPC等其他协议,它具有类型安全的通信能力,并且更易于在浏览器中使用。
订阅服务 | Apollo Client
Redis PubSub 是一种消息传递机制。
Redis被广泛用作内存键值存储系统,并拥有发布订阅功能。
为了实现GraphQL服务器之间的同步,我们利用了发布订阅功能。
我最近才知道Redis的PubSub功能,但大型的平台如LINE LIVE的聊天服务器似乎也在使用Redis的PubSub功能。
LINE LIVE的聊天功能在处理每分钟超过30,000条评论投稿之前不会采用这个功能。
执行
这次我们将使用Golang来实现GraphQL服务器。
定义GraphQL的模式
首先,我们将定义GraphQL的模式。
type Message {
user: String!
message: String!
}
type Mutation {
#チャットを投稿する
postMessage(user: String!, message: String!): Message
#ユーザーを作成する(作成されていないユーザーではメッセージの受信や投稿が出来ない)
createUser(user: String!): String!
}
type Query {
users: [String!]!
}
# この部分でSubscriptionを定義しています。
# 引数を指定してSubscribeをすると、サーバーからPublishされるデータを受け取れます。
type Subscription {
messagePosted(user: String!): Message!
userJoined(user: String!): String!
}
从模式中创建接口
实现GraphQL的Resolver(处理请求的部分)从头开始是很困难的。因此,我们可以使用一个工具,将GraphQL的模式输出为Golang的interface。这次我们将使用支持GraphQL Subscriptions的gqlgen工具。
go get -u github.com/99designs/gqlgen/cmd
首先,创建一个用于运行gqlgen的脚本。
package main
import "github.com/99designs/gqlgen/cmd"
func main() {
cmd.Execute()
}
接下来,执行该脚本。
cd grapqh
go run scripts/gqlgen.go init
然后会生成一些文件。最重要的是resolver.go。
刚生成的文件中只有一个引发panic()的基本实现,所以我们要将其按照接口要求进行实现。
实现Resolver
生成实体
type Resolver struct {
redisClient *redis.Client //redisへアクセスするのに使用
messageChannels map[string]chan Message
userChannels map[string]chan string
mutex sync.Mutex
}
func newResolver(redisClient *redis.Client) *Resolver {
return &Resolver{
redisClient: redisClient,
messageChannels: map[string]chan Message{},
userChannels: map[string]chan string{},
mutex: sync.Mutex{},
}
}
因为每个用户的连接都要通过 channels 进行管理,所以我们将在结构体中添加一个 channels 的映射关系。
实施留言发布的功能
type mutationResolver struct{ *Resolver }
func (r *mutationResolver) PostMessage(ctx context.Context, user string, message string) (*Message, error) {
isLogined, err := r.checkLogin(user)
if !isLogined {
return nil, errors.New("This user does not exists")
}
// ユーザー情報はAFK(Away From Keyboard)対策で60minで削除されるようにしている。
// メッセージの投稿を行った場合はExpireまでの時間をリセットする。
val, err := r.redisClient.SetXX(user, user, 60*time.Minute).Result()
if val == false {
return nil, errors.New("This user does not exists")
}
// 以下の部分で、[]byteに変換したMessageをredisのPubSubで配信しています。
m := Message{
User: user,
Message: message,
}
mb, err := json.Marshal(m)
r.redisClient.Publish("room", mb)
return &m, nil
}
实现订阅服务
type subscriptionResolver struct{ *Resolver }
func (r *subscriptionResolver) MessagePosted(ctx context.Context, user string) (<-chan Message, error) {
isLogined, err := r.checkLogin(user)
if !isLogined {
return nil, errors.New("This user has not been created")
}
messageChan := make(chan Message, 1)
r.mutex.Lock()
r.messageChannels[user] = messageChan
r.mutex.Unlock()
go func() {
<-ctx.Done()
r.mutex.Lock()
delete(r.messageChannels, user)
r.mutex.Unlock()
r.redisClient.Del(user)
}()
return messageChan, nil
}
需要为每个用户生成MessagePosted频道并返回。
将数据流入该频道后,会向每个用户分发数据,因此我们将在Resolver中保持该频道,以便以后使用。
另外,在goroutine中,我们还描述了当连接断开时的处理方法。我们会删除不再需要的通道和Redis用户数据。
接收由Redis发布的消息
func (r *Resolver) startSubscribingRedis() {
go func() {
pubsub := r.redisClient.Subscribe("room")
defer pubsub.Close()
for {
msgi, err := pubsub.Receive()
switch msg := msgi.(type) {
case *redis.Message:
// Convert recieved string to Message.
m := Message{}
if err := json.Unmarshal([]byte(msg.Payload), &m); err != nil {
log.Println(err)
continue
}
// Notify new message.
r.mutex.Lock()
for _, ch := range r.messageChannels {
ch <- m
}
r.mutex.Unlock()
default:
}
}
}()
}
如果从Redis接收到的消息是Message类型,就将其转换为结构体并流入通道中。
在流入通道之后,库会帮助客户端发送数据(非常方便?)。
创建服务器
当运行 scripts/gqlgen.go init 命令时,将自动创建 graphql/server/server.go 文件,但这次我们已将其删除并重新编写到另一个位置。
因为我希望将”package main”放在项目根目录下。
type GraphQLServer struct {
redisClient *redis.Client
}
// NewGraphQLServer returns GraphQL server.
func NewGraphQLServer(redisClient *redis.Client) *GraphQLServer {
return &GraphQLServer{
redisClient: redisClient,
}
}
// Serve starts GraphQL server.
func (s *GraphQLServer) Serve(route string, port int) error {
mux := http.NewServeMux()
mux.Handle(
route,
handler.GraphQL(graphql.NewExecutableSchema(graphql.NewGraphQLConfig(s.redisClient)),
handler.WebsocketUpgrader(websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}),
),
)
mux.Handle("/", handler.Playground("GraphQL playground", route))
handler := cors.AllowAll().Handler(mux)
return http.ListenAndServe(fmt.Sprintf(":%d", port), handler)
}
type config struct {
RedisURL string `envconfig:"REDIS_URL"`
Port int `envconfig:"PORT"`
}
func main() {
var config config
err := envconfig.Process("", &config)
client, err := infrastructure.NewRedisClient(config.RedisURL)
defer client.Close()
s := infrastructure.NewGraphQLServer(client)
log.Fatal(s.Serve("/query", config.Port))
}
创建每个结构体并启动服务器。
使用docker-compose启动GraphQL服务器和Redis服务器。
最终,使用Docker来启动服务器。
FROM golang:1.11.4
WORKDIR /go/src/github.com/p1ass/graphql-redis-realtime-chat
COPY . .
ENV GO111MODULE=on
RUN go get github.com/pilu/fresh
version: '3'
volumes:
unsync:
services:
api:
build: .
volumes:
- ./:/go/src/github.com/p1ass/graphql-redis-realtime-chat
- unsync:/go/src/github.com/p1ass/graphql-redis-realtime-chat/frontend
ports:
- '8080:8080'
depends_on:
- redis
command: fresh
environment:
REDIS_URL: 'redis:6379'
PORT: '8080'
redis:
image: redis:latest
ports:
- '6379:6379'
我们使用fresh实现了在热重载环境中进行开发的能力。此外,为了防止挂载无用的目录,我们创建了一个名为unsync的虚拟卷。在此之后,一旦启动就完成了。
docker-compose up
在浏览器中打开http://localhost:【PORT】,GraphQL Playground将会开启,可用于尝试Query、Mutation和Subscription。
额外内容:创建Nuxt.js的客户端。
在GraphQL Playground上可以尝试包括订阅在内的所有内容,但为了使其更像一个Web应用程序,我使用Nuxt.js创建了一个客户端。
我们参考了以下文章来实现。
使用 GraphQL 和 Nuxt.js 创建聊天载体。
实际上,几乎所有实施方法都相同,但还有一个稍有不同的地方需要介绍一下。
简单订阅
简单订阅 | Vue Apollo
GraphQL的订阅(Subscription)经常用于与另一个查询(Query)交换相同类型的数据。
但是,这次的postMessage并没有与任何查询相关联。
在这种情况下,我们将不使用常规的subscribeToMore来实现,而是使用$subscribe来实现。
import SMessagePosted from '@/apollo/subscriptions/messagePosted.gql'
apollo: {
// Queryに紐付いているSubscription
users: {
query: QUsers,
subscribeToMore: {
document: SUserJoined,
variables() {
return {
user: this.user
}
},
updateQuery: (prev, { subscriptionData }) => {
// do something
}
}
},
// Queryに紐付いていないSubscription
$subscribe: {
messagePosted: {
query: SMessagePosted,
variables() {
return {
user: this.user
}
},
result(res) {
this.messages.unshift(res.data.messagePosted)
}
}
}
}
总结
-
- GraphQL Subscriptionsを使うと、型に沿ったリアルタイム通信ができる
-
- Redis PubSubを使うと複数台サーバーの時にデータの同期ができる
- Queryと紐付かないSubscriptionを作成するときはクライアントの実装に注意しよう(vue-apolloの場合)
请参考
-
- Real-time Chat with GraphQL Subscriptions in Go
-
- GoとRedisにおける簡単なチャットアプリケーション
-
- Redis の Pub/Sub を使って Node.js + WebSocket のスケールアウトを実現する方法
-
- Apollo inside of NuxtJS
-
- GraphQL と Nuxt.js でチャットを作る
- DockerでVolumeをマウントするとき一部を除外する方法