【GraphQL】使用ApolloServer v4实现Subscription的方法

首先,
在本次实现中,我们使用了GraphQL和ApolloServer v4来进行Subscription的功能开发。
(由于v4版本上的资料比较少,所以有些困难…)

我打算把它作为备忘录保存下来。

订阅是指

首先,简单来说,Subscription指的是当服务器发生注册或更新等事件时,可以立即更新(反映)数据的功能。这是Apollo Server可以确保结果一致性的特性。

这是一个类似于现实订阅的定期购买,可以通过将数据推送给已在服务器上注册订阅的客户端来即时更新数据。(如果看实际的代码,应该会更好理解)

引入图书馆。

请使用以下命令来安装所需的库。

yarn add graphql-ws ws @types/ws @graphql-tools/schema graphql-subscriptions cors @types/cors express 

激活订阅

首先,我们将创建HTTP服务器、WebSocket服务器和ApolloServer来处理订阅。 (由于这些内容是固定的,因此最好按照官方文档进行操作)

首先是完成版本的代码。

import 'dotenv/config'

import { ApolloServer } from '@apollo/server'
import { expressMiddleware } from '@apollo/server/express4'
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer'
import { GraphQLFileLoader } from '@graphql-tools/graphql-file-loader'
import { loadSchemaSync } from '@graphql-tools/load'
import { addResolversToSchema } from '@graphql-tools/schema'
import { PrismaClient } from '@prisma/client'
import bodyParser from 'body-parser'
import cors from 'cors'
import express from 'express'
import { PubSub } from 'graphql-subscriptions'
import { useServer } from 'graphql-ws/lib/use/ws'
import { createServer } from 'http'
import { join } from 'path'
import { WebSocketServer } from 'ws'

import { user } from './resolvers/Link'
import { login, post, singUp } from './resolvers/Mutation'
import { feed } from './resolvers/Query'
import { links } from './resolvers/User'
import type { Context } from './types/Context'
import { getUserId } from './utils'

const PORT = 4000
const pubsub = new PubSub()

const prisma = new PrismaClient()

const app = express()

const schema = loadSchemaSync(join(__dirname, './schema.graphql'), {
  loaders: [new GraphQLFileLoader()],
})

// リゾルバー関数
const resolvers = {
  Query: {
    feed: feed,
  },

  Mutation: {
    signUp: singUp,
    login: login,
    post: post,
  },

  Subscription: {
    newLink: {
      subscribe: () => pubsub.asyncIterator('NEW_LINK'),
    },
  },

  Link: {
    user: user,
  },

  User: {
    links: links,
  },
}

const schemaWithResolvers = addResolversToSchema({ schema, resolvers })

const httpServer = createServer(app)

const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql',
})

const serverCleanup = useServer({ schema: schemaWithResolvers }, wsServer)

const server = new ApolloServer<Context>({
  schema: schemaWithResolvers,
  plugins: [
    ApolloServerPluginDrainHttpServer({ httpServer }),

    {
      async serverWillStart() {
        return {
          async drainServer() {
            await serverCleanup.dispose()
          },
        }
      },
    },
  ],
})

;(async () => {
  try {
    await server.start()

    app.use(
      '/graphql',
      cors<cors.CorsRequest>(),
      bodyParser.json(),
      expressMiddleware(server, {
        context: async ({ req }) => ({
          ...req,
          prisma,
          pubsub,
          userId: req && req.headers.authorization ? getUserId(req) : undefined,
        }),
      })
    )

    httpServer.listen(PORT, () => {
      console.log(`? Query endpoint ready at http://localhost:${PORT}/graphql`)
      console.log(
        `? Subscription endpoint ready at ws://localhost:${PORT}/graphql`
      )
    })
  } catch (error) {
    console.error('Error starting server: ', error)
  }
})()

我們將按照順序作出解釋。

GraphQL的Schema和Resolver的定义

const PORT = 4000
const pubsub = new PubSub()

const prisma = new PrismaClient()

const app = express()

const schema = loadSchemaSync(join(__dirname, './schema.graphql'), {
  loaders: [new GraphQLFileLoader()],
})

// リゾルバー関数
const resolvers = {
  Query: {
    feed: feed,
  },

  Mutation: {
    signUp: singUp,
    login: login,
    post: post,
  },

  Subscription: {
    newLink: {
      subscribe: () => pubsub.asyncIterator('NEW_LINK'),
    },
  },

  Link: {
    user: user,
  },

  User: {
    links: links,
  },
}

由于PubSub实例将在后文进行解释,因此在这里省略不提。

这部分是在处理express的存储和模式解析器的定义,这是一种常见的处理方式。

顺便提一下,以下是模式结构。

type Query {
  feed: [Link]!
}

type Mutation {
  post(url: String!, description: String!): Link!
  signUp(email: String!, password: String!, name: String!): AuthPayload
  login(email: String!, password: String!): AuthPayload
}

type Subscription {
  newLink: Link
}

type Link {
  id: ID!
  description: String!
  url: String!
  user: User
}

type User {
  id: ID!
  name: String!
  email: String!
  links: [Link!]!
}

type AuthPayload {
  token: String
  user: User
}

