概要
マイクロサービス化案件に携わっており、Debezium を触ったので、明日のためにメモしときます。
登場人物
Kafka
オープンソースの分散メッセージングシステム。メッセージキューって言ってよいのかな。
メッセージキューはマイクロサービスにおけるバックエンドの要。
Kafka Connect
Kafka と他のシステム間でデータをスケーラブルかつ、ストリーミングするためのソフトウェア。
Kafka との接続方法やリバランスなど、手間のかかる問題を処理してくれる。
Debezium
データの変更をキャプチャするためのオープンソースの分散プラットフォーム。
データソース (DBなど) の変更を監視し、変更内容を Kafka Topics に格納する。
Kafka 上に構築されている。
KafKa Connect が使用されている。
トランザクションログを監視している。
ZooKeeper
大規模分散システムでよく利用される、設定情報の集中管理や名前付けなどのサービスを提供するソフトウェア。
kafka の実行に必要なようで。
手順
最初は Tutorial に沿ってやっていきます。
1. ZooKeeperの起動
$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.8
2. Kafkaの起動
$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.8
3. MySQLの起動
サンプルスキーマが含まれているイメージです。
$ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.8
4. MySQLコマンドラインクライアントの起動
まだ使わないけど、起動しておきます。
$ docker run -it --rm --name mysqlterm --link mysql --rm mysql:8.0 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
5. KafkaConnectを起動
Kafka と MySQL のデータ連携に必要です。
API で操作可能なので curl 使います。
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.8
DebeziumMySQLコネクタを登録します。
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
6. 登録されたメッセージを確認する
Kafka 実行中のコンテナに入る。
$ docker exec -it kafka bash
以下のコマンドを使用すると、Kafka に入ってきたメッセージの内容をリアルタイムに確認できます。
[kafka@851e7f6e1d62 ~]$ bin/kafka-console-consumer.sh --topic dbserver1.inventory.customers --bootstrap-server kafka:9092
[–topic dbserver1.inventory.customers]
以下のコマンドで確認。
$ curl -i -X GET -H “Accept:application/json” localhost:8083/connectors/inventory-connector/topics
[–from-beginning]
今回使用していないが、このパラメーターを使用することで、全メッセージを確認できる。
7. データに変更を加える
MySQLコマンドラインクライアントで
use inventory;
UPDATE customers SET first_name='Anne123' WHERE id=1004;
customers テーブルへ変更を加えると、kafka-console-consumer.sh の画面に変更内容のメッセージキューが入ってくることが確認できる。
{
"schema": {
"type": "struct",
"fields": [
{
// 省略...
}
},
"payload": {
"before": {
"id": 1004,
"first_name": "Anne", // ★変更前
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne123", // ★変更後
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "1.8.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1640265034000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 1599,
"row": 0,
"thread": null,
"query": null
},
"op": "u",
"ts_ms": 1640265034477,
"transaction": null
}
}
続き
明日は、上記の環境を windows PC で構築し、メッセージキューに入ってきた内容で lambda を叩いてみます。
よく使う環境なら docker-compose にするのが良いんだろうな…
2021-12-24 追記
Windows でうまくコマンドを実行できないときは PowerShell を使うと良い。
Windows には Curl は別途インストールが必要。(または Postman などの API Client)