※Kafkaに関する情報は下記のURLから参照してください。
https://kafka.apache.org/intro

必要ライブラリ

    • kafka-clients-3.2.0

 

    • slf4j-api-1.7.36

 

    slf4j-simple-1.7.36

事前準備

    • Kafkaの起動

 

    • 下記のコマンドで立ち上げを行います。

 

    • /bin/zookeeper-server-start.sh /config/zookeeper.properties

Kafka brokcerの起動
下記のコマンドで立ち上げを行います。
/bin/kafka-server-start.sh /config/server.properties

Coding

Kafkaのプロパティを定義します。
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG,
“KafkaSimpleConsumer”);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());

ConsumerConfig.BOOTSTRAP_SERVERS_CONFIGはKafkaのサーバ情報を記載します。

ConsumerConfig.GROUP_ID_CONFIGはコンシュマーのグループIDを記載します。

コンシューマーを生成します。
final Consumer<String, String> consumer =
new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());

受け付けるTopicを設定します。
List topics = new ArrayList<>();

topics.add(TOPIC);
consumer.subscribe(topics);

イベントが発行がトリガーになるので、無限ループでイベントを待ち受けます。その際に上限リトライを設定します。
final int giveUp = 100;
int noRecordsCount = 0;

while (true) {

Kafkaからイベントを取得します。今回の取得間隔は1000ミリ秒に設定します。
final ConsumerRecords<String, String> consumerRecords =
consumer.poll(Duration.ofMillis(1000));

取得されたイベントをコンソールに出力します。
consumerRecords.forEach(record -> {
System.out.printf(“Consumer Record:(%s, %s, %d, %s, %s)\n”,
record.key(), record.value(),
record.partition(), record.offset(), record.topic());
});

Kafkaサーバと同期を取ります。
consumer.commitAsync();

これ以上、取得するイベントがない際には接続をクローズします。
consumer.close();

参考

下記のレポジトリにソースコードを確認いただけます。
https://github.com/dlstjq7685/KafkaSimpleConsumerTuttorial

また、前の記事のKafka Javaチュートリアル -プロデューサー-の続きになるので
合わせて見ていただければと思います。

ソースコードはkafkaのJavaDocの内容に沿って作成しております。
あくまで参考程度で書いたものですので、
その他の設定については別記事を参考にしてください。

广告
将在 10 秒后关闭
bannerAds