发送Kafka消息

好吧,那么我想要从应用程序中使用Kafka。
具体来说,我想要将以POST方法保存TODO的过程异步化。

今天我们来看一下该活动的发送部分。

UW实验室/基底

我们公司还发布了一个用于抽象化消息的Go库。即使您使用的是除Kafka之外的流处理器,也可以用相同的方式进行实现。本次让我们一起来看看这个库的用法。

substrate.新的同步消息接收点()

在这个substrate库中,默认情况下异步化完成消息处理的信号,以提高性能。但是,这次我想要使用同步版本的substrate.NewSynchronousMessageSink()的API。
在main.go中,我会定义一个名为`initializeKafkaSink`的方法来设置消息sink,如下所示。

func initialiseKafkaSink(version, brokers, topic *string, keyFunc func(substrate.Message) []byte) (substrate.SynchronousMessageSink, error) {
    sink, err := kafka.NewAsyncMessageSink(kafka.AsyncMessageSinkConfig{
        Brokers: strings.Split(*brokers, ","),
        Topic:   *topic,
        KeyFunc: keyFunc,
        Version: *version,
    })
    if err != nil {
        return nil, err
    }

    return substrate.NewSynchronousMessageSink(sink), nil
}

调用方的感觉如下所示。

unc main() {
    ...

    app.Action = func() {
        ...

        gSrv := initialiseGRPCServer(newServer(store))

        actionSink, err := initialiseKafkaSink(sinkKafkaVersion, sinkBrokers, actionTopic, actionKeyFunc)
        if err != nil {
            log.Fatalln("init action event kafka sink:", err)
        }
        defer actionSink.Close()

        errCh := make(chan error, 2)

        ...
    }

    ...
}

我們對於 Kafka 的版本以及其他必要的環境變數定義如下。

sinkKafkaVersion := app.String(cli.StringOpt{
    Name:   "sink-kafka-version",
    Desc:   "sink kafka version",
    EnvVar: "SINK_KAFKA_VERSION",
})
sinkBrokers := app.String(cli.StringOpt{
    Name:   "sink-brokers",
    Desc:   "kafka sink brokers",
    EnvVar: "SINK_BROKERS",
    Value:  "localhost:9092",
})

actionTopic := app.String(cli.StringOpt{
    Name:   "action-topic",
    Desc:   "action topic",
    EnvVar: "ACTION_TOPIC",
    Value:  "qiita.action",
})

后续我们会详细讨论的是传给initializeKafkaSink的actionKeyFunc函数。

事件的定义

我們現在想要定義一個名為CreateTodoActionEvent的事件,這個事件將會被實際發送出去。

syntax = "proto3";

package event;

message CreateTodoActionEvent {
    string id = 1;
    string title = 2;
    string description = 3;
}

在proto/service.proto中定义了Todo消息,因此可以重用它,但由于编译有些麻烦,所以我将其作为重构元素留下。

原型任务的内容如下。

.PHONY: protos
protos:
    mkdir -pv $(GENERATED_DIR) $(GENERATED_SERVICE_DIR) $(GENERATED_EVENT_DIR) $(GENERATED_ENVELOPE_DIR)
    protoc \
        -I $(PROTO_DIR) \
        -I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
        --gogoslick_out=plugins=grpc:$(GENERATED_SERVICE_DIR) \
        service.proto
    protoc \
        -I $(PROTO_DIR) \
        -I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
        --gogoslick_out=paths=source_relative:$(GENERATED_EVENT_DIR) \
        event.proto
    protoc \
        -I $(PROTO_DIR) \
        -I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
        --gogoslick_out=paths=source_relative,$(ENVELOPE_PROTO_MAPPINGS):$(GENERATED_ENVELOPE_DIR) \
        envelope.proto

关键函数的实现

