Spring BootではWebアプリを作ることが多いかもしれないが、Kafkaクライアントを開発することもできる。今回はspring-kafkaを使ってConsumerを作ってみる。
全体のコードは以下にある。
https://github.com/Udomomo/spring-boot-kafka-consumer

バージョン

Spring Boot: 2.4.2
spring-kafka: 2.6.5

spring-kafkaの仕組み

spring-kafkaによるKafka Consumerの処理は、@KafkaListener アノテーションを使うことで実装することができる。これを使うためには、KafkaListenerContainerFactory を作成するBeanを @Configuration アノテーションを付与したクラスで提供し、さらにそのクラスに @EnableKafka アノテーションを付与する。これにより MessageListenerContainer がDIコンテナに登録され、Kafka Consumerが自動で設定される。

application.yaml

公式ドキュメントのチュートリアルでは、Kafka Consumerの設定情報をコードの中で直接指定することでコード量を減らしているが、今回は業務で使うアプリケーションに近くなるよう、設定情報をapplication.yamlに切り出してみる。
application.yamlには、bootstrap-servers, group-id, topic の3つを記載する。(Deserializerはアプリケーションの実装と密接に関わるため、アプリケーション内で指定する)

kafka:
  bootstrap-servers: http://localhost:9093
  group-id: foo
  topic: my-topic

KafkaListenerContainerFactoryの作成

application.yamlで記載した設定情報を使い、KafkaListenerContainerFactory を作成する。
まず、アプリケーションからapplication.yamlの情報を取得する。 @ConfigurationProperties とすることで、application.yaml内の設定値をPOJOにマッピングできる。また、prefix を指定することでyamlの階層構造を指定できる。
下の例では、 prefix = kafka と指定しているので、例えば bootstrapServers プロパティにapplication.yaml内の kafka.bootstrap-servers の値がマッピングされる。(大文字小文字やハイフン・アンダースコアなどの表記差は、Spring BootのRelaxed Binding機能によって良きに計らってくれる。詳しくは公式リファレンスを参照)
なお、設定値をマッピングして後で取得するにはgetter/setterメソッドが必要になる。今回はLombokの @Data アノテーションを使用してgetter/setterメソッドを作成している。

@ConfigurationProperties(prefix = "kafka")
@Data
public class KafkaSettings {
  private String bootstrapServers;
  private String groupId;
  private String topic;
}

取得した設定情報を用いて、KafkaConsumerを設定する。まずDefaultKafkaConsumerFactory インスタンスを作成し、それを用いてConcurrentKafkaListenerContainerFactory インスタンスを作成する。このインスタンスは ConcurrentKafkaListenerContainer を生成するファクトリであり、これによって複数のtopicやpartitionからのメッセージをマルチスレッドで処理することも可能になる。
また、@EnableConfigurationPropertiesアノテーションを使うことで、@ConfigurationProperties を付与したPOJOをBeanとして登録できる。これにより、設定値をマッピングされたKafkaSettingsインスタンスを扱えるようになり、getterメソッドを呼び出して設定値を取得できるようになる。

@Configuration
@EnableKafka
@EnableConfigurationProperties({KafkaSettings.class})
public class KafkaConfig {
  @Bean
  KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
  kafkaListenerContainerFactory(KafkaSettings settings) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory(settings));
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory(KafkaSettings settings) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, settings.getBootstrapServers());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, settings.getGroupId());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(props);
  }
}

最後に、 @KafkaListener アノテーションを使ってConsumerのメインロジックを実装する。ここではListenするtopicの名前を指定しなければいけないが、 topics = “${kafka.topic}” のようにSpELを使うことでapplication.yamlのtopicの値を直接呼び出している。この部分では groupId を指定することもできるが、今回はgroupIdは@ConfigurationProperties を通じて既に指定したのでそちらが使われる。

@Component
public class KafkaConsumer {
  @KafkaListener(topics = "${kafka.topic}")
  public void consume(ConsumerRecord<String, String> record) {
    System.out.println("key: " + record.key() + ", value: " + record.value());
  }
}

メッセージの送信

bitnami/kafkaを使ってKafka brokerを立てる。以下のようにすると、Dockerネットワーク内部からの接続は localhost:9092 で、外部からの接続は localhost:9093 で受け付けられる。

version: "2"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.7
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:2
    ports:
      - "9093:9093"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

kafka cliからtopicを作り、メッセージを送ってみる。

$ docker-compose up -d
$ docker-compose exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092
$ docker-compose exec kafka /opt/bitnami/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic --property "parse.key=true" --property "key.separator=,"
>hello,hi

Spring Boot側でログが出れば成功。

key: hello, value: hi
广告
将在 10 秒后关闭
bannerAds