我尝试使用kafka-node的Consumer

首先

由于有机会稍微使用了kafka-node,我将其总结为备忘录。
顺便提一下,我也试了node-rdkafka,但由于我是在使用Windows机器进行开发,无法编译成功,所以我放弃了。

消费者类型 zhě

消费者共有四种类型,其功能差异如下。
如果暂时只是希望给予我最好的选择,请使用ConsumerGroupStream。

名前Stream機能Group機能Consumer××ConsumerStream○×ConsumerGroup×○ConsumerGroupStream○○

流媒体功能

据我了解,StreamAPI已经实现了缓冲功能。很可能效率更高。
需要注意的是,与普通的Consumer不同,“data”而不是“message”用于事件通知。

群组功能

没有Group功能的Consumer需要显式指定要读取的分区。(如果不指定,默认只读取0号分区,其他分区将不被读取)
在大多数情况下,我认为用户希望在使用多个分区和多个消费者进行扩展时,能够自动分配分区并达到良好的效果,所以会选择这个选项。
相反,如果有要求要将特定分区的处理交给特定的消费者,则需要使用不带Group功能的选项。

试用一下看看

准备一本卡夫卡的书籍

请使用docker快速准备,在具有3个分区的test主题中随意放入一些消息。

docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 --name test_kafka spotify/kafka
docker exec -it test_kafka bash
cd /opt/kafka*
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> message1
> message2
> message3
> message4
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

确定Docker machine的IP地址

因为是在Docker Machine上运行,所以在从本地连接时需要先确认IP地址。

docker-machine ip
# 192.168.99.100

使用ConsumerGroupStream进行加载

mkdir kafka-test
cd kafka-test
npm init -f
npm i kafka-node
touch test.js
const kafka = require('kafka-node')

const consumer = new kafka.ConsumerGroupStream({
  kafkaHost: '192.168.99.100:9092',
  groupId: 'TestGroup',
  fromOffset: 'earliest'
}, 'test')

consumer.on('data', message => {
  console.log(message)
})
$ node test.js
{ topic: 'test',
  value: 'message3',
  offset: 0,
  partition: 0,
  highWaterOffset: 1,
  key: null,
  timestamp: 2019-04-07T16:56:45.611Z }
{ topic: 'test',
  value: 'message2',
  offset: 0,
  partition: 1,
  highWaterOffset: 1,
  key: null,
  timestamp: 2019-04-07T16:56:42.667Z }
{ topic: 'test',
  value: 'message1',
  offset: 0,
  partition: 2,
  highWaterOffset: 2,
  key: null,
  timestamp: 2019-04-07T16:56:40.005Z }
{ topic: 'test',
  value: 'message4',
  offset: 1,
  partition: 2,
  highWaterOffset: 2,
  key: null,
  timestamp: 2019-04-07T16:56:48.987Z }

以上就是全部内容。

广告
将在 10 秒后关闭
bannerAds