这是一个名为actionKeyFunc的函数,之前我们推迟了它。
这个函数用于定义事件的键。
在Kafka中,共享相同键的消息将进入同一个分区,这是一个非常重要的性质,所以我们不能忽视使用什么作为键的问题。
在之前的事件定义中,我们添加了一个id字段,所以这次我们想要使用它作为键。

func actionKeyFunc(msg substrate.Message) []byte {
    var env envelope.Event
    if err := proto.Unmarshal(msg.Data(), &env); err != nil {
        panic(err)
    }

    if types.Is(env.Payload, &event.CreateTodoActionEvent{}) {
        var ev event.CreateTodoActionEvent
        if err := types.UnmarshalAny(env.Payload, &ev); err != nil {
            panic(err)
        }

        return []byte(ev.Id)
    }

    panic("unknown event")
}

在这里,只需要一个选项,对以下内容进行中文的本土化改写:
– github.com/gogo/protobuf/proto
– github.com/gogo/protobuf/types

这里包括:
– github.com/gogo/protobuf/proto
– github.com/gogo/protobuf/types

你可以使用这两个包来从信封中取出事件,也可以确认这个过程。

substrate.Message的实现方式

我想开始实现消息发送部分的功能,但在此之前需要一个必要的步骤。
消息发送的APIPublishMessage()函数接受substrate.Message接口作为参数。
因此,我们需要准备一个实现该接口的类型。

只需一个必要的方法。

type Message interface {
    Data() []byte
}

定义如下的类型。

type message []byte

func (m message) Data() []byte {
    return m
}

发送消息

好的,那么现在是消息发送处理。
server.CreateTodo()方法将如下所示。

func (s *server) CreateTodo(ctx context.Context, req *service.CreateTodoRequest) (*service.CreateTodoResponse, error) {
    todoID := uuid.New().String()
    ev := &event.CreateTodoActionEvent{
        Id:          todoID,
        Title:       req.Todo.Title,
        Description: req.Todo.Description,
    }

    any, err := types.MarshalAny(ev)
    if err != nil {
        return nil, err
    }

    env := envelope.Event{
        Id:        uuid.New().String(),
        Timestamp: types.TimestampNow(),
        Payload:   any,
    }

    b, err := proto.Marshal(&env)
    if err != nil {
        return nil, err
    }

    if err := s.sink.PublishMessage(ctx, message(b)); err != nil {
        return nil, err
    }

    return &service.CreateTodoResponse{
        Success: true,
        Id:      todoID,
    }, nil
}

请关注将事件作为有效负载封装在信封中的部分,然后将整个内容转换为[]byte进行编组的处理。

服务器结构体依赖于substrate.SynchrounousMessageSink。

type (
    ...

    server struct {
        sink substrate.SynchronousMessageSink
    }
)

我們應該寫一段測試傳送訊息程式碼,不過這次我們稍微偷懶了。希望在未來的某一天能夠提到它。

依赖关系的注入

我們需要在 main.go 中注入以下的依賴關係。

diff --git a/main.go b/main.go
index be109e8..907cd41 100644
--- a/main.go
+++ b/main.go
@@ -104,8 +104,6 @@ func main() {
        }
        defer lis.Close()

-       gSrv := initialiseGRPCServer(newServer(nil))
-
        actionSink, err := initialiseKafkaSink(sinkKafkaVersion, sinkBrokers, actionTopic, actionKeyFunc)
        if err != nil {
            log.Fatalln("init payment account kafka sink:", err)
@@ -122,6 +120,9 @@ func main() {
        }()

        var wg sync.WaitGroup
+
+       gSrv := initialiseGRPCServer(newServer(actionSink))
+
        wg.Add(1)
        go func() {
            defer wg.Done()

我认为在发送消息之前,实际上需要通过验证等手段来检查是否可以真正保存,但总的来说,这已经完成了消息的发送部分。


发现比想象中的任务更多,所以事情进行地有些仓促。非常抱歉。

由于消息发送过程已经完成,接下来我们希望继续处理接收消息并将其保存到数据库中。

广告
将在 10 秒后关闭
bannerAds