尝试使用gRPC和Apache Kafka

本文是富士通云技术2017年圣诞节日历的第7天。

昨天在 @makky05 的「FJCT兼職~回顧感人片段~」活動中,真是非常有趣興奮的業務內容啊。希望能再次啟動「透過Wi-Fi連接狀況尋找員工位置的服務」。另外,真是讓人驚嘆的一年,成長得非常盡興啊!

简要介绍

我想在这次尝试中使用gRPC和Apache Kafka。具体来说,我想使用Kafka-Pixy,它可以作为Apache Kafka的gRPC代理,从.proto文件生成代码,并将消息发送到Apache Kafka,并获取消息。

gRPC是谷歌于2015年2月发布的一种RPC框架。您可以使用IDL编写.proto文件,并从该文件生成各种语言的服务器和客户端所需的源代码模板。此外,它使用HTTP/2进行通信。

关于gRPC有很多简单易懂的文章,我建议您参考这些文章。

什么是gRPC?
gRPC是什么?
当你在设计REST API时感到困惑时,来学习一下gRPC吧
我了解了一下ProtocolBuffers

Apache Kafka是LinkedIn公開的一個分散消息系統的開源軟件,旨在以低延遲和高吞吐量的方式收集和傳遞大量數據,它是一個Pull型的发布/订阅系统。

在Kafka的官方網站上有詳細的介紹,你可以在那裡找到很多易懂的文章。

以下是我閱讀Kafka官網上的介紹所做的筆記。

Apache Kafka ―從入門到使用Trifecta進行可視化―

如果你想進一步了解分布式消息中間件的詳細對比,也可以參考一些相關的文章。

关于Kafka-Pixy的信息

Kafka-Pixy是由Mailgun团队开发的用于Apache Kafka的gRPC和REST API的代理。
这次我们将使用Kafka-Pixy来尝试使用gRPC连接Apache Kafka。
该代理的接口是在kafkapixy.proto中定义的,您可以从这个.proto文件中生成使用Kafka-Pixy的客户端代码。
通过使用Kafka-Pixy作为中间代理,您可以使用生成的代码,从而减少在使用Kafka时客户端实现的成本。

动作环境

$ cat /etc/redhat-release 
CentOS Linux release 7.4.1708 (Core)

$ java -version
openjdk version "9-ea"
OpenJDK Runtime Environment (build 9-ea+163)
OpenJDK 64-Bit Server VM (build 9-ea+163, mixed mode)

$ python --version
Python 3.6.2

Apache Kafka 是一种开源分布式流处理平台。

    インストール(基本的には こちら を参考に)
$ wget http://ftp.jaist.ac.jp/pub/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz 
$ tar -zxf kafka_2.12-1.0.0.tgz
    とりあえずデフォルトの設定のまま起動
$ cd kafka_2.12-1.0.0
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ bin/kafka-server-start.sh -daemon config/server.properties
    とりあえずtopicを作成してみる
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
    メッセージを入れてみる
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
message hoge
    メッセージを取得してみる
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
message hoge

gRPC (谷啦啦帕西)

今次我想在Python上尝试一下。我会参考Python Quickstart。

$ python -m pip install grpcio
$ python -m pip install grpcio-tools

卡夫卡-皮克西

howto-install.md を参考に

$ curl -L https://github.com/mailgun/kafka-pixy/releases/download/v0.14.0/kafka-pixy-v0.14.0-linux-amd64.tar.gz | tar xz
$ cd kafka-pixy-v0.14.0-linux-amd64
$ cp default.yaml config.yaml
#とりあえずデフォルトのまま起動
$ ./kafka-pixy --config config.yaml

从.proto文件生成代码

在 kafka-pixy-v0.14.0-linux-amd64 中有 kafkapixy.proto 文件,我认为可以使用它来生成代码。
这次先创建一个 sample 目录,并在其中生成代码。

$ python -m grpc_tools.protoc -I./kafka-pixy-v0.14.0-linux-amd64 --python_out=./sample --grpc_python_out=./sample ./kafka-pixy-v0.14.0-linux-amd64/kafkapixy.proto

然后将会生成两个文件,即kafkapixy_pb2.py和kafkapixy_pb2_grpc.py。

顺带提一下,如 quick-start-python.md 所述,由于 Kafka-Pixy 已经生成了 kafkapixy_pb2.py 和 kafkapixy_pb2_grpc.py 这两个文件,您可以直接复制并使用它们,非常安全可靠。

