我尝试使用kafka-node的Consumer
首先
由于有机会稍微使用了kafka-node,我将其总结为备忘录。
顺便提一下,我也试了node-rdkafka,但由于我是在使用Windows机器进行开发,无法编译成功,所以我放弃了。
消费者类型 zhě
消费者共有四种类型,其功能差异如下。
如果暂时只是希望给予我最好的选择,请使用ConsumerGroupStream。
名前Stream機能Group機能Consumer××ConsumerStream○×ConsumerGroup×○ConsumerGroupStream○○
流媒体功能
据我了解,StreamAPI已经实现了缓冲功能。很可能效率更高。
需要注意的是,与普通的Consumer不同,“data”而不是“message”用于事件通知。
群组功能
没有Group功能的Consumer需要显式指定要读取的分区。(如果不指定,默认只读取0号分区,其他分区将不被读取)
在大多数情况下,我认为用户希望在使用多个分区和多个消费者进行扩展时,能够自动分配分区并达到良好的效果,所以会选择这个选项。
相反,如果有要求要将特定分区的处理交给特定的消费者,则需要使用不带Group功能的选项。
试用一下看看
准备一本卡夫卡的书籍
请使用docker快速准备,在具有3个分区的test主题中随意放入一些消息。
docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 --name test_kafka spotify/kafka
docker exec -it test_kafka bash
cd /opt/kafka*
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> message1
> message2
> message3
> message4
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
确定Docker machine的IP地址
因为是在Docker Machine上运行,所以在从本地连接时需要先确认IP地址。
docker-machine ip
# 192.168.99.100
使用ConsumerGroupStream进行加载
mkdir kafka-test
cd kafka-test
npm init -f
npm i kafka-node
touch test.js
const kafka = require('kafka-node')
const consumer = new kafka.ConsumerGroupStream({
kafkaHost: '192.168.99.100:9092',
groupId: 'TestGroup',
fromOffset: 'earliest'
}, 'test')
consumer.on('data', message => {
console.log(message)
})
$ node test.js
{ topic: 'test',
value: 'message3',
offset: 0,
partition: 0,
highWaterOffset: 1,
key: null,
timestamp: 2019-04-07T16:56:45.611Z }
{ topic: 'test',
value: 'message2',
offset: 0,
partition: 1,
highWaterOffset: 1,
key: null,
timestamp: 2019-04-07T16:56:42.667Z }
{ topic: 'test',
value: 'message1',
offset: 0,
partition: 2,
highWaterOffset: 2,
key: null,
timestamp: 2019-04-07T16:56:40.005Z }
{ topic: 'test',
value: 'message4',
offset: 1,
partition: 2,
highWaterOffset: 2,
key: null,
timestamp: 2019-04-07T16:56:48.987Z }
以上就是全部内容。