使用 gRPC 连接 Kafka
假设有一天,我突然想到了这个想法:
「啊,我对卡夫卡有点腻了。还是用 NATS 吧。」
但是,如果其他团队要使用Kafka的话,我们必须要求他们进行修改。
他们可能很忙。这样就不容易换成NATS了。
在这种情况下,通过抽象化的消息系统接口让其代替直接访问Kafka,可以方便地隐藏内部实现。
这次介绍的是我们公司自家制作的 uw-labs/proximo 库(以及Go服务器的实现)。
在这个库中,我们使用protocol buffer定义了消息传递系统的接口。
- https://github.com/uw-labs/proximo/blob/master/proto/proximo.proto
syntax = "proto3";
package proximo;
message Message {
bytes data = 1;
string id = 2;
}
// Consumer types
service MessageSource {
rpc Consume(stream ConsumerRequest) returns (stream Message) {
}
}
message ConsumerRequest {
// expected if this is a start request
StartConsumeRequest startRequest = 2;
// expected if this is a confirmation
Confirmation confirmation = 3;
}
message StartConsumeRequest {
string topic = 1;
string consumer = 2;
Offset initial_offset = 3;
}
enum Offset {
OFFSET_DEFAULT = 0;
OFFSET_NEWEST = 1;
OFFSET_OLDEST = 2;
}
message Confirmation {
string msgID = 1;
}
// Producer types
service MessageSink {
rpc Publish(stream PublisherRequest) returns (stream Confirmation) {
}
}
message PublisherRequest {
// expected if this is a start request
StartPublishRequest startRequest = 2;
// expected if this is a message
Message msg = 3;
}
message StartPublishRequest {
string topic = 1;
}
请注意,我们使用双向流的RPC进行发送和接收。
通过准备实现该接口的gRPC服务器,我们可以隐藏用户对使用Kafka的事实。
由于存储库中提供了使用Go进行服务器实现和公共Docker映像,我们将使用它。
Proximo的部署。
使用以下的清单文件来部署将Kafka作为后端使用的proximo。
---
apiVersion: v1
kind: Service
metadata:
annotations:
prometheus.io/scrape: 'true'
prometheus.io/path: /__/metrics
prometheus.io/port: '8080'
name: &app proximo
namespace: &ns qiita
labels:
app: *app
spec:
ports:
- name: app
port: 6868
protocol: TCP
targetPort: 6868
- name: http
port: 80
protocol: TCP
targetPort: 8080
selector:
app: *app
---
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: &app proximo
name: *app
namespace: &ns qiita
spec:
replicas: 1
selector:
matchLabels:
app: *app
template:
metadata:
labels:
app: *app
namespace: *ns
spec:
containers:
- name: *app
image: quay.io/utilitywarehouse/proximo:latest
args:
- /proximo-server
- kafka
env:
- name: PROXIMO_KAFKA_VERSION
valueFrom:
configMapKeyRef:
name: kafka-brokers
key: internal.kafka.broker.version
- name: PROXIMO_KAFKA_BROKERS
valueFrom:
configMapKeyRef:
name: kafka-brokers
key: internal.kafka.brokers
- name: PROXIMO_PROBE_PORT
value: "8080"
- name: PROXIMO_PORT
value: "6868"
imagePullPolicy: Always
livenessProbe:
failureThreshold: 3
httpGet:
path: /__/ready
port: 8080
scheme: HTTP
initialDelaySeconds: 15
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 10
ports:
- containerPort: 6868
name: proximo
protocol: TCP
- containerPort: 8080
name: proximo-probe
protocol: TCP
readinessProbe:
failureThreshold: 3
httpGet:
path: /__/ready
port: 8080
scheme: HTTP
initialDelaySeconds: 15
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 10
resources:
limits:
memory: 512Mi
---
根据这份清单进行部署。
$ kubectl apply -f kubernetes/proximo.yaml
service/proximo created
deployment.apps/proximo created
$ kubectl -n qiita get pods proximo-684b45f898-9vvvr
NAME READY STATUS RESTARTS AGE
proximo-684b45f898-9vvvr 1/1 Running 0 36s
通过Proximo接收的消息。
作为用例,我们预计从不同的命名空间中使用,但这一次我们想要将接收在应用程序内部上次实施的改为使用Proximo。
关于如何访问Proximo,substrate库在这里也很有用。
使用substrate/proximo包,我们定义initializeProximoSource()函数如下。
func initialiseProximoSource(addr, consumerID, topic *string, offsetOldest *bool) (substrate.SynchronousMessageSource, error) {
var proximoOffset proximo.Offset
if *offsetOldest {
proximoOffset = proximo.OffsetOldest
} else {
proximoOffset = proximo.OffsetNewest
}
source, err := proximo.NewAsyncMessageSource(proximo.AsyncMessageSourceConfig{
ConsumerGroup: *consumerID,
Topic: *topic,
Broker: *addr,
Offset: proximoOffset,
Insecure: true,
})
if err != nil {
return nil, err
}
return substrate.NewSynchronousMessageSource(source), nil
}
呼叫方需要进行如下更改。
proximoAddr := app.String(cli.StringOpt{
Desc: "proximo endpoint",
Name: "proximo-addr",
EnvVar: "PROXIMO_ADDR",
Value: "proximo:6868",
})
proximoOffsetOldest := app.Bool(cli.BoolOpt{
Name: "proximo-offset-oldest",
Desc: "If set to true, will start consuming from the oldest available messages",
EnvVar: "PROXIMO_OFFSET_OLDEST",
Value: true,
})
...
actionSource, err := initialiseProximoSource(proximoAddr, consumerID, actionTopic, proximoOffsetOldest)
if err != nil {
log.WithError(err).Fatalln("init action event kafka source")
}
defer actionSource.Close()
在上述的部分中,没有包含对 Kafka 的依赖,因此可以轻松实现后端系统而不需要担心是什么。
部署后,我们来进行调试。

gRPC成功了,并返回了新的Todo ID。
现在我们来查看数据库。
$ kubectl -n qiita exec -ti cockroachdb-0 -- /cockroach/cockroach sql --url postgresql://root@localhost:26257/qiita_advent_calendar_2019_db?sslmode=disable
...
root@localhost:26257/qiita_advent_calendar_2019_db> select * from todo where id = '71face5d-404e-4cf3-b831-dd33da5147a2';
id | title | description
+--------------------------------------+-----------+-----------------------------------------------+
71face5d-404e-4cf3-b831-dd33da5147a2 | Holidays! | Pack your items and prepare for the holidays!
(1 row)
Time: 1.328365ms
我已经成功地将数据保存到了数据库中。:)
因此,我們已經從啟動微服務,到使用gRPC、DB,再到使用Kafka進行非同步通信,講述了到這裡。
儘管還有一些日子剩下,但我們已經介紹完想要講的內容,所以我們將在這裡先結束主要內容。
由于还有一些其他话题我想提及,所以我打算以我的节奏来更新。非常感谢您阅读到这里 🙂
祝大家圣诞快乐并度过一个美好的新年!