目次

    1. 前回までのあらすじ

 

    1. 記事の趣旨

 

    1. 前提環境

 

    1. メッセージング基盤を構築する(Kafka編)

 

    1. Sink,Source,Processorを作る(Kafka編)

 

    1. 動作確認

 

    まとめ

前回までのあらすじ

メッセージ駆動システムの設計実装について、第一歩を踏み出しました。
SpringBootではじめるメッセージ連携

記事の趣旨

二歩目を踏み出しましょう。

お手軽に、メッセージ駆動の恩恵を受けられるようになった一方で
RabbitMQはクラスタ構成が組めないので
スケール上限や、ノードレベル耐障害性の確保などはApache Kafkaに見劣りする様子。

せっかくなのでApache Kafkaにも触れてみましょう。

完成したコードはこちら。
記事では解説しないがテストクラス付き。

前提環境

    • Spring Tool Suiteが導入されていること。

 

    JRE 8以上が導入されていること。

Dockerは今回は使いません。
その他は、前回と同様のためバージョン情報は割愛。

メッセージング基盤を構築する(Kafka編)

前述のとおりApache Kafkaを採用する。

Apache Kafkaの公式サイトにquickstartが掲載されているので
この手順に則って導入します。

まずはApache Kafka配布ページよりHTTPリンクを入手。

アクセスするたびにsuggest対象のURLが変わるページ構成。
特に理由なければ「We suggest~」に表示されるURLをメモすること。負荷分散に協力しましょう。

スクリーンショット 2019-12-07 22.34.10.png

次に、ダウンロードしたファイルを展開して下位フォルダに移動。

# wget [kafka.tgzのURL]
# cd [解凍後フォルダ]

起動コマンドを投入します。まずはZooKeeper。

# bin/zookeeper-server-start.sh config/zookeeper.properties

次にKafka本体。

# bin/kafka-server-start.sh config/server.properties

少し、動作確認しましょう。
testという名前のtopic(メッセージの送信先)を作成。

# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

topic一覧を表示。topicが出来たことを確認します。

# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

これまでの手順により、Kafkaでメッセージを処理する準備が整いました。

Sink,Source,Processorを作る(Kafka編)

前回と同様にSpring Cloud Streamを用いて
SinkとSourceおよびProcessorを作りますが…
実装コードは1行も変更なし。単体テストも同一ソースです。

前回のコードはこちら。
今回のコードはこちら。

唯一、pom.xmlのみ修正しています。
RabbitMQ関連を削除し、Apache Kafka関連を追加。
完成したpom.xmlを前回と比較いただければ判るので、仔細は割愛。

最も重要な変更は以下。ガイドはこちら。

(中略)
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
(中略)

アプリには特段、プロパティ値を設定していないので
デフォルトに基づいてkafkaへの接続先情報(localhost:9092)や各種パラメータが設定されることがポイント。

動作確認

それぞれのプロジェクトについてmaven clean packageを実行。
target配下に生成された各Jarを以下のとおり起動する。

$ java -jar target/hello-source-kafka-0.0.1-SNAPSHOT.jar --server.port=8080
$ java -jar target/hello-sink-kafka-0.0.1-SNAPSHOT.jar  --server.port=8082
$ java -jar target/hello-processor-kafka-0.0.1-SNAPSHOT.jar --server.port=8090

起動ログ中に、Kafkaを用いていることが表示される。
以下はSinkのログ出力例。

2019-12-07 12:45:28.595  INFO 4986 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: hello-sink
2019-12-07 12:45:28.600  INFO 4986 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    bootstrap.servers = [localhost:9092]
(中略)
    ssl.truststore.type = JKS
2019-12-07 12:45:28.707  INFO 4986 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2019-12-07 12:45:28.709  INFO 4986 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2019-12-07 12:45:28.710  INFO 4986 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1575722728705

curlで、Sourceに対してPOSTリクエストを送る。

$ curl -v localhost:8080 -d '{"tweet":"Hello"}' -H 'Content-Type: application/json'

SinkのJarを起動したコンソールに、メッセージが出力される。

Received Hello processing!

まとめ

SpringBootならびにApache Kafka、Spring Cloud Streamを用いて
マイクロサービスなメッセージ連携の設計/実装方法を学んだ。

Spring Cloud Streamを用いることで
メッセージング基盤への依存性を、アプリ設計や実装から取り除くことが可能。

是非それぞれのメッセージング基盤を沢山、取っ替え引っ替えして遊んで頂きたい。

参考文献

Spring Cloud Stream Kafka Binder Reference Guide

Apache Kafka QuickStart

Apache Kafkaを試してみる

广告
将在 10 秒后关闭
bannerAds