使用GAE Go和Datastore开发GraphQL API

我体验了使用Google Cloud Platform的托管服务来开发GraphQL API。由于对GraphQL还是初学者,所以我会记录下在实际实现过程中学到的关于GraphQL的知识。

使用了的技术

App Engine SE Go 1.11
Go 1.11が11月にβリリース
2nd genと呼ばれる次世代ランタイム

Cloud Datastore
スケーラビリティの高いNoSQLデータベース

Stackdriver
Stackdriver Trace for Goを使いたかったため使用

只要个人使用的话,可以免费使用。而且操作速度也很快!在GAE的第二代上,只要普通地实现服务器,不需要过多地意识到平台的事情。尽管如此,在需要的时候,只需要启动所需的部分,速度很快,非常好。

主要图书馆

使用的主要库如下:
– googleapis/google-cloud-go
GCP提供的SDK
– graphql-go/graphql
用于Golang的GraphQL库
– graph-gophers/dataloader
用于延迟解析器评估和批处理的库(如下所述)

据说Go语言的GraphQL库有两种主要的方法:
– 从代码生成模式(反射)生成模式
– 从模式生成代码

我想尝试两种方法,但这次我试的是前者。
不过实际尝试后,我觉得后者的方法更好。在Go语言中,代码生成是主流方式,而且很容易依赖于空接口和类型断言。
下次我想试试后者的方法(组合使用99designs/gqlgen和vektah/dataloaden)。
需要注意的是,在本文中,主要关注GraphQL和在GAE上的运行,所以不会深入讨论这些内容。

尝试制作了一些东西

代码已在GitHub上公开。

我选择将以下内容用中文进行本地化改写:

我选择以博客发布API为主题。这个主题没有特别大的意义。
在Datastore中,结构体的定义就成了模式。
Datastore没有外键这样的概念,所以通过给博客结构体添加一个与用户结构体ID字段相同的UserID字段来表示关联。
这是一个简单的一对多关系,用户可以拥有多个博客。

type User struct {
    ID    string `json:"id" datastore:"-"`
    Name  string `json:"name"`
    EMail string `json:"email"`
}

type Blog struct {
    ID        string    `json:"id" datastore:"-"`
    UserID    string    `json:"userId"`
    CreatedAt time.Time `json:"createdAt"`
    Title     string    `json:"title"`
    Content   string    `json:"content"`
}

type BlogList struct {
    Nodes      []Blog `json:"nodes"`
    TotalCount int    `json:"totalCount"`
}

开发过程

以下是一个简要的步骤:
1. 定义架构
2. 实现解析器
3. 实现终端(服务器)

那么,我们按顺序一一来看吧。

1. 定义模式

在Go中,我们使用结构体来表示Schema。我们将在SchemaConfig中定义Query和Mutation。
链接:https://github.com/monmaru/gae-graphql/blob/master/application/gql/scheme.go#L9-L13

func NewSchema(ur repository.UserRepository, br repository.BlogRepository) (graphql.Schema, error) {
    resolver := newResolver(ur, br)
    return graphql.NewSchema(graphql.SchemaConfig{
        Query:    newQuery(resolver),
        Mutation: newMutation(resolver),
    })
}

这是Mutation的一个例子。即使是查询(Query),做的事情也是一样的。
我们将定义所需的字段。
本次例子中,我们将创建一个名为createUser的用户和一个名为createBlog的博客。
我们将使用Type来定义在API中处理的数据,并使用Args来定义从请求中接收的参数。
我们将在Resolve中传递一个函数来解析字段。(详见下文)
https://github.com/monmaru/gae-graphql/blob/master/application/gql/mutation.go#L6-L29

func newMutation(r resolver) *graphql.Object {
    return graphql.NewObject(graphql.ObjectConfig{
        Name: "Mutation",
        Fields: graphql.Fields{
            "createUser": &graphql.Field{
                Type:        newCreateUserInputType(),
                Description: "Add a user",
                Args: graphql.FieldConfigArgument{
                    "name":  &graphql.ArgumentConfig{Type: graphql.NewNonNull(graphql.String)},
                    "email": &graphql.ArgumentConfig{Type: graphql.NewNonNull(graphql.String)},
                },
                Resolve: r.createUsersBatch,
            },
            "createBlog": &graphql.Field{
                Type:        newCreateBlogInputType(),
                Description: "Add a blog",
                Args: graphql.FieldConfigArgument{
                    "userId":  &graphql.ArgumentConfig{Type: graphql.NewNonNull(graphql.String)},
                    "title":   &graphql.ArgumentConfig{Type: graphql.NewNonNull(graphql.String)},
                    "content": &graphql.ArgumentConfig{Type: graphql.NewNonNull(graphql.String)},
                },
                Resolve: r.createBlogsBatch,
            },
        },
    })
}

