まずBatchMessageListenerの実装を作成する。

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.stereotype.Component;

@Component
public class SampleBatchMessageListener implements BatchMessageListener<String, String> {
    @KafkaListener(topics = "mytopic2")
    @Override
    public void onMessage(List<ConsumerRecord<String, String>> data) {
        System.out.println(data.size() + ":" + data);
    }
}

次にspring.kafka.listener.typeをBATCHに変更する。

spring.kafka.bootstrap-servers=localhost:32770
spring.kafka.consumer.group-id=sample-group
spring.kafka.listener.type=BATCH

これでspring-bootを起動して適当にpublishする。

5:[ConsumerRecord(topic = mytopic2, partition = 0, leaderEpoch = 0, offs(以下省略)
1:[ConsumerRecord(topic = mytopic2, partition = 0, leaderEpoch = 0, offset (以下省略)
3:[ConsumerRecord(topic = mytopic2, partition = 0, leaderEpoch = 0, o(以下省略)

一度にバラバラの件数でconsumeされているのが分かる。

以上で動作はするが、実運用では以下らへんのパラメータも指定しないとダメと思われる。バラバラの件数でメッセージが到着しているのを見ると、いわゆるRDBMSのコミット間隔とまったく同じように使えるというわけでもない。

spring.kafka.consumer.fetch-max-wait
spring.kafka.consumer.fetch-min-size
spring.kafka.consumer.max-poll-records

バッチ処理的な発想はkafkaでも有効だが、kafka固有の事情は考慮が必要と思われる。

ドキュメント https://docs.spring.io/spring-kafka/docs/current/reference/html/#message-listeners

广告
将在 10 秒后关闭
bannerAds