在kafka的Ruby客户端phobos中,快速启动消费者(*也可作为生产者*)
我想要更轻松地处理Kafka,所以我在寻找一些好用的Rubygems。但是常常被推荐的RubyGems大多数都是与Rails的集成前提相关的。
只是想要订阅主题并轻松地进行分发,这个目的可能有点繁重。
在这其中,我找到了Phobos。只需几行代码就能创建一个Consumer,感觉非常开心。
- phobos/phobos: Simplifying Kafka for ruby apps
这样的话,我甚至可以像写脚本一样立即做出反应。所以我来介绍一下。
准备用于测试的Kafka-broker。
从这里开始引用,并在本地运行kafka-broker。
- https://docs.confluent.io/current/installation/docker/docs/config-reference.html
稍微调整一下端口和环境变量。
和动物园管理员一起,
$ docker run -d --rm \
-p 2181:2181 \
--name=zookeeper \
-e ZOOKEEPER_CLIENT_PORT=2181 \
-e ZOOKEEPER_TICK_TIME=2000 \
-e ZOOKEEPER_SYNC_LIMIT=2 \
confluentinc/cp-zookeeper:5.1.0
我是经纪人。
$ docker run -d --rm \
-p 9092:9092 \
--name=kafka \
--link=zookeeper \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false \
confluentinc/cp-kafka:5.1.0
如果使用docker-compose的话,会是这样的感觉。
---
version: "3.1"
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.1.0
ports:
- 2181:2181
environment:
- ZOOKEEPER_CLIENT_PORT=2181
- ZOOKEEPER_TICK_TIME=2000
- ZOOKEEPER_SYNC_LIMIT=2
kafka:
image: confluentinc/cp-kafka:5.1.0
ports:
- 9092:9092
links:
- zookeeper
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_BROKER_ID=2
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
现在,我们已经准备好了kafka-broker,让我们来谈谈phobos吧。
Phobos的安装设置
安装Rubygems的phobos。
$ bundle init
$ echo "gem 'phobos'" >> Gemfile
$ bundle install --binstubs --path vendor/bundle
我会确认是否会出现帮助提示。
$ ./bin/phobos
Commands:
phobos help [COMMAND] # Describe available commands or one specific command
phobos init # Initialize your project with Phobos
phobos start # Starts Phobos
phobos version # Outputs the version number. Can be used with: phobos -v or phobos --version
让我们使用phobos init来准备初始文件。
$ ./bin/phobos init
create config/phobos.yml
create phobos_boot.rb
在这个时候,你已经非常友善了。
请简单确认一下phobos的运行情况。
将整体配置写入config/phobos.yml。
-
- 接続情報などグローバルな設定
-
- Producer/Consumerのデフォルト設定
- Consumerの購読先(複数可)とリアクション
我稍微看一下由init精心制作的config/phobos.yml文件。
listeners:
- handler: Phobos::EchoHandler
topic: test
]测试’这个主题的订阅,定义了一个内置处理程序Phobos::EchoHandler来处理。
如果kafka-broker已在本地启动,就不需要进行任何更改,请尝试使用phobos start来启动。
$ ./bin/phobos start
______ _ _
| ___ \ | | |
| |_/ / |__ ___ | |__ ___ ___
| __/| '_ \ / _ \| '_ \ / _ \/ __|
| | | | | | (_) | |_) | (_) \__ \
\_| |_| |_|\___/|_.__/ \___/|___/
phobos_boot.rb - find this file at /Users/sawanoboriyu/develop/src/sandbox/qiita_ruby-phobos/phobos_boot.rb
[2019-02-03T18:11:59:345+0900Z] INFO -- Phobos : <Hash> {:message=>"Phobos configured", :env=>"N/A"}
[2019-02-03T18:11:59:374+0900Z] INFO -- Phobos : <Hash> {:message=>"Listener started", :listener_id=>"442f72", :group_id=>"test-1", :topic=>"test", :handler=>"Phobos::EchoHandler"}
你想尝试一些东西吗?这是从最后一行可以看出来,作为一个关于”test”主题的消费者在运作。
让我们使用kafkacat进行数据流入。我们可以使用Homebrew之类的工具来安装它。
- edenhill/kafkacat: Generic command line non-JVM Apache Kafka producer and consumer
实际上,kafkacat已经可以作为一个休闲的消费者进行操作,不过暂时先不管它。
$ echo aa | kafkacat -P -b localhost -t test
Phobos::EchoHandler是一个简单的处理程序,它只是将接收到的消息记录在日志中。确实收到了消息:message=>”aa”。
$ ./bin/phobos start
______ _ _
| ___ \ | | |
| |_/ / |__ ___ | |__ ___ ___
| __/| '_ \ / _ \| '_ \ / _ \/ __|
# -- snip --
[2019-02-03T18:12:14:481+0900Z] INFO -- Phobos : <Hash> {:message=>"aa", :listener_id=>"442f72", :group_id=>"test-1", :topic=>"test", :handler=>"Phobos::EchoHandler", :key=>nil, :partition=>0, :offset=>0, :retry_count=>0}
创建处理程序来处理消息。
我们来看一下由 phobos init 创建的另一个文件,phobos_boot.rb。
只能放置不管。
# Use this file to load your code
puts <<~ART
______ _ _
| ___ \\ | | |
| |_/ / |__ ___ | |__ ___ ___
| __/| '_ \\ / _ \\| '_ \\ / _ \\/ __|
| | | | | | (_) | |_) | (_) \\__ \\
\\_| |_| |_|\\___/|_.__/ \\___/|___/
ART
puts "
phobos_boot.rb - find this file at #{File.expand_path(__FILE__)}
"
这是phobos默认引用的入口点。如果想要做些什么,可以继续在这里添加。
暂时将以下代码作为替代Phobos::Handler使用的处理器写入,请确保包含#consume方法即可。我已经这样写了一个处理器,名为MyHandler,它只是将内容打印到标准输出中。
class MyHandler
include Phobos::Handler
def consume(payload, metadata)
puts 'your message is ' + payload
end
end
将在config/phobos.yml文件中更改配置以使用此处理程序。
listeners:
- handler: MyHandler
# handler: Phobos::EchoHandler
topic: test
我会启动Phobos,并且试着在背后注入消息。
$ ./bin/phobos start
______ _ _
| ___ \ | | |
| |_/ / |__ ___ | |__ ___ ___
| __/| '_ \ / _ \| '_ \ / _ \/ __|
| | | | | | (_) | |_) | (_) \__ \
\_| |_| |_|\___/|_.__/ \___/|___/
phobos_boot.rb - find this file at /Users/sawanoboriyu/develop/src/sandbox/qiita_ruby-phobos/phobos_boot.rb
[2019-02-03T18:45:52:276+0900Z] INFO -- Phobos : <Hash> {:message=>"Phobos configured", :env=>"N/A"}
[2019-02-03T18:45:52:287+0900Z] INFO -- Phobos : <Hash> {:message=>"Listener started", :listener_id=>"b59783", :group_id=>"test-1", :topic=>"test", :handler=>"MyHandler"}
# 裏で `echo aa | kafkacat -P -b localhost -t test`
your message is aa
太好了。
附带一提,如果包括Phobos::Producer,您可以将消息流传到任何主题。这也很简单,请试试吧。
这个例外怎么样呢?
当Consumer处理时出现异常时,将使用简单的Backoff算法进行随机等待后进行重试。
但是,由于ConsumerGroup的机制,它好像记得处理了多少内容,导致对此消息的重试会一直重复。
因此,我们需要快速地将日志记录下来并结束,或者将其通知或将有效负载发送到用于重试的主题等,以便能够处理下一个任务。
结束
当我发现kafka如此易于操作时,我感觉它变得更加贴近我了。直觉上说,它的轻松性与redis相当。对于将其嵌入到现有的某个东西中并以库的方式使用,我认为使用方式类似于amqp + bunny。
Phobos本身能够通过一个参数在线程中进行并行处理,也可以单独处理多个监听器,非常方便。此外,它拥有很好的group_id和kafka机制,因此可以轻松进行扩展。