2. 实现Resolver

以下是对上述内容的中文释义:

通过一个简单的例子来解释。解析器只需要在实现中从graphql.ResolveParams中提取上下文和所需参数,然后将其传递给Datastore。这就像在RDB中执行INSERT语句一样。在这方面,过度依赖空接口和类型断言可能会有些困难。

详见:https://github.com/monmaru/gae-graphql/blob/master/application/gql/resolver.go#L93-L103

func (r *graphQLResolver) createUser(params graphql.ResolveParams) (interface{}, error) {
    ctx := params.Context
    defer log.Duration(ctx, time.Now(), "[graphQLResolver.createUser]")
    name, _ := params.Args["name"].(string)
    email, _ := params.Args["email"].(string)
    user := &model.User{
        Name:  name,
        EMail: email,
    }
    return r.ur.Create(ctx, user)
}

3. 实现终端点(服务器)。

只需要定义一个终端点即可。
将请求主体作为字符串全部读取,与架构一起传递给graphql.Do。
最后,以JSON格式返回接收到的结果。
以上!

http.HandleFunc("/graphql", func(w http.ResponseWriter, r *http.Request) {
    body, err := ioutil.ReadAll(r.Body)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    result := graphql.Do(graphql.Params{
        Schema:        schema,
        RequestString: string(body),
    })
    if len(result.Errors) > 0 {
        fmt.Printf("wrong result, unexpected errors: %v", result.Errors)
    }
    json.NewEncoder(w).Encode(result)
})

此外,使用graphql-go/handler,您可以轻松地创建graphiql和playground的Web页面。https://github.com/monmaru/gae-graphql/blob/master/interfaces/handler/graphiql.go

让我们追踪这个动作

我们将获取两个用户的信息以及用户所发布的文章。

{
  u1: user(id: "5635703144710144") {
    name
    posts(limit: 3) {
      nodes {
        title
        createdAt
      }
    }
  }
  u2: user(id: "5153049148391424") {
    name
    posts(limit: 3) {
      nodes {
        title
      }
    }
  }
}

一切安好,有所动静了!??

{
  "data": {
    "u1": {
      "name": "Jiro",
      "posts": {
        "nodes": [
          {
            "createdAt": "2018-12-16T06:19:15.683822Z",
            "title": "pogeff"
          },
          {
            "createdAt": "2018-12-16T06:19:15.68376Z",
            "title": "mogefff"
          },
          {
            "createdAt": "2018-12-16T06:19:15.683629Z",
            "title": "hogeff"
          }
        ]
      }
    },
    "u2": {
      "name": "Taro",
      "posts": {
        "nodes": []
      }
    }
  }
}

好吧,那我们来看看内部是如何处理的吧? 我们使用 Stackdriver Trace。

Stackdriver Trace (翻译:堆栈驱动跟踪)

由于第二代无法自动收集数据,因此需要进行一些实施工作。首先,需要注册Exporter。

// Stackdriver Trace
exporter, err := stackdriver.NewExporter(stackdriver.Options{
    ProjectID: projID,
})
if err != nil {
    log.Fatal(err)
}
trace.RegisterExporter(exporter)

设置传播并启动服务器。

server := &http.Server{
    Addr: fmt.Sprintf(":%s", port),
    Handler: &ochttp.Handler{
        Handler:     router,
        Propagation: &propagation.HTTPFormat{},
    },
}

添加这样的代码并进行部署,就能了解它的运行方式。
通过与Datastore和RPC进行大约4次通信,我发现它是按顺序运行的。

slow-trace.png

这与其他文章中提到的N+1问题相关。

N+1问题

这是由于GraphQL单独且递归地执行解析器而导致的问题。
简而言之,很容易发生每次插入一个迭代,而不是将多个N个数据一起插入。

在中国,只需要一个选项,释义以下内容:
处理时间会变得很长,而且希望能避免多次通信。
GraphQL似乎通过延迟评估来解决这个问题。

延迟评估

需要将重复执行N次的SQL语句合并为一个。
通过先将它们暂存起来,不立即对其进行解析,而是在之后一起进行解析和评估,从而解决N+1问题。

