概要

マイクロサービス化案件に携わっており、Debezium を触ったので、明日のためにメモしときます。

登場人物

Kafka

オープンソースの分散メッセージングシステム。メッセージキューって言ってよいのかな。
メッセージキューはマイクロサービスにおけるバックエンドの要。

Kafka Connect

Kafka と他のシステム間でデータをスケーラブルかつ、ストリーミングするためのソフトウェア。
Kafka との接続方法やリバランスなど、手間のかかる問題を処理してくれる。

Debezium

データの変更をキャプチャするためのオープンソースの分散プラットフォーム。
データソース (DBなど) の変更を監視し、変更内容を Kafka Topics に格納する。
Kafka 上に構築されている。
KafKa Connect が使用されている。
トランザクションログを監視している。

ZooKeeper

大規模分散システムでよく利用される、設定情報の集中管理や名前付けなどのサービスを提供するソフトウェア。
kafka の実行に必要なようで。

undefined

手順

最初は Tutorial に沿ってやっていきます。

1. ZooKeeperの起動

$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.8

2. Kafkaの起動

$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.8

3. MySQLの起動

サンプルスキーマが含まれているイメージです。

$ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.8

4. MySQLコマンドラインクライアントの起動

まだ使わないけど、起動しておきます。

$ docker run -it --rm --name mysqlterm --link mysql --rm mysql:8.0 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

5. KafkaConnectを起動

Kafka と MySQL のデータ連携に必要です。
API で操作可能なので curl 使います。

$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.8

DebeziumMySQLコネクタを登録します。

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
undefined

6. 登録されたメッセージを確認する

Kafka 実行中のコンテナに入る。

$ docker exec -it kafka bash

以下のコマンドを使用すると、Kafka に入ってきたメッセージの内容をリアルタイムに確認できます。

[kafka@851e7f6e1d62 ~]$ bin/kafka-console-consumer.sh --topic dbserver1.inventory.customers --bootstrap-server kafka:9092

[–topic dbserver1.inventory.customers]
以下のコマンドで確認。
$ curl -i -X GET -H “Accept:application/json” localhost:8083/connectors/inventory-connector/topics

[–from-beginning]
今回使用していないが、このパラメーターを使用することで、全メッセージを確認できる。

7. データに変更を加える

MySQLコマンドラインクライアントで

use inventory;
UPDATE customers SET first_name='Anne123' WHERE id=1004;

customers テーブルへ変更を加えると、kafka-console-consumer.sh の画面に変更内容のメッセージキューが入ってくることが確認できる。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
          // 省略...
      }
  },
  "payload": {
    "before": {
      "id": 1004,
      "first_name": "Anne", // ★変更前
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": {
      "id": 1004,
      "first_name": "Anne123", // ★変更後
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "1.8.0.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1640265034000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 1599,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "u",
    "ts_ms": 1640265034477,
    "transaction": null
  }
}

続き

明日は、上記の環境を windows PC で構築し、メッセージキューに入ってきた内容で lambda を叩いてみます。
よく使う環境なら docker-compose にするのが良いんだろうな…

2021-12-24 追記
Windows でうまくコマンドを実行できないときは PowerShell を使うと良い。
Windows には Curl は別途インストールが必要。(または Postman などの API Client)

广告
将在 10 秒后关闭
bannerAds