使用生成的代码向Kafka投递和获取消息。

使用上述创建的 kafkapixy_pb2.py 和 kafkapixy_pb2_grpc.py,在Apache Kafka中进行消息投递和获取。
这次只编写了投递和获取消息的代码。

消息输入代码

import grpc
from kafkapixy_pb2 import ProdRq
from kafkapixy_pb2_grpc import KafkaPixyStub
import sys

grpc_channel = grpc.insecure_channel("127.0.0.1:19091")
kafkapixy_client = KafkaPixyStub(grpc_channel)

def produce(kafkapixy_client ,topic, msg):
    rq = ProdRq(topic=topic, message=msg)
    rs = kafkapixy_client.Produce(rq)
    return rs

def main():
    topic = sys.argv[1]
    msg = bytes(sys.argv[2], encoding="utf-8")
    produce(kafkapixy_client, topic, msg)

if __name__ == "__main__":
    main()

获取消息的代码

import grpc
from kafkapixy_pb2 import ConsNAckRq
from kafkapixy_pb2_grpc import KafkaPixyStub
import sys

grpc_channel = grpc.insecure_channel("127.0.0.1:19091")
kafkapixy_client = KafkaPixyStub(grpc_channel)

def consume(kafkapixy_client, group, topic):

    ack_partition = None
    ack_offset = None
    rq = ConsNAckRq(topic=topic, group=group)
    keep_running = True
    while keep_running:
        if ack_offset is None:
            rq.no_ack = True
            rq.ack_partition = 0
            rq.ack_offset = 0
        else:
            rq.no_ack = False
            rq.ack_partition = ack_partition
            rq.ack_offset = ack_offset

        try:
            rs = kafkapixy_client.ConsumeNAck(rq)
        except grpc.RpcError as err:
            if err.code() == grpc.StatusCode.NOT_FOUND:
                ack_offset = None
                continue
            else:
                print(err.result)
                continue

        try:
            ack_partition = rs.partition
            ack_offset = rs.offset
        except:
            ack_offset = None

        print(rs.message)
        ack_partition = rs.partition
        ack_offset = rs.offset

def main():
    topic = sys.argv[1]
    group = sys.argv[2]
    consume(kafkapixy_client, group, topic) 

if __name__ == "__main__":
    main()

我們試著實際投入和取得訊息。

默认情况下,当添加新的群组时,它会从后续添加的消息中获取。因此,这次我们先执行获取消息的脚本(如果群组不存在则创建),然后再投入消息。(默认情况下,消费者设置为auto.offset.reset=latest)

    メッセージ取得用のスクリプトを実行しておく
#test-topicからgroup_idにtest-consumer-groupを指定してメッセージを取得
$ python sample_consumer.py test-topic test-consumer-group
    メッセージを投入してみる
#test-topicにhogehoge1というメッセージを投入
$ python sample_producer.py test-topic hogehoge1
    結果
$ python sample_consumer.py test-topic test-consumer-group
b'hogehoge1'

我能够获取到无事投入的消息。

赠品

    Kafka-PixyはREST APIも対応していて、以下のようにメッセージを取得することも可能です
$ curl "http://localhost:19092/topics/test-topic/messages?group=test-consumer-group"
{
  "key": null,
  "value": "aGVsbG8=",
  "partition": 0,
  "offset": 64
}
#keyとvalueはbase64エンコードされた値となります
    gRPC, REST APIのlistenポートはKafka-Pixyの設定ファイルで以下のように定義されます
# TCP address that gRPC API server should listen on.
grpc_addr: 0.0.0.0:19091

# TCP address that RESTful API server should listen on.
tcp_addr: 0.0.0.0:19092

总结

我使用Kafka-Pixy和gRPC尝试了使用Apache Kafka。由于Kafka-Pixy的.proto文件可以根据客户端使用的语言生成源代码,所以可以很容易地实现客户端的功能。在gRPC中,服务器端也可以使用从.proto文件生成的代码,并且可以强制将API规范明确化到.proto文件中等优点。未来,我也想尝试使用gRPC来实现服务器端的功能。虽然关于Kafka的内容变得较少,但我认为已经写出了一种快速使用的方法。如果您有其他意见或指正的话,我会非常感谢您的评论。

明天是 @ntoofu 先生的“建立测试IaaS基础设施环境”的演讲。非常有趣啊。

广告
将在 10 秒后关闭
bannerAds