目次
-
- 前回までのあらすじ
-
- 記事の趣旨
-
- 前提環境
-
- メッセージング基盤を構築する(Kafka編)
-
- Sink,Source,Processorを作る(Kafka編)
-
- 動作確認
- まとめ
前回までのあらすじ
メッセージ駆動システムの設計実装について、第一歩を踏み出しました。
SpringBootではじめるメッセージ連携
記事の趣旨
二歩目を踏み出しましょう。
お手軽に、メッセージ駆動の恩恵を受けられるようになった一方で
RabbitMQはクラスタ構成が組めないので
スケール上限や、ノードレベル耐障害性の確保などはApache Kafkaに見劣りする様子。
せっかくなのでApache Kafkaにも触れてみましょう。
完成したコードはこちら。
記事では解説しないがテストクラス付き。
前提環境
-
- Spring Tool Suiteが導入されていること。
- JRE 8以上が導入されていること。
Dockerは今回は使いません。
その他は、前回と同様のためバージョン情報は割愛。
メッセージング基盤を構築する(Kafka編)
前述のとおりApache Kafkaを採用する。
Apache Kafkaの公式サイトにquickstartが掲載されているので
この手順に則って導入します。
まずはApache Kafka配布ページよりHTTPリンクを入手。
アクセスするたびにsuggest対象のURLが変わるページ構成。
特に理由なければ「We suggest~」に表示されるURLをメモすること。負荷分散に協力しましょう。
次に、ダウンロードしたファイルを展開して下位フォルダに移動。
# wget [kafka.tgzのURL]
# cd [解凍後フォルダ]
起動コマンドを投入します。まずはZooKeeper。
# bin/zookeeper-server-start.sh config/zookeeper.properties
次にKafka本体。
# bin/kafka-server-start.sh config/server.properties
少し、動作確認しましょう。
testという名前のtopic(メッセージの送信先)を作成。
# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
topic一覧を表示。topicが出来たことを確認します。
# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test
これまでの手順により、Kafkaでメッセージを処理する準備が整いました。
Sink,Source,Processorを作る(Kafka編)
前回と同様にSpring Cloud Streamを用いて
SinkとSourceおよびProcessorを作りますが…
実装コードは1行も変更なし。単体テストも同一ソースです。
前回のコードはこちら。
今回のコードはこちら。
唯一、pom.xmlのみ修正しています。
RabbitMQ関連を削除し、Apache Kafka関連を追加。
完成したpom.xmlを前回と比較いただければ判るので、仔細は割愛。
最も重要な変更は以下。ガイドはこちら。
(中略)
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
(中略)
アプリには特段、プロパティ値を設定していないので
デフォルトに基づいてkafkaへの接続先情報(localhost:9092)や各種パラメータが設定されることがポイント。
動作確認
それぞれのプロジェクトについてmaven clean packageを実行。
target配下に生成された各Jarを以下のとおり起動する。
$ java -jar target/hello-source-kafka-0.0.1-SNAPSHOT.jar --server.port=8080
$ java -jar target/hello-sink-kafka-0.0.1-SNAPSHOT.jar --server.port=8082
$ java -jar target/hello-processor-kafka-0.0.1-SNAPSHOT.jar --server.port=8090
起動ログ中に、Kafkaを用いていることが表示される。
以下はSinkのログ出力例。
2019-12-07 12:45:28.595 INFO 4986 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: hello-sink
2019-12-07 12:45:28.600 INFO 4986 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [localhost:9092]
(中略)
ssl.truststore.type = JKS
2019-12-07 12:45:28.707 INFO 4986 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
2019-12-07 12:45:28.709 INFO 4986 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
2019-12-07 12:45:28.710 INFO 4986 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1575722728705
curlで、Sourceに対してPOSTリクエストを送る。
$ curl -v localhost:8080 -d '{"tweet":"Hello"}' -H 'Content-Type: application/json'
SinkのJarを起動したコンソールに、メッセージが出力される。
Received Hello processing!
まとめ
SpringBootならびにApache Kafka、Spring Cloud Streamを用いて
マイクロサービスなメッセージ連携の設計/実装方法を学んだ。
Spring Cloud Streamを用いることで
メッセージング基盤への依存性を、アプリ設計や実装から取り除くことが可能。
是非それぞれのメッセージング基盤を沢山、取っ替え引っ替えして遊んで頂きたい。
参考文献
Spring Cloud Stream Kafka Binder Reference Guide
Apache Kafka QuickStart
Apache Kafkaを試してみる