はじめに
GxP の清田です。
イベント駆動型アーキテクチャ を実現するための技術の1つである、
Apach Kafka (以降 Kafka と記載) を動かして学んでみます。
素振りをしようと思った動機は、@yusuke_arclamp さんの『マイクロサービスの次に来るかもしれない言葉について』や、
社内イベント 『TechTalk「この20年の Agile, DevOps, Microservices の流れについて」』 で Kafka を耳にして気になっていたからです。
Microservice におけるシステム間連携の方法として、イベント駆動型アーキテクチャには以下の特徴があります。
-
- 同期的なAPIの連携でなく非同期の連携
-
- データを共有するのではなくイベントを配信する
- イベントの発行元は誰が受け取るかを気にしなくて良い(=疎結合)
これまでの開発経験で触れたことのないものだったので、一度動くアプリケーションを作って恐怖心をなくしておこうと思います。
対象読者
-
- イベント駆動型アーキテクチャに興味のある人
Kafka を 動かして概念の一部を理解したい人
Docker Compose でコンテナを起動して CLI で動かして学ぶ
Spring for Apache Kafka で簡単な producer, consumer を実装する
イベントを複数のシステムが受け取って、それぞれの解釈で処理する 実装のイメージを掴みたい人
上記のいずれかに該当する場合、楽しんでいただけるかも知れません。
記載しないこと
-
- 脱 Zookeeper を支える技術である Apache Kafka Raft (KRaft) について
-
- Kafka のクラウド環境でのデプロイ(Amazon Managed Streaming for Apache Kafka など)について
-
- イベントの配信の保証(At Least One や Exactly Once) について
-
- RabbitMQ などの message brokers との比較について
- Amazon Simple Queue Service や Cloud Pub/Sub などのパブリッククラウドで提供されているサービスとの使い分けについて
上記の話題については、学習中で知識を有していないため記載しないです。(いずれも気になる)
Kafkaとは
あたりの説明で日本語でざっと読めるものとしては、以下の記事が大変参考になりました。
この記事では概念の1つずつの紹介はしていないので、参照いただけますとイメージを掴みやすいと思います。
個人的には、動かして学ぶことでこの辺りの記載を実感を持って理解することができました。
Apache Kafka(以降、Kafka)はスケーラビリティに優れた分散メッセージキューです。
Kafkaは処理性能を重視したメッセージキューであり、複数台のマシンでクラスタを構成して分散処理を行うことで、高いスループットを発揮します。後からクラスタにマシンを追加することで、処理性能とデータ保持容量をスケールアウトすることもできます。また、Kafkaはクラスタ内でデータを複製するため、一部のマシンに障害が発生してもデータを失うことなく処理を継続できます。
※イメージを掴んだ上で、Confluent のドキュメントや Kafaka のドキュメント を確認しました
Docker Compose でコンテナを起動して CLI で動かして学ぶ
まずは低レイヤーから学んでいきます。
登場人物は以下の図のものです。
GitHub にサンプルを用意(KiyotaTakeshi/kafka-playground) しましたので手元で動かして確認ができます。
さくっと broker 3台でクラスターを作成します。
※今回は Zookeeper が必要な image(bitnami/kafka) を使用しています
$ git clone https://github.com/KiyotaTakeshi/kafka-playground.git
$ cd kafka-playground
$ docker compose up -d
$ docker compose ps
NAME COMMAND SERVICE STATUS PORTS
kafka-playground-kafka1-1 "/opt/bitnami/script…" kafka1 running 0.0.0.0:49760->9092/tcp
kafka-playground-kafka2-1 "/opt/bitnami/script…" kafka2 running 0.0.0.0:49761->9092/tcp
kafka-playground-kafka3-1 "/opt/bitnami/script…" kafka3 running 0.0.0.0:49762->9092/tcp
kafka-playground-zookeeper-1 "/opt/bitnami/script…" zookeeper running 0.0.0.0:2181->2181/tcp
任意の broker コンテナに入り、topic を作成します。
# どの broker に exec するかは任意(今回は kafka1)
$ docker compose exec kafka1 bash
# まずは replica は1でつくる
# partitions は4でつくる
$ /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4
作成したものを確認します。
出力から以下のことがわかります。
-
- broker が3台に対して、 partition を4つに設定したため、1002(数字は実行によって異なります) の broker が2つの partiton の Leader になっている
-
- replica は1にしているので、各 partition は1台の broker のみが保持している
Isr(In-Sync Replica) が Leader と同じ broker のみ
$ /opt/bitnami/kafka/bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-topic
Topic: test-topic TopicId: sqlIpd0oSEC3d7PJC3c_Dg PartitionCount: 4 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test-topic Partition: 0 Leader: 1002 Replicas: 1002 Isr: 1002
Topic: test-topic Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: test-topic Partition: 2 Leader: 1003 Replicas: 1003 Isr: 1003
Topic: test-topic Partition: 3 Leader: 1002 Replicas: 1002 Isr: 1002
では、producer と consumer でメッセージのやり取りができることを確認します。
※関係する概念だけ記載した図です
シェルを2つ起動する必要があります。
# シェル1
# consumer の起動
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
# シェル2
# どの Broker に exec するかは任意
$ docker compose exec kafka1 bash
# producer を起動しメッセージを送る
$ /opt/bitnami/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>hello
>world
consumer 側でメッセージを受信できました。
# シェル1
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
hello
world
もう1つシェルを立ち上げて、 consumer を増やしてみます。
# シェル3
$ docker compose exec kafka1 bash
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
producer からメッセージを送ると、
# シェル2
$ /opt/bitnami/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>goodbye
2つの consumer がメッセージを受信できていることがわかります。
# シェル1
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
goodbye
# シェル3
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
goodbye
このことから、 同じ topic から2つの consumer がメッセージを受信できる ことがわかります。
イベント駆動型アーキテクチャにおける、「イベントを複数のシステムが受け取る」が実現 できていますね(まだ CLI 上ですが)。
2つの consumer が別の subscriber として振る舞ったのはなぜでしょうか?
理由は、別の consumer group だからです。
producer を停止して、確認してみます。
# シェル2
$ /opt/bitnami/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
console-consumer-32834
console-consumer-40587
では、2つの consumer を同じ consumer group で立ち上げます。
# シェル1
# console-consumer という consumer group で起動
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test-topic --group console-consumer
# シェル3
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test-topic --group console-consumer
この状態で、 producer からメッセージを送ると
# シェル2
$ /opt/bitnami/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>hello
>world
>goodbye
2分の1でそれぞれの consumer がメッセージを受信します。
# シェル1
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --topic test-topic --group console-consumer
goodbye
# シェル3
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --topic test-topic --group console-consumer
hello
world
つまり、 とある topic に対して同じ処理をする consumer のスケールアウト が実現できました(今回はただコンソールに出力されるだけですが)。
ここまでで
-
- 別の consumer group で起動して、同じ topic を複数の consumer で受け取る
イベントのシステムごとの解釈
同じ consumer group で起動して、topic を複数の同じ処理をする consumer 間で受け取る
同じ解釈をするシステムのスケールアウト
が確認できました。
では、次は broker 側のスケールアウトについて見ていきます。
以下の図のように、複数の replica を指定することで Topic の partition を複数の broker に持つことができます。
まず、コンテナを停止しホスト側にマウントしているデータも吹き飛ばします。
(先程作成した topic や consumer group 等がなくなります)
$ docker compose down && rm -rf .docker
コンテナを再起動し、コンテナに入ります。
$ docker compose up -d && docker compose ps
$ docker compose exec kafka1 bash
今度は、 replica 数を3(broker と同じ台数)で指定し、作成します。
# 作成
$ /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic test-topic-replicated \
> --bootstrap-server localhost:9092 --replication-factor 3 --partitions 4
作成したものを確認します。
出力から以下のことがわかります。
-
- broker が3台に対して、 partition を4つに設定したため、1002(数字は実行によって異なります) の broker が2つの partiton の Leader になっている
-
- replica を3にしているので、各 partition は3台の broker が保持している
Isr(In-Sync Replica) が3台
$ /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test-topic-replicated
Topic: test-topic-replicated TopicId: w7ITwLFVTZi3m2AKNAUYIg PartitionCount: 4 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test-topic-replicated Partition: 0 Leader: 1002 Replicas: 1002,1001,1003 Isr: 1002,1001,1003
Topic: test-topic-replicated Partition: 1 Leader: 1001 Replicas: 1001,1003,1002 Isr: 1001,1003,1002
Topic: test-topic-replicated Partition: 2 Leader: 1003 Replicas: 1003,1002,1001 Isr: 1003,1002,1001
Topic: test-topic-replicated Partition: 3 Leader: 1002 Replicas: 1002,1003,1001 Isr: 1002,1003,1001
では、 broker を停止してみましょう。
# 任意のコンテナを停止
$ docker compose stop kafka2
topic を確認してみると(コンテナ内部に入ってコマンド実行でもいいです)、出力から以下のことがわかります。
-
- 停止した broker が Leader でなくなっている
-
- 停止した broker が Isr(In-Sync Replica) に含まれなくなっている
- 動作している broker で partition の Leader を分け合っている
$ docker compose exec -T kafka1 /bin/bash <<EOF
/opt/bitnami/kafka/bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-topic-replicated
EOF
Topic: test-topic-replicated TopicId: w7ITwLFVTZi3m2AKNAUYIg PartitionCount: 4 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test-topic-replicated Partition: 0 Leader: 1001 Replicas: 1002,1001,1003 Isr: 1001,1003
Topic: test-topic-replicated Partition: 1 Leader: 1001 Replicas: 1001,1003,1002 Isr: 1001,1003
Topic: test-topic-replicated Partition: 2 Leader: 1003 Replicas: 1003,1002,1001 Isr: 1003,1001
Topic: test-topic-replicated Partition: 3 Leader: 1003 Replicas: 1002,1003,1001 Isr: 1003,1001
では、止めていた broker を起動します。
$ docker compose start kafka2
再度 topic を確認してみると以下のことがわかりました。
Isr(In-Sync Replica) に再度起動した broker が含まれている
再度起動した broker は partition の Leader には選出されていない
$ docker compose exec -T kafka1 /bin/bash <<EOF
/opt/bitnami/kafka/bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-topic-replicated
EOF
Topic: test-topic-replicated TopicId: w7ITwLFVTZi3m2AKNAUYIg PartitionCount: 4 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test-topic-replicated Partition: 0 Leader: 1001 Replicas: 1002,1001,1003 Isr: 1001,1003,1002
Topic: test-topic-replicated Partition: 1 Leader: 1001 Replicas: 1001,1003,1002 Isr: 1001,1003,1002
Topic: test-topic-replicated Partition: 2 Leader: 1003 Replicas: 1003,1002,1001 Isr: 1003,1001,1002
Topic: test-topic-replicated Partition: 3 Leader: 1003 Replicas: 1002,1003,1001 Isr: 1003,1001,1002
※ちなみに、この状態で別の broker (今回だと kafka3)を停止すると、
先程起動した broker が Leader に含まれる挙動になりました
これらの検証で、 broker の台数を増やし replica に設定することで、
producer, consumer とやりとりをする基盤としての broker の可用性を高めることができるとわかりました。
最後に broker が実体としてどのように topic を保存しているのかを確認します。
ホスト側にマウントしているデータを確認します。topic の partition ごとにディレクトリが切られているようです。
$ ls .docker/kafka1/data/test-topic-replicated-?
.docker/kafka1/data/test-topic-replicated-0:
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epoch-checkpoint partition.metadata
.docker/kafka1/data/test-topic-replicated-1:
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epoch-checkpoint partition.metadata
.docker/kafka1/data/test-topic-replicated-2:
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epoch-checkpoint partition.metadata
.docker/kafka1/data/test-topic-replicated-3:
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epoch-checkpoint partition.metadata
$ ls -ld .docker/kafka1/data/test-topic-replicated-*
drwxr-xr-x 7 t.kiyota AD\Domain Users 224 Dec 15 16:03 .docker/kafka1/data/test-topic-replicated-0
drwxr-xr-x 7 t.kiyota AD\Domain Users 224 Dec 15 16:03 .docker/kafka1/data/test-topic-replicated-1
drwxr-xr-x 7 t.kiyota AD\Domain Users 224 Dec 15 15:33 .docker/kafka1/data/test-topic-replicated-2
drwxr-xr-x 7 t.kiyota AD\Domain Users 224 Dec 15 15:33 .docker/kafka1/data/test-topic-replicated-3
replica を3で指定したため、それぞれの broker に同様のデータが存在しています。
$ ls -ld .docker/kafka2/data/test-topic-replicated-?
drwxr-xr-x 7 t.kiyota AD\Domain Users 224 Dec 15 16:12 .docker/kafka2/data/test-topic-replicated-0
drwxr-xr-x 7 t.kiyota AD\Domain Users 224 Dec 15 15:33 .docker/kafka2/data/test-topic-replicated-1
drwxr-xr-x 7 t.kiyota AD\Domain Users 224 Dec 15 15:33 .docker/kafka2/data/test-topic-replicated-2
drwxr-xr-x 7 t.kiyota AD\Domain Users 224 Dec 15 16:12 .docker/kafka2/data/test-topic-replicated-3
$ ls -ld .docker/kafka3/data/test-topic-replicated-?
drwxr-xr-x 7 t.kiyota AD\Domain Users 224 Dec 15 15:33 .docker/kafka3/data/test-topic-replicated-0
drwxr-xr-x 7 t.kiyota AD\Domain Users 224 Dec 15 15:33 .docker/kafka3/data/test-topic-replicated-1
drwxr-xr-x 7 t.kiyota AD\Domain Users 224 Dec 15 16:03 .docker/kafka3/data/test-topic-replicated-2
drwxr-xr-x 7 t.kiyota AD\Domain Users 224 Dec 15 16:03 .docker/kafka3/data/test-topic-replicated-3
以上で CLI での検証は終わります。
コンテナ起動して CLI で動作検証することで Kafka の主要な概念についてイメージが膨らんだのではないでしょうか。
Spring for Apache Kafka で簡単な producer, consumer を実装する
次は Kafka とやりとりをするアプリケーションを開発し、イベント駆動型アーキテクチャの理解を深めていきます。
最終的に動作するものは、
GitHub のサンプル(KiyotaTakeshi/kafka-spring-sample)の Gif を見ていただくとイメージがしやすいと思います。
Qiita の容量制限を超えているのか Gif が貼れなかった…
図にすると以下のものです。
-
- order producer(Spring Boot) がWebリクエストを受け付けて、 topic(order-events) にイベントを発行
-
- order notification(Spring Boot) がイベントを受け取ってメールを送信
コンテナ起動している mailhog にて受信したメールを確認できる
order rdb(Sring Boot) がイベントを受け取って MySQL に保存
という構成です。
-
- producer 側のアプリケーションはイベントを発行するところまでしか気にしない
- consumer はイベントを受け取って各々の処理を行う
というイベント駆動型アーキテクチャをシンプルですが実現しました。
(他にもオブジェクトのまま NoSQL に保存する consumer があってもいいかも)
では order producer の実装を見ていきます。
topic の作成は Configuration クラスにより行っています。
@Configuration
public class AutoCreateConfig {
// if you create manual
// $ ./bin/kafka-topics.sh --create --topic order-events --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3
@Bean
public NewTopic orderEvents(){
return TopicBuilder.name("order-events")
.partitions(3)
.replicas(3)
.build();
}
}
controller がリクエストを受け付けて、
@RestController
@RequestMapping("/order-events")
@Slf4j
@AllArgsConstructor
public class OrderEventsController {
private OrderEventProducer orderEventProducer;
@PostMapping
public ResponseEntity<OrderEvent> postOrderEvent(@RequestBody @Valid OrderEvent orderEvent)
throws JsonProcessingException {
log.info("invoke kafka producer");
orderEvent.setOrderEventType(OrderEventType.NEW);
orderEventProducer.sendOrderEvent(orderEvent);
return ResponseEntity.status(HttpStatus.CREATED).body(orderEvent);
}
kafkaTemplate を使用して topic に対して非同期で message を送信します。
public ListenableFuture<SendResult<Integer, String>> sendOrderEvent(OrderEvent orderEvent) throws JsonProcessingException {
Integer key = orderEvent.getOrderEventId();
String value = objectMapper.writeValueAsString(orderEvent);
ListenableFuture<SendResult<Integer, String>> listenableFuture = kafkaTemplate.send(TOPIC, key, value);
listenableFuture.addCallback(new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable ex) {
handleFailure(key, value, ex);
}
@Override
public void onSuccess(SendResult<Integer, String> result) {
handleSuccess(key, value, result);
}
});
return listenableFuture;
}
イベントを配るだけなのでシンプルな実装になっています。
application.yaml は接続先の Kafka Cluster の情報やシリアライズ、デシリアライズの設定を記載します。
admin の設定は上記の topic の生成で必要なものです。
spring:
kafka:
template:
default-topic: "order-events"
producer:
bootstrap-servers:
- localhost:9092
- localhost:9093
- localhost:9094
key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
admin:
properties:
bootstrap.servers: localhost:9092,localhost:9093,localhost:9094
次に order notification の実装を見ていきます。
こちらの application.yaml は consumer として接続先の Kafka Cluster の情報を記載しています。
また、メールの送信先の情報も記載しています。
(今回はコンテナ起動した mailhog を使用していますが、 mailtrap のようなクラウドサービスを使ってもいいかもです)
group-id: order-notification-events-group で指定してるのは consumer group です。
spring:
kafka:
consumer:
bootstrap-servers:
- localhost:9092
- localhost:9093
- localhost:9094
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: order-notification-events-group
mail:
# if you use mailtrap instead of mailhog
# host: smtp.mailtrap.io
host: localhost
port: 1025
# username: dummy
# password: dummy
protocol: smtp
@KafkaListener で topic を指定することで、 event を受け取って処理することができます。
@Component
@Slf4j
@AllArgsConstructor
public class OrderEventsConsumer {
private final OrderEventsService orderEventsService;
@KafkaListener(topics = {"order-events"})
public void onMessage(ConsumerRecord<Integer, String> consumerRecord) throws JsonProcessingException {
log.info("ConsumerRecord: {} ", consumerRecord);
orderEventsService.processOrderEvent(consumerRecord);
}
}
テンプレートエンジンである Thymeleaf で、
event として受け取ったオブジェクトを本文に含めた HTML 形式のメールを送信します。
private void sendMail(NotificationEmail notificationEmail) {
MimeMessagePreparator messagePreparer = mimeMessage -> {
MimeMessageHelper helper = new MimeMessageHelper(mimeMessage, "UTF-8");
helper.setFrom("kafka-spring-sample@example.com");
helper.setTo(notificationEmail.getRecipient());
helper.setSubject(notificationEmail.getSubject());
helper.setText(build(notificationEmail.getBody()), true);
};
try {
mailSender.send(messagePreparer);
log.info("Email sent!!");
} catch (MailException e) {
log.error("Exception occurred when sending mail", e);
throw new RuntimeException("Exception occurred when sending mail to " + notificationEmail.getRecipient(), e);
}
}
最後に order rdb の実装を見ていきます。
application.yaml は order notification と同じ Kafka に関する設定に加えて DB 周りの設定を記載しています。
order notification とは別の consumer group の group-id: order-rdb-events-group が指定されていることがポイントです。
CLI で確認したように 同じ topic から consumer が別の処理をするため(同じイベントを複数のシステムが受け取るため)に、別のIDが指定 されています。
spring:
kafka:
consumer:
bootstrap-servers:
- localhost:9092
- localhost:9093
- localhost:9094
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: order-rdb-events-group
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/kafka-spring-sample?createDatabaseIfNotExist=true
username: root
password: 1qazxsw2
jpa:
show-sql: true
properties:
hibernate:
dialect: org.hibernate.dialect.MySQL8Dialect
format_sql: true
hibernate:
ddl-auto: update
# パラメータの値をログに表示 ex.) binding parameter [1] as [INTEGER] - [1]
# 2021-11-21 00:55:11.604 TRACE 11400 --- [ main] o.h.type.descriptor.sql.BasicBinder : binding parameter [1] as [BIGINT] - [1]
logging.level.org.hibernate.type.descriptor.sql.BasicBinder: trace
topic を指定してイベントを受け取る処理は同じです。
@Component
@Slf4j
@AllArgsConstructor
public class OrderEventsConsumer {
private final OrderEventsService orderEventsService;
@KafkaListener(topics = {"order-events"})
public void onMessage(ConsumerRecord<Integer, String> consumerRecord) throws JsonProcessingException {
log.info("ConsumerRecord: {} ", consumerRecord);
orderEventsService.processOrderEvent(consumerRecord);
}
}
あとは受け取ったイベントのモデルで MySQL に保存しています。
private void save(OrderEvent orderEvent) {
orderEvent.getOrder().setOrderEvent(orderEvent);
orderEventsRepository.save(orderEvent);
log.info("Successfully Persisted the Order Event {}", orderEvent);
}
では、 HTTP リクエストしてみましょう。
※サンプルコード(KiyotaTakeshi/kafka-spring-sample)に postman のコレクションを配置しています
もろもろ必要なコンテナを起動します。
$ docker compose up -d
$ docker compose ps
NAME COMMAND SERVICE STATUS PORTS
order-kafka1 "/opt/bitnami/script…" kafka1 running 0.0.0.0:9092->9092/tcp
order-kafka2 "/opt/bitnami/script…" kafka2 running 0.0.0.0:9093->9093/tcp
order-kafka3 "/opt/bitnami/script…" kafka3 running 0.0.0.0:9094->9094/tcp
order-mailhog "MailHog" mailhog running 0.0.0.0:1025->1025/tcp, 0.0.0.0:8025->8025/tcp
order-mysql "docker-entrypoint.s…" mysql running 0.0.0.0:3306->3306/tcp
order-zookeeper "/opt/bitnami/script…" zookeeper running 0.0.0.0:2181->2181/tcp
root project でビルドして executable jar を生成します。
IDE で起動しても問題ないです!
$ export JAVA_HOME=`/usr/libexec/java_home -v 11`
$ java -version
openjdk version "11.0.11" 2021-04-20 LTS
OpenJDK Runtime Environment Corretto-11.0.11.9.1 (build 11.0.11+9-LTS)
OpenJDK 64-Bit Server VM Corretto-11.0.11.9.1 (build 11.0.11+9-LTS, mixed mode)
$ ./gradlew clean build
$ ls -l order-notification-consumer/build/libs/order-notification-consumer-0.0.1-SNAPSHOT.jar
$ ls -l order-producer/build/libs/order-producer-0.0.1-SNAPSHOT.jar
$ ls -l order-rdb-consumer/build/libs/order-rdb-consumer-0.0.1-SNAPSHOT.jar
topic を作成するため、 order producer から起動します。
$ java -jar order-producer/build/libs/order-producer-0.0.1-SNAPSHOT.jar
そして order notification, order rdb を起動すると準備完了です。
$ java -jar order-notification-consumer/build/libs/order-notification-consumer-0.0.1-SNAPSHOT.jar
$ java -jar order-rdb-consumer/build/libs/order-rdb-consumer-0.0.1-SNAPSHOT.jar
HTTP リクエストをすると、
$ curl --location --request POST 'http://localhost:8080/order-events' \
--header 'Content-Type: application/json' \
--data-raw '{
"orderEventId": null,
"order": {
"id": 43333,
"name": "チョコモナカジャンボ",
"price": 145
}
}'
DB にレコードが保存され、
select * from orders;
/*
+-----+----------+-----+--------------+
|id |name |price|order_event_id|
+-----+----------+-----+--------------+
|43333|チョコモナカジャンボ|145 |1 |
+-----+----------+-----+--------------+
*/
チョコモナカジャンボ の購入を受け付けたメールが届きました。
イベント駆動型アーキテクチャで複数のアプリが各々の解釈でイベントを処理するイメージが掴めましたね。
では consumer を複数起動してみましょう。複数のシェルで executable jar を実行してみます。
そうすると興味深いログが出力されます。
order-notification-events-group: partitions assigned: [order-events-0] から、
consumer がどの partition と疎通するかがわかります。
# シェル1
$ java -jar order-notification-consumer/build/libs/order-notification-consumer-0.0.1-SNAPSHOT.jar
2021-12-15 18:49:59.734 INFO 70648 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : order-notification-events-group: partitions assigned: [order-events-2]
# シェル2
$ java -jar order-notification-consumer/build/libs/order-notification-consumer-0.0.1-SNAPSHOT.jar
2021-12-15 18:49:59.732 INFO 70918 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : order-notification-events-group: partitions assigned: [order-events-1]
# シェル3
$ java -jar order-notification-consumer/build/libs/order-notification-consumer-0.0.1-SNAPSHOT.jar
2021-12-15 18:49:59.740 INFO 71159 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : order-notification-events-group: partitions assigned: [order-events-0]
この状態でリクエストしてみても、consumer group が同じ consumer を複数起動しているだけなので、
event は1回しか処理されず、バニラモナカジャンボの購入を受け付けたメールも一度しか送信しません。
2021-12-15 18:53:44.635 INFO 70918 --- [ntainer#0-0-C-1] c.k.o.service.OrderEventsService : orderEvent: OrderEvent(orderEventId=null, orderEventType=NEW, order=Order(id=41111, name=バニラモナカジャンボ, price=145))
2021-12-15 18:53:45.087 INFO 70918 --- [ntainer#0-0-C-1] c.k.o.service.OrderEventsService : Email sent!!
consumer が1台ダウンしてしまった仮定して、プロセスを1つ止めてみます。
すると、 (Re-)joining group が走って、起動している consumer で partition が割り振られます。
# シェル1
2021-12-15 18:49:17.045 INFO 70648 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-order-notification-events-group-1, groupId=order-notification-events-group] (Re-)joining group
2021-12-15 18:57:54.080 INFO 70648 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : order-notification-events-group: partitions assigned: [order-events-2]
# シェル2
2021-12-15 18:57:54.078 INFO 71159 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : order-notification-events-group: partitions assigned: [order-events-1, order-events-0]
consumer として実装するアプリケーションを水平にスケールすることで、可用性を高める事ができます。
(それぞれの partition に均等にイベントが配信されるのであれば、 partition の数まで consumer を増やせば処理の並列度も上がりシステム全体のスループットも向上すると思います)
最後に
気になっていたイベント駆動アーキテクチャについて、動くものを作ったことで理解が進みました。(知らないものに対する恐怖心が和らぎました)
一方で、今回のメール送信にあたる「通知サービス」のように、
サービスを分割する粒度や設計方法については学習と経験を積む必要があると感じました。
また、 Kafka は broker, consumer などの各コンポーネントのスケーラビリティが高いことがわかりましたが、
イベント駆動アーキテクチャを実現するための技術として、ユースケースに合わせて適切な技術を選んでいくことも必要だと感じました。
参考
-
- Kafka: The Definitive Guide
-
- KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum
-
- 「Kafka Summit 2020」開催!ストレージ階層化,ZooKeeperフリー,クラウドネイティブ ―次の10年に向けて進化を続けるKafkaのいま
-
- Exactly-Once Semantics Are Possible: Here’s How Kafka Does It
-
- Apache Kafka for Developers using Spring Boot[LatestEdition]
-
- Spring for Apache Kafka 公式ドキュメント
-
- spring.pleiades.io による Spring for Apache Kafka の日本語ドキュメント
- Using ThymeLeaf and FreeMarker Emails Templates with Spring
ソースコード
-
- KiyotaTakeshi/kafka-playground
- KiyotaTakeshi/kafka-spring-sample