Facebook 正在发布适用于 JavaScript 的库,但本次将使用 Go 实现的 graph-gophers/dataloader。这个库可以通过 GetMulti 或 PutMulti 而不是 Get 或 Put 在 Datastore 上实现批量处理。

尝试使用数据加载器(dataloader)。

使用dataloader需要以下的实现。

    • バッチ処理用の関数を定義

 

    • リゾルバーを実装

dataloader.Loaderを生成

定义用于批处理的函数。

从dataloader.Keys接收参数列表,并将它们一起传递给Datastore。同时,将处理结果以列表形式[]*dataloader.Result返回。
参考链接:https://github.com/monmaru/gae-graphql/blob/master/application/gql/batch_functions.go#L23-L51

return func(ctx context.Context, keys dataloader.Keys) []*dataloader.Result {
    defer log.Duration(ctx, time.Now(), "[GetUsersBatchFunc]")
    var strIDs []string
    for _, key := range keys {
        strID, ok := key.Raw().(string)
        if !ok {
            return handleError(ctx, errors.New("Invalid key value"))
        }
        strIDs = append(strIDs, strID)
    }

    users, err := ur.GetMulti(ctx, strIDs)
    if err != nil {
        log.Errorf(ctx, err.Error())
        return handleError(ctx, err)
    }

    var results []*dataloader.Result
    for _, user := range users {
        result := dataloader.Result{
            Data:  user,
            Error: nil,
        }
        results = append(results, &result)
    }

    log.Infof(ctx, "[GetUsersBatchFunc] batch size: %d", len(results))
    return results
}

实现解析器

解析器的实现如下:
从上下文中提取Loader。
将接收到的参数放入Key中并传递给Loader。
此时,并不会发生与数据存储之间的通信,而是暂时存储为“桶”。它将在前面提到的批处理函数中一次性处理。

func (r *graphQLResolver) getUsersBatch(params graphql.ResolveParams) (interface{}, error) {
    ctx := params.Context
    defer log.Duration(ctx, time.Now(), "[graphQLResolver.getUsersBatch]")
    strID, ok := params.Args["id"].(string)
    if !ok {
        return nil, errors.New("invalid id")
    }

    key := newGetUserKey(strID)
    v := ctx.Value(GetUsersKey)
    loader, ok := v.(*dataloader.Loader)
    if !ok {
        return nil, errors.New("loader is empty")
    }

    thunk := loader.Load(ctx, key)
    return func() (interface{}, error) {
        return thunk()
    }, nil
}

生成一个dataloader.Loader实例

我实现了一个中间件来处理它们并根据需要生成加载器。


func (i *contextInjector) Inject(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        ctx := i.setupContext(r)
        defer log.Duration(ctx, time.Now(), fmt.Sprintf("[%s]", r.URL.Path))
        r = r.WithContext(ctx)
        next.ServeHTTP(w, r)
    })
}

func (i *contextInjector) setupContext(r *http.Request) context.Context {
    ctx := r.Context()
    ctx = context.WithValue(
        ctx,
        gql.GetUsersKey,
        dataloader.NewBatchedLoader(gql.GetUsersBatchFunc(i.ur)))
// 省略!!
}

努力进行并行处理

如果在Go中使用Datastore,由于无法进行OR查询,将它们延迟评估和批量处理可能没有意义。
在这种情况下,我们创建了辅助函数,并使用goroutine并行处理,以提高性能。
并行向Datastore发送请求是一个惯例。

func concurrentResolve(fn graphql.FieldResolveFn) graphql.FieldResolveFn {
    return func(params graphql.ResolveParams) (interface{}, error) {
        type result struct {
            data interface{}
            err  error
        }
        ch := make(chan *result, 1)
        go func() {
            defer close(ch)
            data, err := fn(params)
            ch <- &result{data: data, err: err}
        }()
        return func() (interface{}, error) {
            r := <-ch
            return r.data, r.err
        }, nil
    }
}

我再看一遍Trace

fast-trace.png

还有什么事情需要完成

我本想使用Datastore来实现Relay式游标分页,但是时间不够了。

总结

我使用GAE和Datastore来实现了一个GraphQL API。我觉得Datastore不仅可以轻松进行并行处理,而且与关系型数据库相比也更加兼容。我还可以利用Stackdriver Trace来方便地进行性能调优,这让开发变得非常容易。

如果有兴趣了解GraphQL或App Engine的人,请务必尝试一下❗

如果有任何错误或建议,请在评论中提出。感谢!m(_ _)m

广告
将在 10 秒后关闭
bannerAds