Debezium – 监测数据库等数据源的更改,并通过消费者接收

摘要

以下是上次的延续。
这次是有关消费者(Consumer)接收变更内容并输出消息的笔记。

docker-compose (Docker 合成工具)

由于经常启动很麻烦,所以我选择了用docker-compose。
这是将上一次的教程中使用docker run的操作改成了docker-compose的样子。

答案: 点数/积分

debezium_network という名前の Docker ネットワークにすべて配置します。
version がちょっと古いのは会社の docker のバージョンがちょっと古いからです。v3 を使用すると network 周りの書き方などがちょっと変わるみたいです。

version: "2.4"
services:
  zookeeper:
    container_name: "zookeeper"
    image: "debezium/zookeeper:1.8"
    networks:
      - "debezium_network"
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"

  kafka:
    container_name: "kafka"
    image: "debezium/kafka:1.8"
    depends_on:
      - "zookeeper"
    networks:
      - "debezium_network"
    ports:
      - "9092:9092"
    environment:
      - "ZOOKEEPER_CONNECT=zookeeper:2181"

  mysql:
    container_name: "mysql"
    image: "debezium/example-mysql:1.8"
    networks:
      - "debezium_network"
    ports:
      - "3306:3306"
    environment:
      - "MYSQL_ROOT_PASSWORD=debezium"
      - "MYSQL_USER=mysqluser"
      - "MYSQL_PASSWORD=mysqlpw"

  connect:
    container_name: "connect"
    image: "debezium/connect:1.8"
    depends_on:
      - "zookeeper"
      - "kafka"
      - "mysql"
    networks:
      - "debezium_network"
    ports:
      - "8083:8083"
    environment:
      - "BOOTSTRAP_SERVERS=kafka:9092"
      - "GROUP_ID=1"
      - "CONFIG_STORAGE_TOPIC=my_connect_configs"
      - "OFFSET_STORAGE_TOPIC=my_connect_offsets"
      - "STATUS_STORAGE_TOPIC=my_connect_statuses"

networks:
  default:
    external:
      name: "bridge"
  debezium_network:
    name: debezium_network
    driver: "bridge"

先前的需求是关于在 KafkaConnect 启动后手动注册 DebeziumMySQL 连接器。

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

隨便說說

要想一次性停止已启动的容器,可以使用毁灭咒语来轻松实现。

顾客准备

当数据源发生变化时,debezium会检测到并将变更内容作为队列注册到kafka中。
利用注册到队列中的内容进行某些操作。

这次我们将快速地使用Node.js创建一个应用程序,一旦有队列被注册,就将其内容显示在控制台上。我们使用了一个名为kafka-node的包。

{
  "name": "test-kafka-node",
  "version": "1.0.0",
  "description": "",
  "main": "index.ts",
  "scripts": {
    "start": "ts-node index.ts"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "kafka-node": "^5.0.0",
    "ts-node": "^10.4.0"
  },
  "devDependencies": {
    "@types/kafka-node": "^3.0.0"
  }
}

import kafka from "kafka-node";

const client = new kafka.KafkaClient({
  kafkaHost: "kafka:9092",
});
const consumer = new kafka.Consumer(
  client,
  [{ topic: "dbserver1.inventory.customers"}], // ★customers テーブルに変更を加える予定
  {
    autoCommit: true,
    fromOffset: true,
    groupId: "consumer1",
  }
);

consumer.on("message", function (message) {
  console.log(message);
});

console.log("ready.");

我也会创建一个Dockerfile。
这基本上是完全抄袭自这里的。

# OS: Debian Buster
# Node.js: 14.4.0
FROM node:14.4.0-buster

# Create app directory
WORKDIR /usr/src/app

# Install app dependencies (package.json and package-lock.json)
COPY package*.json ./
RUN npm install

# Bundle app source (server.js)
COPY . .

# Listen port
EXPOSE 8080

# Run Node.js
CMD [ "npm", "run", "start" ]

顾客启动

为了在同一网络中启动,请使用–network debezium_network选项进行指定。

$ docker image build -t test-kafka-node:latest .
$ docker run -it --rm --name consumer --network debezium_network test-kafka-node

做出更改

这次我们将直接进入MySQL容器,并尝试执行Update语句。

$ docker exec -it mysql bash
root@d8382f3c83e8:/# mysql -uroot -pdebezium
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 8
Server version: 8.0.27 MySQL Community Server - GPL

Copyright (c) 2000, 2021, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> use inventory;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> UPDATE customers SET first_name='Anne99999' WHERE id=1004;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0

如果对上述的customers表进行更改,那么

    1. 顾客表会有更改

 

    1. 通过Debezium将队列注册到Kafka中

 

    Node.js的消费者会捕获进入Kafka的队列,然后执行console.log(message)操作。

在消费者容器中实际上可以确认消息。
由于可以从消息中获取变更内容,因此可以根据这些内容调用 Lambda,将数据插入到读模型中,发送电子邮件等,可以进行任何其他操作。

广告
将在 10 秒后关闭
bannerAds