まずBatchMessageListenerの実装を作成する。
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.stereotype.Component;
@Component
public class SampleBatchMessageListener implements BatchMessageListener<String, String> {
@KafkaListener(topics = "mytopic2")
@Override
public void onMessage(List<ConsumerRecord<String, String>> data) {
System.out.println(data.size() + ":" + data);
}
}
次にspring.kafka.listener.typeをBATCHに変更する。
spring.kafka.bootstrap-servers=localhost:32770
spring.kafka.consumer.group-id=sample-group
spring.kafka.listener.type=BATCH
これでspring-bootを起動して適当にpublishする。
5:[ConsumerRecord(topic = mytopic2, partition = 0, leaderEpoch = 0, offs(以下省略)
1:[ConsumerRecord(topic = mytopic2, partition = 0, leaderEpoch = 0, offset (以下省略)
3:[ConsumerRecord(topic = mytopic2, partition = 0, leaderEpoch = 0, o(以下省略)
一度にバラバラの件数でconsumeされているのが分かる。
以上で動作はするが、実運用では以下らへんのパラメータも指定しないとダメと思われる。バラバラの件数でメッセージが到着しているのを見ると、いわゆるRDBMSのコミット間隔とまったく同じように使えるというわけでもない。
spring.kafka.consumer.fetch-max-wait
spring.kafka.consumer.fetch-min-size
spring.kafka.consumer.max-poll-records
バッチ処理的な発想はkafkaでも有効だが、kafka固有の事情は考慮が必要と思われる。
ドキュメント https://docs.spring.io/spring-kafka/docs/current/reference/html/#message-listeners