はじめに

Apache kafkaはオープンソースの分散メッセージングシステムで、大量のメッセージを高速にpublish/subscribeすることができます。
ここでは、統合フレームワークであるApache Camelを用いてkafkaとメッセージをやりとりするアプリケーションを作成します。
Apache Camelでkafkaに接続するためにはcamel-kafkaコンポーネントを使用します。

今回の環境では、kafkaはv2.0.0を使用します。環境の構築方法は以下の記事を参照してください。

    Apacke kafka v2.0.0がリリースされていたのでインストールしてみる

また、以前に次の記事で同様のアプリケーションをXML DSLで書いていますが、今回はJava DSLを用いてアプリケーションを作成してみます。また、前回よりも詳細に書いてみたいと思います。

    Apache CamelフレームワークのKafkaコンポーネントを使用して簡単なメッセージの送受信

作成するアプリケーションの説明

今回作成するアプリケーションは、下図のように1秒ごとに日時のメッセージをkafkaへpublishし、kafkaから同メッセージをsubscribeするものです。

image.png

Apache Camelを利用すれば、このようなkafkaを用いたアプリケーションが以下の数十行のコードだけで実装できます。main関数に全てコードを入れているので、綺麗なコードではありませんが、Camelにより簡単に実装できることが分かってもらえるかと思います。

    public static void main(String[] args) {
        try {
            CamelContext context = new DefaultCamelContext();
            KafkaComponent kafka = new KafkaComponent();
            kafka.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); // kafka1, kafka2, kafka3の3サーバをブローカとして登録
            context.addComponent("kafka", kafka);

            context.addRoutes(new RouteBuilder() {
                public void configure() {
                    from("timer:trigger?period=1000") // 1000ミリ秒毎に実行
                            .routeId("kafka_producer_route")
                            .setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
                            .to("kafka:test01"); // トピックtest01にメッセージをpublish

                    from("kafka:test01") // トピックtest01からメッセージをsubscribe
                            .routeId("kafka_consumer_route")
                            .log("body = ${body}"); // メッセージのBODYの内容をログに表示
                }
            });

            context.start();
            Thread.sleep(10000);
            context.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

それでは、kafkaのアプリケーションを作成してみましょう。

kafkaを使用するためのライブラリを追加する

Mavenの場合は、pom.xmlに以下のライブラリを追加します。
camel-kafkaがCamelでKafkaを扱うためのコンポーネントで、${camel.version}には使用しているcamelのバージョンを指定します。
kafka-clientsはKafka用のクライアントライブラリです。

        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-kafka</artifactId>
            <version>${camel.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

コンポーネントのURI

camel-kafkaコンポーネントを使用するためのURIは以下のようになります。

kafka:topic[?options]

コンテキストパス(topic)には対象のトピック名を指定します。

メッセージをpublish/subscribeするrouteを作成する

kafkaへメッセージをpublishするrouteを作成します。
Java DSLで書いたrouteは以下のようになります。

トピックは自動で作成されますが、トピックの設定を変えたい場合は手動で作成しておきます。
コードは簡単すぎるので特に説明することはありませんね。

                from("timer:trigger?period=1000") // 1000ミリ秒毎に実行
                        .routeId("kafka_producer_route")
                        .setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
                        .to("kafka:test01"); // トピックtest01にメッセージをpublish

次にkafkaへメッセージをsubscribeするrouteを作成します。
Java DSLで書いたrouteは以下のようになります。

                from("kafka:test01") // トピックtest01からメッセージをsubscribe
                        .routeId("kafka_consumer_route")
                        .log("body = ${body}"); // メッセージのBODYの内容をログに表示

これだけではkafkaへ接続できないので、接続するための設定を書きます。
まず最初にkafkaの設定を登録するKafkaConfigurationのインスタンスを生成し、kafkaへ接続するための設定を行います。様々な設定項目があり、後で簡単に説明したいと思います。
以下の例ではいくつかの設定を行っていますが、接続するだけであれば、setBrokersメソッドで接続先のブローカを設定するだけです。それ以外はデフォルトの値が使用されるため設定しなくても構いません。

        KafkaConfiguration kafkaConfig = new KafkaConfiguration();
        kafkaConfig.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); // 3台のkafkaブローカへ接続
        kafkaConfig.setGroupId("group1"); // consumerのグループIDを設定
        kafkaConfig.setAutoCommitEnable(true); // 自動コミットをオンに設定
        kafkaConfig.setAutoCommitIntervalMs(5000); // 自動コミットのインターバルを設定
        kafkaConfig.setAutoOffsetReset("earliest"); // コミットされたオフセットのないパーティションのsubscribeした場合の動作
        kafkaConfig.setRequestRequiredAcks("all"); // publishが成功したと判断する条件
        kafkaConfig.setConsumersCount(1); // consumerの数

次にKafkaComponentのインスタンスを生成し、先ほど作成したKafkaConfigurationのインスタンスをセットします。

        KafkaComponent kafka = new KafkaComponent();
        kafka.setConfiguration(kafkaConfig);

kafkaへ接続するための設定はこれだけです。

これでpublisherとconsumerのrouteの作成とkafkaの接続設定が終わりました。
これらを使用するmain関数などを作成した全体のソースは以下になります。

package example.camelbegginer.kafka;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.impl.DefaultCamelContext;

public class KafkaExample {

    public static void main(String[] args) {
        try {
            CamelContext context = new DefaultCamelContext();
            context.addComponent("kafka", createKafkaComponent());
            context.addRoutes(createProducerRouteBuilder());
            context.addRoutes(createConsumerRouteBuilder());

            context.start();
            Thread.sleep(10000);
            context.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static KafkaComponent createKafkaComponent() {
        KafkaConfiguration kafkaConfig = new KafkaConfiguration();
        kafkaConfig.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); // 3台のkafkaブローカへ接続
        kafkaConfig.setGroupId("group1"); // グループIDを設定
        kafkaConfig.setAutoCommitEnable(true); // 自動コミットをオンに設定
        kafkaConfig.setAutoCommitIntervalMs(5000); // 自動コミットのインターバルを設定
        kafkaConfig.setAutoOffsetReset("earliest"); // コミットされたオフセットのないパーティションの読み出した場合の動作
        kafkaConfig.setRequestRequiredAcks("all"); // publishが成功したと判断する条件
        kafkaConfig.setConsumersCount(1); // consumerの数

        KafkaComponent kafka = new KafkaComponent();
        kafka.setConfiguration(kafkaConfig);

        return kafka;
    }

    static RouteBuilder createProducerRouteBuilder() {
        return new RouteBuilder() {
            public void configure() throws Exception {
                from("timer:trigger?period=1000") // 1000ミリ秒毎に実行
                        .routeId("kafka_producer_route")
                        .setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
                        .to("kafka:test01"); // トピックtest01にメッセージをpublish
            }
        };
    }

    static RouteBuilder createConsumerRouteBuilder() {
        return new RouteBuilder() {
            public void configure() throws Exception {
                from("kafka:test01") // トピックtest01からメッセージをsubscribe
                        .routeId("kafka_consumer_route")
                        .log("body = ${body}"); // メッセージのBODYの内容をログに表示
            }
        };
    }
}

作成したCamelアプリケーションを実行する

このアプリケーションを実行すると、以下のように1秒おきのログが出力されます。

[2019-01-31 21:15:38.112], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:38
[2019-01-31 21:15:39.031], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:39
[2019-01-31 21:15:40.034], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:40
[2019-01-31 21:15:41.026], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:41
[2019-01-31 21:15:42.029], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:42
[2019-01-31 21:15:43.024], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:43
[2019-01-31 21:15:44.044], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:44
[2019-01-31 21:15:45.028], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:45
[2019-01-31 21:15:46.032], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:46

また、以下のコードでヘッダーを出力すると、メッセージのオフセットやパーティションの情報がログに出力されます。

                from("kafka:test01") // トピックtest01からメッセージをsubscribe
                        .routeId("kafka_consumer_route")
                        .log("body = ${body}") // メッセージのBODYの内容をログに表示
                        .log("${headers}"); // ★ここを追加

出力されるログは以下のようになります。

2019-02-01 10:10:46.236], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, {breadcrumbId=[B@58a2fc46, kafka.HEADERS=RecordHeaders(headers = [RecordHeader(key = breadcrumbId, value = [73, 68, 45, 109, 107, 121, 45, 80, 67, 45, 49, 53, 52, 56, 57, 56, 51, 52, 52, 51, 56, 55, 51, 45, 48, 45, 49])], isReadOnly = false), kafka.OFFSET=52, kafka.PARTITION=0, kafka.TIMESTAMP=1548983446195, kafka.TOPIC=test01}

このように、Apache CamelフレームワークのKafkaコンポーネントを使用して簡単なメッセージの送受信するアプリケーションを作成できました。Camelを使用することで、簡単にkafkaと接続することが理解いただけたでしょうか。

camel-kafkaコンポーネントの主なプロパティ

最後にcamel-kafkaコンポーネントの主なプロパティについて説明します。
プロパティは以下の表以外にもたくさんあり、詳細は公式ページを参照してください。

    Kafka Component(旧公式サイト)

プロパティはたくさんありますが、いくつかはkafka clientで使用するものと同じですので、違和感なく使えると思います。

プロパティ名producer/consumer説明デフォルト値型brokers共通Kafkaブローカーを指定する。フォーマットはhost1:port1で、複数ブローカを指定する場合は、host1:port1,host2:port2とカンマ区切りとする。
StringclientId共通クライアントIDは、クライアントのアプリケーションからの呼び出しをトレースするためのユーザー指定の文字列です。
StringautoCommitEnableconsumertrueに設定すると、consumerが取得されているメッセージのオフセットを定期的に自動でコミットします。自動コミットのインターバルはautoCommitIntervalMsオプションで指定します。TRUEBooleanautoCommitIntervalMsconsumerconsumerがメッセージのオフセットが自動コミットするインターバル(ミリ秒)を指定する。5000IntegerautoCommitOnStopconsumerコンシューマが停止する際に、ブローカから最後にsubscribeされたメッセージを明示的に自動コミットするかどうかを指定する。これにはautoCommitEnableオプションがオンになっている必要があります。 設定可能な値は、sync、async、またはnoneです。syncStringautoOffsetResetconsumer初期オフセットがない場合、またはオフセットが範囲外の場合は、次の手順を実行します。earliest:オフセットを最も最初のオフセットにリセットします。latest:自動的にオフセットを最新(最後)のオフセットにリセットします。fail:consumerに例外をスローします。latestStringconsumerRequestTimeoutMsconsumer構成は、クライアントが要求の応答を待つ最大時間を制御します。 タイムアウトが経過する前に応答が受信されない場合、クライアントは必要に応じて要求を再送信するか、または再試行が尽きると要求を失敗させます。40000IntegerconsumersCountconsumerkafkaサーバーに接続するンシューマーの数を指定する。1intconsumerStreamsconsumer同時コンシューマ数。10intgroupIdconsumerこのconsumerが属するグループを識別する文字列を指定します。同じグループIDを設定することによって、複数のconsumerがすべて同じコンシューマグループであることを示します。Consumerでは必須のオプションです。
StringmaxPollRecordsconsumerpoll()の1回の呼び出しで返されるレコードの最大数を指定する。500IntegerpollTimeoutMsconsumerKafkaConsumerをポーリングするときに使用されるタイムアウト値(ミリ秒)。5000LongcompressionCodecproducerこのパラメータを使用すると、producerによって生成されたデータの圧縮コーデックを指定できます。指定できる値はnone、gzip、およびsnappyです。デフォルトでは圧縮されません(none)。none
requestRequiredAcksproducerproducerからのリクエストが完了したと見なす条件を設定する。メッセージの信頼性のレベルを指定することになります。指定する値は以下の3つがあります。acks=0ですと、リーダーのブローカがダウンするとメッセージをロストしますので、通常はacks=1、またはallを指定します。・acks = 0producerはサーバーからの確認応答を待たずに送信を完了したとみなします。この場合、kafkaがメッセージを受信したことを保証することはできません。各レコードに返されるオフセットは常に-1に設定されます。・acks = 1リーダーがそのメッセージをローカルに書き込むが、他のフォロワーからの完了を待たずに応答します。この場合、メッセージを承認した直後にリーダーにダウンが発生した場合、他のフォロワーがそれをレプリケーションできずにメッセージが失われます。 ・acks = allリーダーが応答を返す前に、他の同期レプリカの書き込みがすべて完了するまで待機します。これにより、少なくとも1つの同期レプリカが存続している限り、メッセージが失われないことが保証されます。1StringrequestTimeoutMsproducerクライアント(producer)にエラーを返す前に、ブローカーがrequest.required.acksの要件を満たそうまでに待機する時間。305000Integerretriesproducer0より大きい値を設定すると、クライアントは一時的なエラーで送信に失敗したレコードを再送信(リトライ)します。0Integer

参考

Apache CamelフレームワークのKafkaコンポーネントを使用して簡単なメッセージの送受信(昔の記事)

Kafka Component(公式サイト)
Kafka Component(旧公式サイト)
Camel Kafka example

广告
将在 10 秒后关闭
bannerAds