Spring BootではWebアプリを作ることが多いかもしれないが、Kafkaクライアントを開発することもできる。今回はspring-kafkaを使ってConsumerを作ってみる。
全体のコードは以下にある。
https://github.com/Udomomo/spring-boot-kafka-consumer
バージョン
Spring Boot: 2.4.2
spring-kafka: 2.6.5
spring-kafkaの仕組み
spring-kafkaによるKafka Consumerの処理は、@KafkaListener アノテーションを使うことで実装することができる。これを使うためには、KafkaListenerContainerFactory を作成するBeanを @Configuration アノテーションを付与したクラスで提供し、さらにそのクラスに @EnableKafka アノテーションを付与する。これにより MessageListenerContainer がDIコンテナに登録され、Kafka Consumerが自動で設定される。
application.yaml
公式ドキュメントのチュートリアルでは、Kafka Consumerの設定情報をコードの中で直接指定することでコード量を減らしているが、今回は業務で使うアプリケーションに近くなるよう、設定情報をapplication.yamlに切り出してみる。
application.yamlには、bootstrap-servers, group-id, topic の3つを記載する。(Deserializerはアプリケーションの実装と密接に関わるため、アプリケーション内で指定する)
kafka:
bootstrap-servers: http://localhost:9093
group-id: foo
topic: my-topic
KafkaListenerContainerFactoryの作成
application.yamlで記載した設定情報を使い、KafkaListenerContainerFactory を作成する。
まず、アプリケーションからapplication.yamlの情報を取得する。 @ConfigurationProperties とすることで、application.yaml内の設定値をPOJOにマッピングできる。また、prefix を指定することでyamlの階層構造を指定できる。
下の例では、 prefix = kafka と指定しているので、例えば bootstrapServers プロパティにapplication.yaml内の kafka.bootstrap-servers の値がマッピングされる。(大文字小文字やハイフン・アンダースコアなどの表記差は、Spring BootのRelaxed Binding機能によって良きに計らってくれる。詳しくは公式リファレンスを参照)
なお、設定値をマッピングして後で取得するにはgetter/setterメソッドが必要になる。今回はLombokの @Data アノテーションを使用してgetter/setterメソッドを作成している。
@ConfigurationProperties(prefix = "kafka")
@Data
public class KafkaSettings {
private String bootstrapServers;
private String groupId;
private String topic;
}
取得した設定情報を用いて、KafkaConsumerを設定する。まずDefaultKafkaConsumerFactory インスタンスを作成し、それを用いてConcurrentKafkaListenerContainerFactory インスタンスを作成する。このインスタンスは ConcurrentKafkaListenerContainer を生成するファクトリであり、これによって複数のtopicやpartitionからのメッセージをマルチスレッドで処理することも可能になる。
また、@EnableConfigurationPropertiesアノテーションを使うことで、@ConfigurationProperties を付与したPOJOをBeanとして登録できる。これにより、設定値をマッピングされたKafkaSettingsインスタンスを扱えるようになり、getterメソッドを呼び出して設定値を取得できるようになる。
@Configuration
@EnableKafka
@EnableConfigurationProperties({KafkaSettings.class})
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaListenerContainerFactory(KafkaSettings settings) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(settings));
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory(KafkaSettings settings) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, settings.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, settings.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
}
最後に、 @KafkaListener アノテーションを使ってConsumerのメインロジックを実装する。ここではListenするtopicの名前を指定しなければいけないが、 topics = “${kafka.topic}” のようにSpELを使うことでapplication.yamlのtopicの値を直接呼び出している。この部分では groupId を指定することもできるが、今回はgroupIdは@ConfigurationProperties を通じて既に指定したのでそちらが使われる。
@Component
public class KafkaConsumer {
@KafkaListener(topics = "${kafka.topic}")
public void consume(ConsumerRecord<String, String> record) {
System.out.println("key: " + record.key() + ", value: " + record.value());
}
}
メッセージの送信
bitnami/kafkaを使ってKafka brokerを立てる。以下のようにすると、Dockerネットワーク内部からの接続は localhost:9092 で、外部からの接続は localhost:9093 で受け付けられる。
version: "2"
services:
zookeeper:
image: docker.io/bitnami/zookeeper:3.7
ports:
- "2181:2181"
volumes:
- "zookeeper_data:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:2
ports:
- "9093:9093"
volumes:
- "kafka_data:/bitnami"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper
volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
kafka cliからtopicを作り、メッセージを送ってみる。
$ docker-compose up -d
$ docker-compose exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092
$ docker-compose exec kafka /opt/bitnami/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic --property "parse.key=true" --property "key.separator=,"
>hello,hi
Spring Boot側でログが出れば成功。
key: hello, value: hi