尝试使用gRPC和Apache Kafka
本文是富士通云技术2017年圣诞节日历的第7天。
昨天在 @makky05 的「FJCT兼職~回顧感人片段~」活動中,真是非常有趣興奮的業務內容啊。希望能再次啟動「透過Wi-Fi連接狀況尋找員工位置的服務」。另外,真是讓人驚嘆的一年,成長得非常盡興啊!
简要介绍
我想在这次尝试中使用gRPC和Apache Kafka。具体来说,我想使用Kafka-Pixy,它可以作为Apache Kafka的gRPC代理,从.proto文件生成代码,并将消息发送到Apache Kafka,并获取消息。
gRPC是谷歌于2015年2月发布的一种RPC框架。您可以使用IDL编写.proto文件,并从该文件生成各种语言的服务器和客户端所需的源代码模板。此外,它使用HTTP/2进行通信。
关于gRPC有很多简单易懂的文章,我建议您参考这些文章。
什么是gRPC?
gRPC是什么?
当你在设计REST API时感到困惑时,来学习一下gRPC吧
我了解了一下ProtocolBuffers
Apache Kafka是LinkedIn公開的一個分散消息系統的開源軟件,旨在以低延遲和高吞吐量的方式收集和傳遞大量數據,它是一個Pull型的发布/订阅系统。
在Kafka的官方網站上有詳細的介紹,你可以在那裡找到很多易懂的文章。
以下是我閱讀Kafka官網上的介紹所做的筆記。
Apache Kafka ―從入門到使用Trifecta進行可視化―
如果你想進一步了解分布式消息中間件的詳細對比,也可以參考一些相關的文章。
关于Kafka-Pixy的信息
Kafka-Pixy是由Mailgun团队开发的用于Apache Kafka的gRPC和REST API的代理。
这次我们将使用Kafka-Pixy来尝试使用gRPC连接Apache Kafka。
该代理的接口是在kafkapixy.proto中定义的,您可以从这个.proto文件中生成使用Kafka-Pixy的客户端代码。
通过使用Kafka-Pixy作为中间代理,您可以使用生成的代码,从而减少在使用Kafka时客户端实现的成本。
动作环境
$ cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core)
$ java -version
openjdk version "9-ea"
OpenJDK Runtime Environment (build 9-ea+163)
OpenJDK 64-Bit Server VM (build 9-ea+163, mixed mode)
$ python --version
Python 3.6.2
Apache Kafka 是一种开源分布式流处理平台。
- インストール(基本的には こちら を参考に)
$ wget http://ftp.jaist.ac.jp/pub/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz
$ tar -zxf kafka_2.12-1.0.0.tgz
- とりあえずデフォルトの設定のまま起動
$ cd kafka_2.12-1.0.0
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ bin/kafka-server-start.sh -daemon config/server.properties
- とりあえずtopicを作成してみる
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
- メッセージを入れてみる
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
message hoge
- メッセージを取得してみる
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
message hoge
gRPC (谷啦啦帕西)
今次我想在Python上尝试一下。我会参考Python Quickstart。
$ python -m pip install grpcio
$ python -m pip install grpcio-tools
卡夫卡-皮克西
howto-install.md を参考に
$ curl -L https://github.com/mailgun/kafka-pixy/releases/download/v0.14.0/kafka-pixy-v0.14.0-linux-amd64.tar.gz | tar xz
$ cd kafka-pixy-v0.14.0-linux-amd64
$ cp default.yaml config.yaml
#とりあえずデフォルトのまま起動
$ ./kafka-pixy --config config.yaml
从.proto文件生成代码
在 kafka-pixy-v0.14.0-linux-amd64 中有 kafkapixy.proto 文件,我认为可以使用它来生成代码。
这次先创建一个 sample 目录,并在其中生成代码。
$ python -m grpc_tools.protoc -I./kafka-pixy-v0.14.0-linux-amd64 --python_out=./sample --grpc_python_out=./sample ./kafka-pixy-v0.14.0-linux-amd64/kafkapixy.proto
然后将会生成两个文件,即kafkapixy_pb2.py和kafkapixy_pb2_grpc.py。
顺带提一下,如 quick-start-python.md 所述,由于 Kafka-Pixy 已经生成了 kafkapixy_pb2.py 和 kafkapixy_pb2_grpc.py 这两个文件,您可以直接复制并使用它们,非常安全可靠。
使用生成的代码向Kafka投递和获取消息。
使用上述创建的 kafkapixy_pb2.py 和 kafkapixy_pb2_grpc.py,在Apache Kafka中进行消息投递和获取。
这次只编写了投递和获取消息的代码。
消息输入代码
import grpc
from kafkapixy_pb2 import ProdRq
from kafkapixy_pb2_grpc import KafkaPixyStub
import sys
grpc_channel = grpc.insecure_channel("127.0.0.1:19091")
kafkapixy_client = KafkaPixyStub(grpc_channel)
def produce(kafkapixy_client ,topic, msg):
rq = ProdRq(topic=topic, message=msg)
rs = kafkapixy_client.Produce(rq)
return rs
def main():
topic = sys.argv[1]
msg = bytes(sys.argv[2], encoding="utf-8")
produce(kafkapixy_client, topic, msg)
if __name__ == "__main__":
main()
获取消息的代码
import grpc
from kafkapixy_pb2 import ConsNAckRq
from kafkapixy_pb2_grpc import KafkaPixyStub
import sys
grpc_channel = grpc.insecure_channel("127.0.0.1:19091")
kafkapixy_client = KafkaPixyStub(grpc_channel)
def consume(kafkapixy_client, group, topic):
ack_partition = None
ack_offset = None
rq = ConsNAckRq(topic=topic, group=group)
keep_running = True
while keep_running:
if ack_offset is None:
rq.no_ack = True
rq.ack_partition = 0
rq.ack_offset = 0
else:
rq.no_ack = False
rq.ack_partition = ack_partition
rq.ack_offset = ack_offset
try:
rs = kafkapixy_client.ConsumeNAck(rq)
except grpc.RpcError as err:
if err.code() == grpc.StatusCode.NOT_FOUND:
ack_offset = None
continue
else:
print(err.result)
continue
try:
ack_partition = rs.partition
ack_offset = rs.offset
except:
ack_offset = None
print(rs.message)
ack_partition = rs.partition
ack_offset = rs.offset
def main():
topic = sys.argv[1]
group = sys.argv[2]
consume(kafkapixy_client, group, topic)
if __name__ == "__main__":
main()
我們試著實際投入和取得訊息。
默认情况下,当添加新的群组时,它会从后续添加的消息中获取。因此,这次我们先执行获取消息的脚本(如果群组不存在则创建),然后再投入消息。(默认情况下,消费者设置为auto.offset.reset=latest)
- メッセージ取得用のスクリプトを実行しておく
#test-topicからgroup_idにtest-consumer-groupを指定してメッセージを取得
$ python sample_consumer.py test-topic test-consumer-group
- メッセージを投入してみる
#test-topicにhogehoge1というメッセージを投入
$ python sample_producer.py test-topic hogehoge1
- 結果
$ python sample_consumer.py test-topic test-consumer-group
b'hogehoge1'
我能够获取到无事投入的消息。
赠品
- Kafka-PixyはREST APIも対応していて、以下のようにメッセージを取得することも可能です
$ curl "http://localhost:19092/topics/test-topic/messages?group=test-consumer-group"
{
"key": null,
"value": "aGVsbG8=",
"partition": 0,
"offset": 64
}
#keyとvalueはbase64エンコードされた値となります
- gRPC, REST APIのlistenポートはKafka-Pixyの設定ファイルで以下のように定義されます
# TCP address that gRPC API server should listen on.
grpc_addr: 0.0.0.0:19091
# TCP address that RESTful API server should listen on.
tcp_addr: 0.0.0.0:19092
总结
我使用Kafka-Pixy和gRPC尝试了使用Apache Kafka。由于Kafka-Pixy的.proto文件可以根据客户端使用的语言生成源代码,所以可以很容易地实现客户端的功能。在gRPC中,服务器端也可以使用从.proto文件生成的代码,并且可以强制将API规范明确化到.proto文件中等优点。未来,我也想尝试使用gRPC来实现服务器端的功能。虽然关于Kafka的内容变得较少,但我认为已经写出了一种快速使用的方法。如果您有其他意见或指正的话,我会非常感谢您的评论。
明天是 @ntoofu 先生的“建立测试IaaS基础设施环境”的演讲。非常有趣啊。