尝试整套 go-grpc-middleware

在gRPC中,可以拦截在函数执行时指定的处理。在使用golang创建gRPC应用程序时,使用这种拦截功能非常方便的中间件是go-grpc-middleware。

我想试试这个go-grpc-middleware的所有功能。

这次我们制作的示例应用程序如下。
https://github.com/morix1500/sample-go-grpc-middleware

准备好

首先,我们来创建一个gRPC服务器。

syntax = "proto3";

package hello;

message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string message = 1;
}

service HelloService {
  rpc Hello(HelloRequest) returns (HelloResponse) {}
}

使用以下命令生成go文件。

docker run --rm -v $(pwd):$(pwd) \
-w $(pwd) znly/protoc:0.4.0 \
-I ./proto \
--go_out=plugins=grpc:./proto/ \
proto/hello.proto

让我们使用golang编写服务器代码。

package main

import (
        "context"
        pb "github.com/morix1500/sample-go-grpc-middleware/proto"
        "google.golang.org/grpc"
        "net"
)

type HelloService struct{}

func (h HelloService) Hello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
        return &pb.HelloResponse{
                Message: "Hello, " + in.Name,
        }, nil
}

func main() {
        s := grpc.NewServer()
        pb.RegisterHelloServiceServer(s, HelloService{})

        lis, err := net.Listen("tcp", ":5000")
        if err != nil {
                panic(err)
        }
        if err := s.Serve(lis); err != nil {
                panic(err)
        }
}

最后,启动并使用grpcurl进行操作验证。

$ go run main.go
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localhost:5000 hello.HelloService/Hello
{
  "message": "Hello, Morix"
}

确认动作完成。

尝试一下

在撰写本文时,go-grpc-middleware提供了以下功能。

    • grpc_auth – a customizable (via AuthFunc) piece of auth middleware

 

    • grpc_ctxtags – a library that adds a Tag map to context, with data populated from request body

 

    • grpc_zap – integration of zap logging library into gRPC handlers.

 

    • grpc_logrus – integration of logrus logging library into gRPC handlers.

 

    • grpc_prometheus – Prometheus client-side and server-side monitoring middleware

 

    • otgrpc – OpenTracing client-side and server-side interceptors

 

    • grpc_opentracing – OpenTracing client-side and server-side interceptors with support for streaming and handler-returned tags

 

    • grpc_validator – codegen inbound message validation from .proto options

 

    grpc_recovery – turn panics into gRPC errors

gRPC身份验证

如果想在执行处理之前进行认证处理,可以使用中间件。接收来自客户端的认证令牌,并编写处理程序以验证是否为预期的令牌,如果不是则抛出认证错误。

// NewServerのところでInterceptorを設定します
s := grpc.NewServer(
                grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(authFunc)),
        )
---

// 認証用の関数
func authFunc(ctx context.Context) (context.Context, error) {
        token, err := grpc_auth.AuthFromMD(ctx, "bearer")
        if err != nil {
                return nil, err
        }
        fmt.Printf("receive token: %s\n", token)
        if token != "hoge" {
                return nil, grpc.Errorf(codes.Unauthenticated, "invalid token")
        }
        newCtx := context.WithValue(ctx, "result", "ok")
        return newCtx, nil
}

我会尝试将Bearer Token嵌入到Authorization头中,如果Token为”hoge”,则请求将成功。

# ヘッダーなし
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localhost:5000 hello.HelloService/Hello
ERROR:
  Code: Unauthenticated
  Message: Request unauthenticated with bearer

# ヘッダーあり、token間違い
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "Authorization: bearer fuga" localhost:5000 hello.HelloService/Hello
ERROR:
  Code: Unauthenticated
  Message: invalid token

# ヘッダーあり(正常)
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "Authorization: bearer hoge" localhost:5000 hello.HelloService/Hello
{
  "message": "Hello, Morix"
}

这样一来,您可以轻松地编写通用的身份验证处理程序。

grpc_ctxtags 接口库

这个将请求信息嵌入到上下文的元数据中的工具,可以与其他中间件一起使用。

grpc_zap 可以被转述为 gRPC Zap。

Zap是一个用于将Logger设置为gRPC日志记录器并输出日志的中间件。

        opts := []grpc_zap.Option{
                grpc_zap.WithDurationField(func(duration time.Duration) zapcore.Field {
                        return zap.Int64("grpc.time_ns", duration.Nanoseconds())
                }),
        }
        zapLogger, _ := zap.NewProduction()
        grpc_zap.ReplaceGrpcLogger(zapLogger)

        s := grpc.NewServer(
                grpc_middleware.WithUnaryServerChain(
                        grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
                        grpc_zap.UnaryServerInterceptor(zapLogger, opts...),
                ),
        )

