尝试整套 go-grpc-middleware
在gRPC中,可以拦截在函数执行时指定的处理。在使用golang创建gRPC应用程序时,使用这种拦截功能非常方便的中间件是go-grpc-middleware。
我想试试这个go-grpc-middleware的所有功能。
这次我们制作的示例应用程序如下。
https://github.com/morix1500/sample-go-grpc-middleware
准备好
首先,我们来创建一个gRPC服务器。
syntax = "proto3";
package hello;
message HelloRequest {
string name = 1;
}
message HelloResponse {
string message = 1;
}
service HelloService {
rpc Hello(HelloRequest) returns (HelloResponse) {}
}
使用以下命令生成go文件。
docker run --rm -v $(pwd):$(pwd) \
-w $(pwd) znly/protoc:0.4.0 \
-I ./proto \
--go_out=plugins=grpc:./proto/ \
proto/hello.proto
让我们使用golang编写服务器代码。
package main
import (
"context"
pb "github.com/morix1500/sample-go-grpc-middleware/proto"
"google.golang.org/grpc"
"net"
)
type HelloService struct{}
func (h HelloService) Hello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
return &pb.HelloResponse{
Message: "Hello, " + in.Name,
}, nil
}
func main() {
s := grpc.NewServer()
pb.RegisterHelloServiceServer(s, HelloService{})
lis, err := net.Listen("tcp", ":5000")
if err != nil {
panic(err)
}
if err := s.Serve(lis); err != nil {
panic(err)
}
}
最后,启动并使用grpcurl进行操作验证。
$ go run main.go
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localhost:5000 hello.HelloService/Hello
{
"message": "Hello, Morix"
}
确认动作完成。
尝试一下
在撰写本文时,go-grpc-middleware提供了以下功能。
-
- grpc_auth – a customizable (via AuthFunc) piece of auth middleware
-
- grpc_ctxtags – a library that adds a Tag map to context, with data populated from request body
-
- grpc_zap – integration of zap logging library into gRPC handlers.
-
- grpc_logrus – integration of logrus logging library into gRPC handlers.
-
- grpc_prometheus – Prometheus client-side and server-side monitoring middleware
-
- otgrpc – OpenTracing client-side and server-side interceptors
-
- grpc_opentracing – OpenTracing client-side and server-side interceptors with support for streaming and handler-returned tags
-
- grpc_validator – codegen inbound message validation from .proto options
- grpc_recovery – turn panics into gRPC errors
gRPC身份验证
如果想在执行处理之前进行认证处理,可以使用中间件。接收来自客户端的认证令牌,并编写处理程序以验证是否为预期的令牌,如果不是则抛出认证错误。
// NewServerのところでInterceptorを設定します
s := grpc.NewServer(
grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(authFunc)),
)
---
// 認証用の関数
func authFunc(ctx context.Context) (context.Context, error) {
token, err := grpc_auth.AuthFromMD(ctx, "bearer")
if err != nil {
return nil, err
}
fmt.Printf("receive token: %s\n", token)
if token != "hoge" {
return nil, grpc.Errorf(codes.Unauthenticated, "invalid token")
}
newCtx := context.WithValue(ctx, "result", "ok")
return newCtx, nil
}
我会尝试将Bearer Token嵌入到Authorization头中,如果Token为”hoge”,则请求将成功。
# ヘッダーなし
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localhost:5000 hello.HelloService/Hello
ERROR:
Code: Unauthenticated
Message: Request unauthenticated with bearer
# ヘッダーあり、token間違い
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "Authorization: bearer fuga" localhost:5000 hello.HelloService/Hello
ERROR:
Code: Unauthenticated
Message: invalid token
# ヘッダーあり(正常)
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "Authorization: bearer hoge" localhost:5000 hello.HelloService/Hello
{
"message": "Hello, Morix"
}
这样一来,您可以轻松地编写通用的身份验证处理程序。
grpc_ctxtags 接口库
这个将请求信息嵌入到上下文的元数据中的工具,可以与其他中间件一起使用。
grpc_zap 可以被转述为 gRPC Zap。
Zap是一个用于将Logger设置为gRPC日志记录器并输出日志的中间件。
opts := []grpc_zap.Option{
grpc_zap.WithDurationField(func(duration time.Duration) zapcore.Field {
return zap.Int64("grpc.time_ns", duration.Nanoseconds())
}),
}
zapLogger, _ := zap.NewProduction()
grpc_zap.ReplaceGrpcLogger(zapLogger)
s := grpc.NewServer(
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_zap.UnaryServerInterceptor(zapLogger, opts...),
),
)
执行结果如下所示。 shì)
$ go run main.go
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "te
st: hoge" localhost:5000 hello.HelloService/Hello
{
"message": "Hello, Morix"
}
# gRPCサーバーのログに以下が出力される
{"level":"info","ts":1547104771.884754,"caller":"zap/server_interceptors.go:40","msg":"finished unary call with code OK","peer.address":"[::1]:50033","grpc.start_time":"2019-01-10T16:19:31+09:00","system":"grpc","span.kind":"server","grpc.service":"hello.HelloService","grpc.method":"Hello","peer.address":"[::1]:50033","grpc.code":"OK","grpc.time_ns":162981}
gRPC的访问日志被良好地输出了。
谷歌通用远程过程调用库和登录系统
与 grpc_zap 相同,提供了适用于 logrus 的中间件。
logrus.SetLevel(logrus.DebugLevel)
logrus.SetOutput(os.Stdout)
logrus.SetFormatter(&logrus.JSONFormatter{})
logger := logrus.WithFields(logrus.Fields{})
opts := []grpc_logrus.Option{
grpc_logrus.WithDurationField(func(duration time.Duration) (key string, value interface{}) {
return "grpc.time_ns", duration.Nanoseconds()
}),
}
grpc_logrus.ReplaceGrpcLogger(logger)
s := grpc.NewServer(
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_logrus.UnaryServerInterceptor(logger, opts...),
),
)
我会尝试执行。
$ go run main.go
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "te
st: hoge" localhost:5000 hello.HelloService/Hello
{
"message": "Hello, Morix"
}
# gRPCサーバーのログに以下が出力される
{"grpc.code":"OK","grpc.method":"Hello","grpc.service":"hello.HelloService","grpc.start_time":"2019-01-10T16:34:13+09:00","grpc.time_ns":16512,"level":"info","msg":"finished unary call with code OK","peer.address":"[::1]:53643","span.kind":"server","system":"grpc","time":"2019-01-10T16:34:13+09:00"}
这跟zap一样。
gRPC普罗米修斯
这是一个用于将在 Prometheus 监控中输出指标的中间件。
由于更改的地方很多,因此我会将所有变更都包含在内。
package main
import (
"context"
"fmt"
grpc_prome "github.com/grpc-ecosystem/go-grpc-prometheus"
pb "github.com/morix1500/sample-go-grpc-middleware/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"net"
"net/http"
)
var (
grpcMetrics = grpc_prome.NewServerMetrics()
reg = prometheus.NewRegistry()
customizedCounterMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "demo_server_say_hello_method_handle_count",
Help: "Total number of RPCs handled on the server.",
}, []string{"name"})
)
type HelloService struct{}
func (h HelloService) Hello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
// Custom Metrics Count up
customizedCounterMetric.WithLabelValues("Test").Inc()
return &pb.HelloResponse{
Message: "Hello, " + in.Name,
}, nil
}
func main() {
lis, err := net.Listen("tcp", ":5000")
if err != nil {
panic(err)
}
// カスタムメトリクス登録
reg.MustRegister(grpcMetrics, customizedCounterMetric)
customizedCounterMetric.WithLabelValues("Test")
// create http server
httpServer := &http.Server{
Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}),
Addr: fmt.Sprintf("0.0.0.0:%d", 5001),
}
s := grpc.NewServer(
grpc.UnaryInterceptor(grpcMetrics.UnaryServerInterceptor()),
)
pb.RegisterHelloServiceServer(s, HelloService{})
// メトリクス初期化
grpcMetrics.InitializeMetrics(s)
go func() {
if err := httpServer.ListenAndServe(); err != nil {
panic(err)
}
}()
if err := s.Serve(lis); err != nil {
panic(err)
}
}
进行中
$ go run main.go
# helloを3回呼び出したあとに
$ curl localhost:5001
# HELP demo_server_say_hello_method_handle_count Total number of RPCs handled on the server.
# TYPE demo_server_say_hello_method_handle_count counter
demo_server_say_hello_method_handle_count{name="Test"} 3
# HELP grpc_server_handled_total Total number of RPCs completed on the server, regardless of success or failure.
# TYPE grpc_server_handled_total counter
grpc_server_handled_total{grpc_code="Aborted",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="AlreadyExists",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Canceled",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="DataLoss",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="DeadlineExceeded",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="FailedPrecondition",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Internal",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="InvalidArgument",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="NotFound",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="OK",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3
grpc_server_handled_total{grpc_code="OutOfRange",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="PermissionDenied",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="ResourceExhausted",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unauthenticated",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unavailable",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unimplemented",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unknown",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
# HELP grpc_server_msg_received_total Total number of RPC stream messages received on the server.
# TYPE grpc_server_msg_received_total counter
grpc_server_msg_received_total{grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3
# HELP grpc_server_msg_sent_total Total number of gRPC stream messages sent by the server.
# TYPE grpc_server_msg_sent_total counter
grpc_server_msg_sent_total{grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3
# HELP grpc_server_started_total Total number of RPCs started on the server.
# TYPE grpc_server_started_total counter
grpc_server_started_total{grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3
Prometheus以这种形式输出指标。当然,您也可以设置自定义指标。
otgrpc / grpc_opentracing 可以改写为「otgrpc / grpc_opentracing」
一个使用OpenTracing的中间件。由于不了解OpenTracing的用法,所以省略。
GRPC验证器
这是一个用于验证请求的中间件。
我们将在proto文件中记录验证方法。
syntax = "proto3";
import "github.com/mwitkow/go-proto-validators/validator.proto";
package hello;
message HelloRequest {
string name = 1 [(validator.field) = {string_not_empty: true}];
}
发布Go文件的命令也稍作更改。
docker run --rm -v $(pwd):$(pwd) \
-w $(pwd) znly/protoc:0.4.0 \
-I ./proto \
--go_out=plugins=grpc:./proto/ \
--govalidators_out=./proto/ \
proto/hello.proto
用中文将以下语句重述一遍,只需要一种方式:
这样就可以在proto目录下输出一个名为 hello.validator.pb.go 的文件。
剩下的是设置gRPC的Intercepter。
s := grpc.NewServer(
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_validator.UnaryServerInterceptor(),
)),
)
我会尝试执行这个。
$ go run main.go
# 正常
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localhost:5000 hello.HelloService/Hello
{
"message": "Hello, Morix"
}
# リクエストなし
$ grpcurl -plaintext -import-path . -proto proto/hello.proto localhost:5000 hello.HelloSer
vice/Hello
ERROR:
Code: InvalidArgument
Message: invalid field Name: value '' must not be an empty string
由于proto文件中定义并验证了接口,因此可以将接口定义文件的职责转移到proto文件中。
gRPC的恢复机制
这是一种中间件,可以将恐慌转换为gRPC错误。
// 意図的にpanicが起こるようにします
func (h HelloService) Hello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
if in.Name == "panic" {
panic("failed")
}
---
func main() {
opts := []grpc_recovery.Option{
grpc_recovery.WithRecoveryHandler(recoveryFunc),
}
s := grpc.NewServer(
grpc_middleware.WithUnaryServerChain(
grpc_recovery.UnaryServerInterceptor(opts...),
),
)
---
// gRPCのerrorを返します
func recoveryFunc(p interface{}) error {
fmt.Printf("p: %+v\n", p)
return grpc.Errorf(codes.Internal, "Unexpected error")
}
执行
$ go run main.go
# 正常
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localh
ost:5000 hello.HelloService/Hello
{
"message": "Hello, Morix"
}
# 以上
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "panic"}' localhost:5000 hello.HelloService/Hello
ERROR:
Code: Internal
Message: Unexpected error
当发生恐慌时,服务器将停止运行,但现在可以将其作为gPRC错误返回。
贴士
gRPC错误代码
在生成gRPC错误时,可以从以下代码中选择:
https://github.com/grpc/grpc-go/blob/master/codes/codes.go
操作context的元数据的方便函数
編輯與上下文相關的元數據有點麻煩,但是 go-grpc-middleware 在這方面提供了這樣的功能。
https://github.com/grpc-ecosystem/go-grpc-middleware/tree/master/util/metautils
// これでMetadataのKeyからValueが取れる
val := metautils.ExtractIncoming(ctx).Get("Hoge")