在模式中,定义了一个Subscription的newLink被返回为Link类型。

关于订阅处理的详细说明稍后会提供。

创建一个WebSocket服务器的HTTP服务器

這是程式碼。

const schemaWithResolvers = addResolversToSchema({ schema, resolvers })

const httpServer = createServer(app)

const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql',
})

const serverCleanup = useServer({ schema: schemaWithResolvers }, wsServer)

首先,有一个与官方不同的地方。就是下面这个部分。
官方的代码中,schemaWithResolvers部分是这样的。

const schema = makeExecutableSchema({ typeDefs, resolvers });

因为我在使用外部文件的模式,所以我使用了addResolversToSchema这个函数。

在这里,可以理解为能够将模式(Schema)和解析器(Resolver)一起提供。

下一步,通过createServer方法创建HTTP服务器。
在这里将app作为参数传递,并使Express在HTTP服务器上启动。

接下来是WebSocket服务器,WebSocket服务器可以用作订阅服务器。此外,通过指定httpServer到服务器,可以在与HTTP服务器相同的网络端口上启动WebSocket服务器。换句话说,这意味着可以与HTTP服务器共存。

除此之外,为了与HTTP服务器的同一端点匹配,将路径传递到`/graphql`,并进行创建。

使用 useServer 可以通过 WebSocket 处理 GraphQL 的 Subscription。

创建一个 Apollo Server

接下来我们将启动Apollo Server。

const server = new ApolloServer<Context>({
  schema: schemaWithResolvers,
  plugins: [
    ApolloServerPluginDrainHttpServer({ httpServer }),

    {
      async serverWillStart() {
        return {
          async drainServer() {
            await serverCleanup.dispose()
          },
        }
      },
    },
  ],
})

在这里,我们传递了GraphQL模式和解析器。
重要的是插件。
插件可用于自定义Apollo Server的功能。在这里,我们关闭了HTTP服务器和WebSocket服务器,以便启动Apollo Server。

ApolloServerPluginDrainHttpServer是一个插件,用于在HTTP服务器关闭时等待进行中的请求完成。

在serverWillStart中,您可以定义在服务器启动之前要执行的逻辑。另外,在drainServer中,我们调用了serverClenaup.dispose(),以确保在服务器关闭时进行清理处理。

启动HTTP服务器

;(async () => {
  try {
    await server.start()

    app.use(
      '/graphql',
      cors<cors.CorsRequest>(),
      bodyParser.json(),
      expressMiddleware(server, {
        context: async ({ req }) => ({
          ...req,
          prisma,
          pubsub,
          userId: req && req.headers.authorization ? getUserId(req) : undefined,
        }),
      })
    )

    httpServer.listen(PORT, () => {
      console.log(`? Query endpoint ready at http://localhost:${PORT}/graphql`)
      console.log(
        `? Subscription endpoint ready at ws://localhost:${PORT}/graphql`
      )
    })
  } catch (error) {
    console.error('Error starting server: ', error)
  }
})()

首先,启动Apollo Server。
在此期间,为了不启动HTTP服务器和WebSocket服务器,执行先前定义的插件处理。

接下来,我们对HTTP服务器的端点进行了CORS的定义、JSON的使用声明以及上下文的定义。

Context是指在整个请求中可以定义可用的值。例如,在Mutation等操作中,可以通过将Context作为参数传递,并通过context.prisma获取,从而执行诸如context.prisma.create等数据库操作。

关于Context的详细解释,请参考另一篇文章。

【GraphQL】使用Apollo Server v4和Prisma在上下文中共享数据的方法

实施订阅处理程序

最开始出现在前面,但是在接下来的部分进行了处理。

Subscription: {
    newLink: {
      subscribe: () => pubsub.asyncIterator('NEW_LINK'),
    },
  },

通过将触发字符串传递给PubSub实例的asyncIterator,可以定义订阅。

在这里,我们定义了订阅者。
也就是说,在有数据注册或更新等事件发生时,我们会实时获取Link类型的数据。

实际上,这是订阅事件处理的执行代码。下面是post函数。

export const post = async (
  _: unknown,
  args: { description: string; url: string },
  context: Context
) => {
  const { userId } = context

  const newLink = await context.prisma.link.create({
    data: {
      url: args.url,
      description: args.description,
      user: { connect: { id: userId as number } },
    },
  })

  // サブスクリプション送信(第一引数:トリガー名 / 第二引数:渡したい値)
  context.pubsub.publish('NEW_LINK', { newLink })

  return newLink
}

通过将触发器名设为NEW_LINK并进行发布,newLink对象将传递给具有NEW_LINK订阅的订阅者,并能够实时获取数据时刻当newLink被注册时。(可以将其类比为将服务传递给订阅者的常见订阅方式)

context.pubsub.publish('NEW_LINK', { newLink })

确认动作

c0a7fa95ef8e4e59e66a56fea7923de4_AdobeExpress.gif

订阅功能的实现已完成。

参考文献 – Reference materials

在Apollo Server中的订阅
API参考:expressMiddleware
使用Apollo Server V4的订阅和GraphQL构建一个实时新闻API(TypeScript教程)

广告
将在 10 秒后关闭
bannerAds