执行结果如下所示。 shì)

$ go run main.go
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "te
st: hoge" localhost:5000 hello.HelloService/Hello
{
  "message": "Hello, Morix"
}

# gRPCサーバーのログに以下が出力される
{"level":"info","ts":1547104771.884754,"caller":"zap/server_interceptors.go:40","msg":"finished unary call with code OK","peer.address":"[::1]:50033","grpc.start_time":"2019-01-10T16:19:31+09:00","system":"grpc","span.kind":"server","grpc.service":"hello.HelloService","grpc.method":"Hello","peer.address":"[::1]:50033","grpc.code":"OK","grpc.time_ns":162981}

gRPC的访问日志被良好地输出了。

谷歌通用远程过程调用库和登录系统

与 grpc_zap 相同,提供了适用于 logrus 的中间件。

logrus.SetLevel(logrus.DebugLevel)
        logrus.SetOutput(os.Stdout)
        logrus.SetFormatter(&logrus.JSONFormatter{})
        logger := logrus.WithFields(logrus.Fields{})

        opts := []grpc_logrus.Option{
                grpc_logrus.WithDurationField(func(duration time.Duration) (key string, value interface{}) {
                        return "grpc.time_ns", duration.Nanoseconds()
                }),
        }

        grpc_logrus.ReplaceGrpcLogger(logger)

        s := grpc.NewServer(
                grpc_middleware.WithUnaryServerChain(
                        grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
                        grpc_logrus.UnaryServerInterceptor(logger, opts...),
                ),
        )

我会尝试执行。

$ go run main.go
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "te
st: hoge" localhost:5000 hello.HelloService/Hello
{
  "message": "Hello, Morix"
}

# gRPCサーバーのログに以下が出力される
{"grpc.code":"OK","grpc.method":"Hello","grpc.service":"hello.HelloService","grpc.start_time":"2019-01-10T16:34:13+09:00","grpc.time_ns":16512,"level":"info","msg":"finished unary call with code OK","peer.address":"[::1]:53643","span.kind":"server","system":"grpc","time":"2019-01-10T16:34:13+09:00"}

这跟zap一样。

gRPC普罗米修斯

这是一个用于将在 Prometheus 监控中输出指标的中间件。

由于更改的地方很多,因此我会将所有变更都包含在内。

package main

import (
        "context"
        "fmt"
        grpc_prome "github.com/grpc-ecosystem/go-grpc-prometheus"
        pb "github.com/morix1500/sample-go-grpc-middleware/proto"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promhttp"
        "google.golang.org/grpc"
        "net"
        "net/http"
)

var (
        grpcMetrics = grpc_prome.NewServerMetrics()

        reg = prometheus.NewRegistry()

        customizedCounterMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
                Name: "demo_server_say_hello_method_handle_count",
                Help: "Total number of RPCs handled on the server.",
        }, []string{"name"})
)

type HelloService struct{}

func (h HelloService) Hello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
        // Custom Metrics Count up
        customizedCounterMetric.WithLabelValues("Test").Inc()
        return &pb.HelloResponse{
                Message: "Hello, " + in.Name,
        }, nil
}

func main() {
        lis, err := net.Listen("tcp", ":5000")
        if err != nil {
                panic(err)
        }

        // カスタムメトリクス登録
        reg.MustRegister(grpcMetrics, customizedCounterMetric)
        customizedCounterMetric.WithLabelValues("Test")

        // create http server
        httpServer := &http.Server{
                Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}),
                Addr:    fmt.Sprintf("0.0.0.0:%d", 5001),
        }

        s := grpc.NewServer(
                grpc.UnaryInterceptor(grpcMetrics.UnaryServerInterceptor()),
        )
        pb.RegisterHelloServiceServer(s, HelloService{})

        // メトリクス初期化
        grpcMetrics.InitializeMetrics(s)

        go func() {
                if err := httpServer.ListenAndServe(); err != nil {
                        panic(err)
                }
        }()

        if err := s.Serve(lis); err != nil {
                panic(err)
        }
}

进行中

$ go run main.go
# helloを3回呼び出したあとに
$ curl localhost:5001

