发送Kafka消息
好吧,那么我想要从应用程序中使用Kafka。
具体来说,我想要将以POST方法保存TODO的过程异步化。
今天我们来看一下该活动的发送部分。
UW实验室/基底
我们公司还发布了一个用于抽象化消息的Go库。即使您使用的是除Kafka之外的流处理器,也可以用相同的方式进行实现。本次让我们一起来看看这个库的用法。
substrate.新的同步消息接收点()
在这个substrate库中,默认情况下异步化完成消息处理的信号,以提高性能。但是,这次我想要使用同步版本的substrate.NewSynchronousMessageSink()的API。
在main.go中,我会定义一个名为`initializeKafkaSink`的方法来设置消息sink,如下所示。
func initialiseKafkaSink(version, brokers, topic *string, keyFunc func(substrate.Message) []byte) (substrate.SynchronousMessageSink, error) {
sink, err := kafka.NewAsyncMessageSink(kafka.AsyncMessageSinkConfig{
Brokers: strings.Split(*brokers, ","),
Topic: *topic,
KeyFunc: keyFunc,
Version: *version,
})
if err != nil {
return nil, err
}
return substrate.NewSynchronousMessageSink(sink), nil
}
调用方的感觉如下所示。
unc main() {
...
app.Action = func() {
...
gSrv := initialiseGRPCServer(newServer(store))
actionSink, err := initialiseKafkaSink(sinkKafkaVersion, sinkBrokers, actionTopic, actionKeyFunc)
if err != nil {
log.Fatalln("init action event kafka sink:", err)
}
defer actionSink.Close()
errCh := make(chan error, 2)
...
}
...
}
我們對於 Kafka 的版本以及其他必要的環境變數定義如下。
sinkKafkaVersion := app.String(cli.StringOpt{
Name: "sink-kafka-version",
Desc: "sink kafka version",
EnvVar: "SINK_KAFKA_VERSION",
})
sinkBrokers := app.String(cli.StringOpt{
Name: "sink-brokers",
Desc: "kafka sink brokers",
EnvVar: "SINK_BROKERS",
Value: "localhost:9092",
})
actionTopic := app.String(cli.StringOpt{
Name: "action-topic",
Desc: "action topic",
EnvVar: "ACTION_TOPIC",
Value: "qiita.action",
})
后续我们会详细讨论的是传给initializeKafkaSink的actionKeyFunc函数。
事件的定义
我們現在想要定義一個名為CreateTodoActionEvent的事件,這個事件將會被實際發送出去。
syntax = "proto3";
package event;
message CreateTodoActionEvent {
string id = 1;
string title = 2;
string description = 3;
}
在proto/service.proto中定义了Todo消息,因此可以重用它,但由于编译有些麻烦,所以我将其作为重构元素留下。
原型任务的内容如下。
.PHONY: protos
protos:
mkdir -pv $(GENERATED_DIR) $(GENERATED_SERVICE_DIR) $(GENERATED_EVENT_DIR) $(GENERATED_ENVELOPE_DIR)
protoc \
-I $(PROTO_DIR) \
-I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
--gogoslick_out=plugins=grpc:$(GENERATED_SERVICE_DIR) \
service.proto
protoc \
-I $(PROTO_DIR) \
-I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
--gogoslick_out=paths=source_relative:$(GENERATED_EVENT_DIR) \
event.proto
protoc \
-I $(PROTO_DIR) \
-I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
--gogoslick_out=paths=source_relative,$(ENVELOPE_PROTO_MAPPINGS):$(GENERATED_ENVELOPE_DIR) \
envelope.proto
关键函数的实现
这是一个名为actionKeyFunc的函数,之前我们推迟了它。
这个函数用于定义事件的键。
在Kafka中,共享相同键的消息将进入同一个分区,这是一个非常重要的性质,所以我们不能忽视使用什么作为键的问题。
在之前的事件定义中,我们添加了一个id字段,所以这次我们想要使用它作为键。
func actionKeyFunc(msg substrate.Message) []byte {
var env envelope.Event
if err := proto.Unmarshal(msg.Data(), &env); err != nil {
panic(err)
}
if types.Is(env.Payload, &event.CreateTodoActionEvent{}) {
var ev event.CreateTodoActionEvent
if err := types.UnmarshalAny(env.Payload, &ev); err != nil {
panic(err)
}
return []byte(ev.Id)
}
panic("unknown event")
}
在这里,只需要一个选项,对以下内容进行中文的本土化改写:
– github.com/gogo/protobuf/proto
– github.com/gogo/protobuf/types
这里包括:
– github.com/gogo/protobuf/proto
– github.com/gogo/protobuf/types
你可以使用这两个包来从信封中取出事件,也可以确认这个过程。
substrate.Message的实现方式
我想开始实现消息发送部分的功能,但在此之前需要一个必要的步骤。
消息发送的APIPublishMessage()函数接受substrate.Message接口作为参数。
因此,我们需要准备一个实现该接口的类型。
只需一个必要的方法。
type Message interface {
Data() []byte
}
定义如下的类型。
type message []byte
func (m message) Data() []byte {
return m
}
发送消息
好的,那么现在是消息发送处理。
server.CreateTodo()方法将如下所示。
func (s *server) CreateTodo(ctx context.Context, req *service.CreateTodoRequest) (*service.CreateTodoResponse, error) {
todoID := uuid.New().String()
ev := &event.CreateTodoActionEvent{
Id: todoID,
Title: req.Todo.Title,
Description: req.Todo.Description,
}
any, err := types.MarshalAny(ev)
if err != nil {
return nil, err
}
env := envelope.Event{
Id: uuid.New().String(),
Timestamp: types.TimestampNow(),
Payload: any,
}
b, err := proto.Marshal(&env)
if err != nil {
return nil, err
}
if err := s.sink.PublishMessage(ctx, message(b)); err != nil {
return nil, err
}
return &service.CreateTodoResponse{
Success: true,
Id: todoID,
}, nil
}
请关注将事件作为有效负载封装在信封中的部分,然后将整个内容转换为[]byte进行编组的处理。
服务器结构体依赖于substrate.SynchrounousMessageSink。
type (
...
server struct {
sink substrate.SynchronousMessageSink
}
)
我們應該寫一段測試傳送訊息程式碼,不過這次我們稍微偷懶了。希望在未來的某一天能夠提到它。
依赖关系的注入
我們需要在 main.go 中注入以下的依賴關係。
diff --git a/main.go b/main.go
index be109e8..907cd41 100644
--- a/main.go
+++ b/main.go
@@ -104,8 +104,6 @@ func main() {
}
defer lis.Close()
- gSrv := initialiseGRPCServer(newServer(nil))
-
actionSink, err := initialiseKafkaSink(sinkKafkaVersion, sinkBrokers, actionTopic, actionKeyFunc)
if err != nil {
log.Fatalln("init payment account kafka sink:", err)
@@ -122,6 +120,9 @@ func main() {
}()
var wg sync.WaitGroup
+
+ gSrv := initialiseGRPCServer(newServer(actionSink))
+
wg.Add(1)
go func() {
defer wg.Done()
我认为在发送消息之前,实际上需要通过验证等手段来检查是否可以真正保存,但总的来说,这已经完成了消息的发送部分。
发现比想象中的任务更多,所以事情进行地有些仓促。非常抱歉。
由于消息发送过程已经完成,接下来我们希望继续处理接收消息并将其保存到数据库中。