使用 gRPC 连接 Kafka

假设有一天,我突然想到了这个想法:
「啊,我对卡夫卡有点腻了。还是用 NATS 吧。」

但是,如果其他团队要使用Kafka的话,我们必须要求他们进行修改。
他们可能很忙。这样就不容易换成NATS了。

在这种情况下,通过抽象化的消息系统接口让其代替直接访问Kafka,可以方便地隐藏内部实现。

这次介绍的是我们公司自家制作的 uw-labs/proximo 库(以及Go服务器的实现)。
在这个库中,我们使用protocol buffer定义了消息传递系统的接口。

    https://github.com/uw-labs/proximo/blob/master/proto/proximo.proto
syntax = "proto3";

package proximo;

message Message {
  bytes data = 1;
  string id = 2;
}

// Consumer types
service MessageSource {
  rpc Consume(stream ConsumerRequest) returns (stream Message) {
  }
}

message ConsumerRequest {
  // expected if this is a start request
  StartConsumeRequest startRequest = 2;
  // expected if this is a confirmation
  Confirmation confirmation = 3;
}

message StartConsumeRequest {
  string topic = 1;
  string consumer = 2;
  Offset initial_offset = 3;
}

enum Offset {
  OFFSET_DEFAULT = 0;
  OFFSET_NEWEST = 1;
  OFFSET_OLDEST = 2;
}

message Confirmation {
  string msgID = 1;
}

// Producer types
service MessageSink {
  rpc Publish(stream PublisherRequest) returns (stream Confirmation) {
  }
}

message PublisherRequest {
  // expected if this is a start request
  StartPublishRequest startRequest = 2;
  // expected if this is a message
  Message msg = 3;
}

message StartPublishRequest {
  string topic = 1;
}

请注意,我们使用双向流的RPC进行发送和接收。
通过准备实现该接口的gRPC服务器,我们可以隐藏用户对使用Kafka的事实。
由于存储库中提供了使用Go进行服务器实现和公共Docker映像,我们将使用它。

Proximo的部署。

使用以下的清单文件来部署将Kafka作为后端使用的proximo。

---
apiVersion: v1
kind: Service
metadata:
  annotations:
    prometheus.io/scrape: 'true'
    prometheus.io/path:   /__/metrics
    prometheus.io/port:   '8080'
  name: &app proximo
  namespace: &ns qiita
  labels:
    app: *app
spec:
  ports:
    - name: app
      port: 6868
      protocol: TCP
      targetPort: 6868
    - name: http
      port: 80
      protocol: TCP
      targetPort: 8080
  selector:
    app: *app
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: &app proximo
  name: *app
  namespace: &ns qiita
spec:
  replicas: 1
  selector:
    matchLabels:
      app: *app
  template:
    metadata:
      labels:
        app: *app
      namespace: *ns
    spec:
      containers:
        - name: *app
          image: quay.io/utilitywarehouse/proximo:latest
          args:
            - /proximo-server
            - kafka
          env:
            - name: PROXIMO_KAFKA_VERSION
              valueFrom:
                configMapKeyRef:
                  name: kafka-brokers
                  key: internal.kafka.broker.version
            - name: PROXIMO_KAFKA_BROKERS
              valueFrom:
                configMapKeyRef:
                  name: kafka-brokers
                  key: internal.kafka.brokers
            - name: PROXIMO_PROBE_PORT
              value: "8080"
            - name: PROXIMO_PORT
              value: "6868"
          imagePullPolicy: Always
          livenessProbe:
            failureThreshold: 3
            httpGet:
              path: /__/ready
              port: 8080
              scheme: HTTP
            initialDelaySeconds: 15
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 10
          ports:
            - containerPort: 6868
              name: proximo
              protocol: TCP
            - containerPort: 8080
              name: proximo-probe
              protocol: TCP
          readinessProbe:
            failureThreshold: 3
            httpGet:
              path: /__/ready
              port: 8080
              scheme: HTTP
            initialDelaySeconds: 15
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 10
          resources:
            limits:
              memory: 512Mi
---

根据这份清单进行部署。

$ kubectl apply -f kubernetes/proximo.yaml
service/proximo created
deployment.apps/proximo created

$ kubectl -n qiita get pods proximo-684b45f898-9vvvr
NAME                       READY   STATUS    RESTARTS   AGE
proximo-684b45f898-9vvvr   1/1     Running   0          36s

通过Proximo接收的消息。

作为用例,我们预计从不同的命名空间中使用,但这一次我们想要将接收在应用程序内部上次实施的改为使用Proximo。

关于如何访问Proximo,substrate库在这里也很有用。
使用substrate/proximo包,我们定义initializeProximoSource()函数如下。

func initialiseProximoSource(addr, consumerID, topic *string, offsetOldest *bool) (substrate.SynchronousMessageSource, error) {
    var proximoOffset proximo.Offset
    if *offsetOldest {
        proximoOffset = proximo.OffsetOldest
    } else {
        proximoOffset = proximo.OffsetNewest
    }

    source, err := proximo.NewAsyncMessageSource(proximo.AsyncMessageSourceConfig{
        ConsumerGroup: *consumerID,
        Topic:         *topic,
        Broker:        *addr,
        Offset:        proximoOffset,
        Insecure:      true,
    })
    if err != nil {
        return nil, err
    }
    return substrate.NewSynchronousMessageSource(source), nil
}

呼叫方需要进行如下更改。

proximoAddr := app.String(cli.StringOpt{
    Desc:   "proximo endpoint",
    Name:   "proximo-addr",
    EnvVar: "PROXIMO_ADDR",
    Value:  "proximo:6868",
})
proximoOffsetOldest := app.Bool(cli.BoolOpt{
    Name:   "proximo-offset-oldest",
    Desc:   "If set to true, will start consuming from the oldest available messages",
    EnvVar: "PROXIMO_OFFSET_OLDEST",
    Value:  true,
})

...

actionSource, err := initialiseProximoSource(proximoAddr, consumerID, actionTopic, proximoOffsetOldest)
if err != nil {
    log.WithError(err).Fatalln("init action event kafka source")
}
defer actionSource.Close()

在上述的部分中,没有包含对 Kafka 的依赖,因此可以轻松实现后端系统而不需要担心是什么。
部署后,我们来进行调试。

Screenshot 2019-12-17 at 14.54.05.png

gRPC成功了,并返回了新的Todo ID。
现在我们来查看数据库。

$ kubectl -n qiita exec -ti cockroachdb-0 -- /cockroach/cockroach sql --url postgresql://root@localhost:26257/qiita_advent_calendar_2019_db?sslmode=disable
...
root@localhost:26257/qiita_advent_calendar_2019_db> select * from todo where id = '71face5d-404e-4cf3-b831-dd33da5147a2';
                   id                  |   title   |                  description                   
+--------------------------------------+-----------+-----------------------------------------------+
  71face5d-404e-4cf3-b831-dd33da5147a2 | Holidays! | Pack your items and prepare for the holidays!  
(1 row)

Time: 1.328365ms

我已经成功地将数据保存到了数据库中。:)


因此,我們已經從啟動微服務,到使用gRPC、DB,再到使用Kafka進行非同步通信,講述了到這裡。
儘管還有一些日子剩下,但我們已經介紹完想要講的內容,所以我們將在這裡先結束主要內容。

由于还有一些其他话题我想提及,所以我打算以我的节奏来更新。非常感谢您阅读到这里 🙂

祝大家圣诞快乐并度过一个美好的新年!

广告
将在 10 秒后关闭
bannerAds