はじめに

Kafkaは要求・応答型のメッセージングには向かないと思っていましたが、Springで要求・応答用のTemplate(ReplyingKafkaTemplate)が提供されていたのでどんなものか動かしてみました。

ReplyingKafkaTemplate を使用する

前提

    • 上記リンクにあるサンプル・コードを基本、そのまま使っています。

 

    • Java 17

 

    Apache Kafka 3.4.0

POM

Kafka周りで以下を設定しています。(他は省略)

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.0</version>
  </parent>


  <dependencies>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
	  <groupId>org.springframework.kafka</groupId>
	  <artifactId>spring-kafka</artifactId>
      <version>3.1.0</version>
	</dependency>

  </dependencies>

Requester

要求メッセージを送信して、その応答メッセージを受信するプログラムは以下になります。

package com.example;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.SendResult;

@SpringBootApplication
public class App 
{
    public static void main( String[] args )
    {
        System.out.println( "Requester!" );

        SpringApplication.run(App.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "test request");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(30, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }
}

このコードでは、kRequestトピックに要求メッセージ(valueは”test request”)を送信して、kRepliesトピックから応答メッセージを受信するようになっています。

Responder

要求メッセージを受信して、応答メッセージを返信するプログラムは以下になります。

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
public class App 
{
    public static void main( String[] args )
    {
        System.out.println( "Responder!" );

        SpringApplication.run(App.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }
}

このコードでは、kRequestsトピックをListenし、メッセージを受信したら、そのデータを大文字に変換して、応答メッセージとして返信しています。

動作確認

では、動かしてみます。
リンク先のサンプル・コードではプログラム内でトピックを作成していましたが、今回その部分は省いたので、事前にkRequestsトピックとkRepliesトピックは作成しておきます。

最初に、Responderを起動して、Requesterを起動します。

Requesterのコンソールには以下が出力されました。

Sent ok: kRequests-0@0
Return value: TEST REQUEST

要求メッセージの送信が成功し、応答メッセージが返ってきたことがわかります。
要求メッセージのvalue”test request”が、応答メッセージでは大文字に変換されています。

Responderのコンソールには以下が出力されています。

Server received: test request

別のプログラム(自作)を使って、要求メッセージと応答メッセージの中身を見てみます。

要求メッセージ(3件分)

Ts: 1701650039532, FmdTs: 2023-12-04 09:33:59.532, Topic: kRequests, Partition: 0, Offset: 0, Key: null, Value: test request, Headers: key=kafka_replyTopic value(str)=kReplies value(byte)=746f7069633032 key=kafka_correlationId value(str)=�s�^ǡI���$"�� value(byte)=8673b95ec7a149f18ac4fb24221994f2
Ts: 1701650844371, FmdTs: 2023-12-04 09:47:24.371, Topic: kRequests, Partition: 2, Offset: 0, Key: null, Value: test request, Headers: key=kafka_replyTopic value(str)=kReplies value(byte)=746f7069633032 key=kafka_correlationId value(str)=s�u�RAE��qvOO��U value(byte)=73a775f0524145f49071764f4fedd855
Ts: 1701650914858, FmdTs: 2023-12-04 09:48:34.858, Topic: kRequests, Partition: 1, Offset: 1, Key: null, Value: test request, Headers: key=kafka_replyTopic value(str)=kReplies value(byte)=746f7069633032 key=kafka_correlationId value(str)=�BʞH�Mӭo�gc2M� value(byte)=b842ca9e48de4dd3ad6fb86763324da7

要求メッセージには以下の2つのヘッダー(key=value)がセットされています。

    • kafka_replyTopic=kReplies

 

    kafka_correlationId=8673b95ec7a149f18ac4fb24221994f2(バイナリデータのHex表記)

応答メッセージ(3件分)

Ts: 1701650039646, FmdTs: 2023-12-04 09:33:59.646, Topic: kReplies, Partition: 2, Offset: 0, Key: null, Value: TEST REQUEST, Headers: key=kafka_correlationId value(str)=�s�^ǡI���$"�� value(byte)=8673b95ec7a149f18ac4fb24221994f2
Ts: 1701650844407, FmdTs: 2023-12-04 09:47:24.407, Topic: kReplies, Partition: 2, Offset: 1, Key: null, Value: TEST REQUEST, Headers: key=kafka_correlationId value(str)=s�u�RAE��qvOO��U value(byte)=73a775f0524145f49071764f4fedd855
Ts: 1701650914886, FmdTs: 2023-12-04 09:48:34.886, Topic: kReplies, Partition: 2, Offset: 2, Key: null, Value: TEST REQUEST, Headers: key=kafka_correlationId value(str)=�BʞH�Mӭo�gc2M� value(byte)=b842ca9e48de4dd3ad6fb86763324da7

応答メッセージには以下のヘッダーがセットされています。

    kafka_correlationId=8673b95ec7a149f18ac4fb24221994f2(バイナリデータのHex表記)

仕組み

ReplyingKafkaTemplateは、kafka_correlationIdヘッダーを利用して要求メッセージと応答メッセージの紐付けを行なっています。

Requester側では、ReplyingKafkaTemplateのsendAndReceiveメソッドで要求メッセージを送信すると、内部的に要求メッセージのkafka_correlationIdヘッダーにユニークな値がセットされます。応答メッセージの受信処理ではkafka_correlationIdヘッダーに同じ値を持つ応答メッセージを受信します。
setCorrelationHeaderName(String replyTopicHeaderName)やsetBinaryCorrelation(boolean binaryCorrelation)を利用して、ヘッダー名やヘッダーに付与する値(デフォルトはバイナリ・データ)を変更することができます。

image.png

Responder側では、@SendTo アノテーション(プロパティなし)を使うことで受信した要求メッセージのkafka_correlationIdヘッダーをそのまま応答メッセージにも付加し、kafka_replyTopicヘッダーに指定されているトピックに応答メッセージを返信します。
Forwarding Listener Results using @SendTo

まとめ

SpringのReplyingKafkaTemplateを使うことでKafkaでも簡単に要求・応答型のメッセージングができることがわかりました。
Kafkaで要求・応答を実装する意味は?ってのはありますが、とりあえず今回は動かしてみた報告でした。他にもいろいろできることがありそうなので、時間があれば試してみたいと思います。

广告
将在 10 秒后关闭
bannerAds