接收Kafka消息
今天我们要看一下昨天发送的消息的接收处理。
【漏洞修复】
在这之前,我将修复一个错误。
我已经使用以下类型实施了substrate.Message。
type message []byte
func (m message) Data() []byte {
return m
}
然而,在substrate库中有以下的描述,由于slice无法使用==/!=运算符进行比较,因此会发生恐慌。
- https://github.com/uw-labs/substrate/blob/master/sync_adapter_sink.go#L98
if msg.Message != req.m {
panic(fmt.Sprintf("wrong message expected: %s got: %s", req.m, msg.Message))
}
所以我们将调整message型如下。
type message struct{ data []byte }
func (m *message) Data() []byte {
return m.data
}
请查看存储库,因为呼叫方也需要进行一些小的更改。修改已完成。
创建Substrate.SynchronousMessageSource
从substrate库中提供了用于接收消息的Source对象接口。
通过定义initializeKafkaSource()方法,可以创建Source对象。
func initialiseKafkaSource(version, brokers, topic, consumer *string, offsetOldest *bool) (substrate.SynchronousMessageSource, error) {
var kafkaOffset int64
if *offsetOldest {
kafkaOffset = kafka.OffsetOldest
} else {
kafkaOffset = kafka.OffsetNewest
}
source, err := kafka.NewAsyncMessageSource(kafka.AsyncMessageSourceConfig{
ConsumerGroup: *consumer,
Topic: *topic,
Brokers: strings.Split(*brokers, ","),
Offset: kafkaOffset,
Version: *version,
})
if err != nil {
return nil, err
}
return substrate.NewSynchronousMessageSource(source), nil
}
呼叫方的样子就是这样。
sourceKafkaVersion := app.String(cli.StringOpt{
Name: "source-kafka-version",
Desc: "source kafka version",
EnvVar: "SOURCE_KAFKA_VERSION",
})
sourceBrokers := app.String(cli.StringOpt{
Name: "source-brokers",
Desc: "kafka source brokers",
EnvVar: "SOURCE_BROKERS",
Value: "localhost:9092",
})
consumerID := app.String(cli.StringOpt{
Name: "consumer-id",
Desc: "consumer id to connect to source",
EnvVar: "CONSUMER_ID",
Value: appName,
})
kafkaOffsetOldest := app.Bool(cli.BoolOpt{
Name: "kafka-offset-oldest",
Desc: "If set to true, will start consuming from the oldest available messages",
EnvVar: "KAFKA_OFFSET_OLDEST",
Value: true,
})
...
actionSource, err := initialiseKafkaSource(sourceKafkaVersion, sourceBrokers, actionTopic, consumerID, kafkaOffsetOldest)
if err != nil {
log.WithError(err).Fatalln("init action event kafka source")
}
defer actionSource.Close()
创建消息处理程序
以下是在 substrate.SynchronousMessageSource接口中定义的方法。
type SynchronousMessageSource interface {
...
// ConsumeMessages calls the `handler` function for each messages
// available to consume. If the handler returns no error, an
// acknowledgement will be sent to the broker. If an error is returned
// by the handler, it will be propogated and returned from this
// function. This function will block until `ctx` is done or until an
// error occurs.
ConsumeMessages(ctx context.Context, handler ConsumerMessageHandler) error
...
}
这个参数所代表的substrate.ConsumerMessageHandler被定义如下,作为处理消息的处理器来实现它。
// ConsumerMessageHandler is the callback function type that synchronous
// message consumers must implement.
type ConsumerMessageHandler func(context.Context, Message) error
因此,用于处理程序的actionEventHandler如下所示。
type actionEventHandler struct {
todoMgr todoManager
}
func newActionEventHandler(todoMgr todoManager) actionEventHandler {
return actionEventHandler{todoMgr: todoMgr}
}
func (h actionEventHandler) handle(ctx context.Context, msg substrate.Message) error {
var env envelope.Event
if err := proto.Unmarshal(msg.Data(), &env); err != nil {
return errors.Wrap(err, "failed to unmarshal message")
}
if types.Is(env.Payload, &event.CreateTodoActionEvent{}) {
var ev event.CreateTodoActionEvent
if err := types.UnmarshalAny(env.Payload, &ev); err != nil {
return errors.Wrap(err, "failed to unmarshal payload")
}
if err := h.todoMgr.projectTodo(todo{
id: ev.Id,
title: ev.Title,
description: ev.Description,
}); err != nil {
return errors.Wrap(err, "failed to project a todo")
}
}
return nil
}
把从msg.Data()到事件的拆解处理和前几天在keyFunc章节中看到的非常相似啊。
将提取的事件保存处理与同步处理时在服务器结构体中实现的完全一样。
对不起,测试部分省略了。
请确认根据对todo的id处理所做的更改,相应地进行了一些数据类型和todoManager接口的变更。
接收消息进程的开始
在 main.go 内启动一个新的 goroutine 来接收消息。
wg.Add(1)
go func() {
defer wg.Done()
h := newActionEventHandler(store)
if err := actionSource.ConsumeMessages(context.Background(), h.handle); err != nil {
errCh <- errors.Wrap(err, "failed to consume action event")
}
}()
这样也可以,但是让我们稍作改进,以便以和平的方式结束。
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
go func() {
defer wg.Done()
h := newActionEventHandler(store)
if err := actionSource.ConsumeMessages(ctx, h.handle); err != nil {
errCh <- errors.Wrap(err, "failed to consume action event")
}
}()
...
gSrv.GracefulStop()
cancel()
wg.Wait()
通过这个过程,gRPC服务器和Kafka消息接收过程将同时开始运行。
好吧,我们现在要更新Kubernetes的清单文件并进行调试,但我会偷懒?
如果你对此感兴趣,请先查看GitHub仓库。
我现在已经实现了将Todo保存的异步处理。明天我想试试给Kafka添加gRPC接口,这样做有点有趣呢。那么,再见。