# HELP demo_server_say_hello_method_handle_count Total number of RPCs handled on the server.
# TYPE demo_server_say_hello_method_handle_count counter
demo_server_say_hello_method_handle_count{name="Test"} 3
# HELP grpc_server_handled_total Total number of RPCs completed on the server, regardless of success or failure.
# TYPE grpc_server_handled_total counter
grpc_server_handled_total{grpc_code="Aborted",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="AlreadyExists",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Canceled",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="DataLoss",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="DeadlineExceeded",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="FailedPrecondition",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Internal",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="InvalidArgument",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="NotFound",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="OK",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3
grpc_server_handled_total{grpc_code="OutOfRange",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="PermissionDenied",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="ResourceExhausted",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unauthenticated",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unavailable",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unimplemented",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unknown",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
# HELP grpc_server_msg_received_total Total number of RPC stream messages received on the server.
# TYPE grpc_server_msg_received_total counter
grpc_server_msg_received_total{grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3
# HELP grpc_server_msg_sent_total Total number of gRPC stream messages sent by the server.
# TYPE grpc_server_msg_sent_total counter
grpc_server_msg_sent_total{grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3
# HELP grpc_server_started_total Total number of RPCs started on the server.
# TYPE grpc_server_started_total counter
grpc_server_started_total{grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3

Prometheus以这种形式输出指标。当然,您也可以设置自定义指标。

otgrpc / grpc_opentracing 可以改写为「otgrpc / grpc_opentracing」

一个使用OpenTracing的中间件。由于不了解OpenTracing的用法,所以省略。

GRPC验证器

这是一个用于验证请求的中间件。
我们将在proto文件中记录验证方法。

syntax = "proto3";

import "github.com/mwitkow/go-proto-validators/validator.proto";
package hello;

message HelloRequest {
  string name = 1 [(validator.field) = {string_not_empty: true}];
}

发布Go文件的命令也稍作更改。

docker run --rm -v $(pwd):$(pwd) \
-w $(pwd) znly/protoc:0.4.0 \
-I ./proto \
--go_out=plugins=grpc:./proto/ \
--govalidators_out=./proto/ \
proto/hello.proto

用中文将以下语句重述一遍,只需要一种方式:
这样就可以在proto目录下输出一个名为 hello.validator.pb.go 的文件。

剩下的是设置gRPC的Intercepter。

        s := grpc.NewServer(
                grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
                        grpc_validator.UnaryServerInterceptor(),
                )),
        )

我会尝试执行这个。

$ go run main.go
# 正常
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localhost:5000 hello.HelloService/Hello
{
  "message": "Hello, Morix"
}
# リクエストなし
$ grpcurl -plaintext -import-path . -proto proto/hello.proto localhost:5000 hello.HelloSer
vice/Hello
ERROR:
  Code: InvalidArgument
  Message: invalid field Name: value '' must not be an empty string

由于proto文件中定义并验证了接口,因此可以将接口定义文件的职责转移到proto文件中。

gRPC的恢复机制

这是一种中间件,可以将恐慌转换为gRPC错误。

// 意図的にpanicが起こるようにします
func (h HelloService) Hello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
        if in.Name == "panic" {
                panic("failed")
        }
---
func main() {
        opts := []grpc_recovery.Option{
                grpc_recovery.WithRecoveryHandler(recoveryFunc),
        }

        s := grpc.NewServer(
                grpc_middleware.WithUnaryServerChain(
                        grpc_recovery.UnaryServerInterceptor(opts...),
                ),
        )

---

// gRPCのerrorを返します
func recoveryFunc(p interface{}) error {
        fmt.Printf("p: %+v\n", p)
        return grpc.Errorf(codes.Internal, "Unexpected error")
}

执行

$ go run main.go
# 正常
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localh
ost:5000 hello.HelloService/Hello
{
  "message": "Hello, Morix"
}
# 以上
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "panic"}' localhost:5000 hello.HelloService/Hello
ERROR:
  Code: Internal
  Message: Unexpected error

当发生恐慌时,服务器将停止运行,但现在可以将其作为gPRC错误返回。

贴士

gRPC错误代码

在生成gRPC错误时,可以从以下代码中选择:
https://github.com/grpc/grpc-go/blob/master/codes/codes.go

操作context的元数据的方便函数

編輯與上下文相關的元數據有點麻煩,但是 go-grpc-middleware 在這方面提供了這樣的功能。
https://github.com/grpc-ecosystem/go-grpc-middleware/tree/master/util/metautils

// これでMetadataのKeyからValueが取れる
val := metautils.ExtractIncoming(ctx).Get("Hoge")
广告
将在 10 秒后关闭
bannerAds