Debezium – 监测数据库等数据源的更改,并通过消费者接收
摘要
以下是上次的延续。
这次是有关消费者(Consumer)接收变更内容并输出消息的笔记。
docker-compose (Docker 合成工具)
由于经常启动很麻烦,所以我选择了用docker-compose。
这是将上一次的教程中使用docker run的操作改成了docker-compose的样子。
答案: 点数/积分
debezium_network という名前の Docker ネットワークにすべて配置します。
version がちょっと古いのは会社の docker のバージョンがちょっと古いからです。v3 を使用すると network 周りの書き方などがちょっと変わるみたいです。
version: "2.4"
services:
zookeeper:
container_name: "zookeeper"
image: "debezium/zookeeper:1.8"
networks:
- "debezium_network"
ports:
- "2181:2181"
- "2888:2888"
- "3888:3888"
kafka:
container_name: "kafka"
image: "debezium/kafka:1.8"
depends_on:
- "zookeeper"
networks:
- "debezium_network"
ports:
- "9092:9092"
environment:
- "ZOOKEEPER_CONNECT=zookeeper:2181"
mysql:
container_name: "mysql"
image: "debezium/example-mysql:1.8"
networks:
- "debezium_network"
ports:
- "3306:3306"
environment:
- "MYSQL_ROOT_PASSWORD=debezium"
- "MYSQL_USER=mysqluser"
- "MYSQL_PASSWORD=mysqlpw"
connect:
container_name: "connect"
image: "debezium/connect:1.8"
depends_on:
- "zookeeper"
- "kafka"
- "mysql"
networks:
- "debezium_network"
ports:
- "8083:8083"
environment:
- "BOOTSTRAP_SERVERS=kafka:9092"
- "GROUP_ID=1"
- "CONFIG_STORAGE_TOPIC=my_connect_configs"
- "OFFSET_STORAGE_TOPIC=my_connect_offsets"
- "STATUS_STORAGE_TOPIC=my_connect_statuses"
networks:
default:
external:
name: "bridge"
debezium_network:
name: debezium_network
driver: "bridge"
先前的需求是关于在 KafkaConnect 启动后手动注册 DebeziumMySQL 连接器。
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
隨便說說
要想一次性停止已启动的容器,可以使用毁灭咒语来轻松实现。
顾客准备
当数据源发生变化时,debezium会检测到并将变更内容作为队列注册到kafka中。
利用注册到队列中的内容进行某些操作。
这次我们将快速地使用Node.js创建一个应用程序,一旦有队列被注册,就将其内容显示在控制台上。我们使用了一个名为kafka-node的包。
{
"name": "test-kafka-node",
"version": "1.0.0",
"description": "",
"main": "index.ts",
"scripts": {
"start": "ts-node index.ts"
},
"author": "",
"license": "ISC",
"dependencies": {
"kafka-node": "^5.0.0",
"ts-node": "^10.4.0"
},
"devDependencies": {
"@types/kafka-node": "^3.0.0"
}
}
import kafka from "kafka-node";
const client = new kafka.KafkaClient({
kafkaHost: "kafka:9092",
});
const consumer = new kafka.Consumer(
client,
[{ topic: "dbserver1.inventory.customers"}], // ★customers テーブルに変更を加える予定
{
autoCommit: true,
fromOffset: true,
groupId: "consumer1",
}
);
consumer.on("message", function (message) {
console.log(message);
});
console.log("ready.");
我也会创建一个Dockerfile。
这基本上是完全抄袭自这里的。
# OS: Debian Buster
# Node.js: 14.4.0
FROM node:14.4.0-buster
# Create app directory
WORKDIR /usr/src/app
# Install app dependencies (package.json and package-lock.json)
COPY package*.json ./
RUN npm install
# Bundle app source (server.js)
COPY . .
# Listen port
EXPOSE 8080
# Run Node.js
CMD [ "npm", "run", "start" ]
顾客启动
为了在同一网络中启动,请使用–network debezium_network选项进行指定。
$ docker image build -t test-kafka-node:latest .
$ docker run -it --rm --name consumer --network debezium_network test-kafka-node
做出更改
这次我们将直接进入MySQL容器,并尝试执行Update语句。
$ docker exec -it mysql bash
root@d8382f3c83e8:/# mysql -uroot -pdebezium
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 8
Server version: 8.0.27 MySQL Community Server - GPL
Copyright (c) 2000, 2021, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> use inventory;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> UPDATE customers SET first_name='Anne99999' WHERE id=1004;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
如果对上述的customers表进行更改,那么
-
- 顾客表会有更改
-
- 通过Debezium将队列注册到Kafka中
- Node.js的消费者会捕获进入Kafka的队列,然后执行console.log(message)操作。
在消费者容器中实际上可以确认消息。
由于可以从消息中获取变更内容,因此可以根据这些内容调用 Lambda,将数据插入到读模型中,发送电子邮件等,可以进行任何其他操作。