Integrating Confluent Platform on OpenShift with MySQL Using the Debezium Connector
Introduction
This article uses Debezium's MySQL Source Connector to stream data from MySQL into Confluent Platform, with both running on OpenShift.
Prerequisites
- OpenShift Container Platform (OCP) 4.7 configured on IBM Cloud® Virtual Private Cloud (VPC)
- Confluent Platform 6.0.0 configured on OCP (using Confluent Operator 1.7.0)
- MySQL 8.0 configured on OCP
References
Debezium MySQL connector configuration example
Debezium MySQL Source Connector configuration properties: database history parameters
Example of processing MySQL table updates on Kafka with ksql
Verifying the environment
Confirm that Confluent Platform has started successfully.
>oc get all -n confluent-namespace
I0912 23:39:24.059883 37555 request.go:621] Throttling request took 1.090165589s, request: GET:https://c100-e.jp-tok.containers.cloud.ibm.com:31330/apis/packages.operators.coreos.com/v1?timeout=32s
NAME READY STATUS RESTARTS AGE
pod/cc-operator-6ffb5f5489-b6l6r 1/1 Running 0 13h
pod/connectors-0 1/1 Running 0 13h
pod/connectors-1 1/1 Running 0 13h
pod/controlcenter-0 1/1 Running 0 13h
pod/kafka-0 1/1 Running 0 13h
pod/kafka-1 1/1 Running 0 13h
pod/kafka-2 1/1 Running 0 13h
pod/ksql-0 1/1 Running 0 13h
pod/ksql-1 1/1 Running 0 13h
pod/replicator-0 1/1 Running 0 13h
pod/replicator-1 1/1 Running 0 13h
pod/schemaregistry-0 1/1 Running 0 13h
pod/zookeeper-0 1/1 Running 0 13h
pod/zookeeper-1 1/1 Running 0 13h
pod/zookeeper-2 1/1 Running 0 13h
(remaining output omitted)
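Set the auto.create.topics.enable property of the Kafka brokers to true so that the topics the connector writes to are created automatically.
Confirm that MySQL is running normally.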
>oc get all -n shoheim
I0912 23:02:28.265544 36803 request.go:621] Throttling request took 1.148324247s, request: GET:https://c100-e.jp-tok.containers.cloud.ibm.com:31330/apis/monitoring.operator.ibm.com/v1alpha1?timeout=32s
NAME READY STATUS RESTARTS AGE
pod/mysql-1-deploy 0/1 Completed 0 45s
pod/mysql-1-z54sl 1/1 Running 0 38s
(remaining output omitted)
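Note: See the following for the required user privileges and log-related (binlog) settings.
Debezium connector for MySQL: Setting up MySQL
The following database and table must be created in MySQL.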
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| opdb |
+--------------------+
mysql> SHOW TABLES;
+----------------+
| Tables_in_opdb |
+----------------+
| menus |
+----------------+
1 row in set (0.00 sec)
mysql> SELECT * from menus;
+------+-----------+
| id | name |
+------+-----------+
| 1 | curry |
| 2 | hamburger |
+------+-----------+
2 rows in set (0.00 sec)
Configuring the MySQL Source Connector
1. Installing the MySQL Connector plugin
Use the Confluent Hub client (the confluent-hub command) to install the Debezium MySQL Connector plugin on the Connectors pods.
>oc exec -it connectors-0 /bin/bash
bash-4.4$ confluent-hub install debezium/debezium-connector-mysql:1.6.0
The component can be installed in any of the following Confluent Platform installations:
1. / (installed rpm/deb package)
2. / (where this tool is installed)
Choose one of these to continue the installation (1-2): 2
Do you want to install this into /usr/share/confluent-hub-components? (yN) y
Component's license:
Apache 2.0
https://github.com/debezium/debezium/blob/master/LICENSE.txt
I agree to the software license agreement (yN) y
You are about to install 'debezium-connector-mysql' from Debezium Community, as published on Confluent Hub.
Do you want to continue? (yN) y
Downloading component Debezium MySQL CDC Connector 1.6.0, provided by Debezium Community from Confluent Hub and installing into /usr/share/confluent-hub-components
Detected Worker's configs:
1. Standard: /etc/kafka/connect-distributed.properties
2. Standard: /etc/kafka/connect-standalone.properties
3. Standard: /etc/schema-registry/connect-avro-distributed.properties
4. Standard: /etc/schema-registry/connect-avro-standalone.properties
5. Used by Connect process with PID : /opt/confluent/etc/connect/connect.properties
Do you want to update all detected configs? (yN) y
Adding installation directory to plugin path in the following files:
/etc/kafka/connect-distributed.properties
/etc/kafka/connect-standalone.properties
/etc/schema-registry/connect-avro-distributed.properties
/etc/schema-registry/connect-avro-standalone.properties
/opt/confluent/etc/connect/connect.properties
Completed
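Here the plugin is installed into /usr/share/confluent-hub-components. For that reason, a persistent volume with this directory as its mount point was configured for the Connectors pods beforehand.
Check what was installed.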
bash-4.4$ cd /usr/share/confluent-hub-components
bash-4.4$ ls -ltr
total 0
drwxrwxrwx. 5 1001 root 63 Oct 25 02:26 debezium-debezium-connector-mysql
bash-4.4$ ls -l debezium-debezium-connector-mysql
total 8
drwxrwxrwx. 2 1001 root 59 Oct 25 02:26 assets
drwxrwxrwx. 2 1001 root 175 Oct 25 02:26 doc
drwxrwxrwx. 2 1001 root 4096 Oct 25 02:26 lib
-rw-rw-rw-. 1 1001 root 2868 Oct 25 02:26 manifest.json
bash-4.4$ ls -l debezium-debezium-connector-mysql/lib
total 9624
-rw-rw-rw-. 1 1001 root 337864 Oct 25 02:26 antlr4-runtime-4.8.jar
-rw-rw-rw-. 1 1001 root 20743 Oct 25 02:26 debezium-api-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root 376353 Oct 25 02:26 debezium-connector-mysql-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root 866648 Oct 25 02:26 debezium-core-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root 2777855 Oct 25 02:26 debezium-ddl-parser-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root 4617 Oct 25 02:26 failureaccess-1.0.1.jar
-rw-rw-rw-. 1 1001 root 2858426 Oct 25 02:26 guava-30.0-jre.jar
-rw-rw-rw-. 1 1001 root 192762 Oct 25 02:26 mysql-binlog-connector-java-0.25.1.jar
-rw-rw-rw-. 1 1001 root 2397321 Oct 25 02:26 mysql-connector-java-8.0.21.jar
Restart the pods, then check the plugins over REST from the local machine.
>curl -H "Accept:application/json" https://connectors.test-ocp-f83035ec98caa9e1c443f354e8208d2f-0000.jp-tok.containers.appdomain.cloud:443/connector-plugins -k |jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 597 100 597 0 0 711 0 --:--:-- --:--:-- --:--:-- 710
[
{
"class": "io.debezium.connector.mysql.MySqlConnector",
"type": "source",
"version": "1.6.0.Final"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "6.1.0-ce"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "6.1.0-ce"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
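io.debezium.connector.mysql.MySqlConnector is listed. (The other plugins are installed by default.)
2. Configuring the MySQL Source Connector
The connector can also be configured from Confluent Control Center (C3), but here it is created over REST from the local machine.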
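The check above can also be done programmatically. The following is a minimal sketch, assuming the /connector-plugins response has already been fetched and is available as a JSON string; the helper name find_plugin and the abbreviated response body are illustrative and not part of the original setup.

```python
import json

DEBEZIUM_MYSQL_CLASS = "io.debezium.connector.mysql.MySqlConnector"


def find_plugin(plugins, connector_class):
    """Return the plugin entry whose "class" matches, or None if it is absent."""
    for plugin in plugins:
        if plugin.get("class") == connector_class:
            return plugin
    return None


# Abbreviated copy of the /connector-plugins response shown above.
response_body = """
[
  {"class": "io.debezium.connector.mysql.MySqlConnector",
   "type": "source", "version": "1.6.0.Final"},
  {"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
   "type": "sink", "version": "6.1.0-ce"}
]
"""

plugin = find_plugin(json.loads(response_body), DEBEZIUM_MYSQL_CLASS)
if plugin is None:
    print("Debezium MySQL connector plugin is not installed")
else:
    print(f"found {plugin['class']} ({plugin['type']}, version {plugin['version']})")
```

If the class is missing from the list, the usual suspects are a plugin directory that is not on the worker's plugin path or a pod that has not been restarted since the installation.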
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" -k https://connectors.test-ocp-f83035ec98caa9e1c443f354e8208d2f-0000.jp-tok.containers.appdomain.cloud:443/connectors/ -d '{ "name": "mysql-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "172.21.17.38", "database.port": "3306", "database.user": "user", "database.password": "pass", "database.server.id": "184055", "database.server.name": "dbserver1", "database.include.list": "opdb", "database.history.kafka.bootstrap.servers": "kafka:9071", "database.history.kafka.topic": "dbhistory.opdb", "table.include.list": "opdb.menus", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "PLAIN", "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "PLAIN", "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";", "include.schema.changes": "true" } }'
The data portion of the request, formatted for readability:
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "172.21.17.38",
"database.port": "3306",
"database.user": "user",
"database.password": "pass",
"database.server.id": "184055",
"database.server.name": "dbserver1",
"database.include.list": "opdb",
"database.history.kafka.bootstrap.servers": "kafka:9071",
"database.history.kafka.topic": "dbhistory.opdb",
"table.include.list": "opdb.menus",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.sasl.mechanism": "PLAIN",
"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";",
"include.schema.changes": "true"
}
}
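See the following for the properties.
MySQL connector configuration example
Note: Properties can change between connector versions, so refer to the documentation for the version you are using. Version 1.6 is used here.
The basic properties are described at the link above. In addition, properties for connecting to the Kafka brokers with SASL/PLAIN (database.history.producer.xxx and database.history.consumer.xxx) have been added.
Debezium MySQL Source Connector configuration properties: database history parameters
Confirm that the source connector is running successfully.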
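Because the SASL/PLAIN settings are repeated once for the history consumer and once for the history producer, the request body is easy to get wrong by hand. The sketch below shows one way to generate it; the function names sasl_plain_client_props and build_connector_request are hypothetical, and the values are the placeholder ones used in this article. It only builds and prints the JSON and does not send anything to Kafka Connect.

```python
import json


def sasl_plain_client_props(prefix, username, password):
    """Build the three SASL/PLAIN client properties under the given prefix."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        f"{prefix}security.protocol": "SASL_SSL",
        f"{prefix}sasl.mechanism": "PLAIN",
        f"{prefix}sasl.jaas.config": jaas,
    }


def build_connector_request(kafka_user, kafka_password):
    """Assemble the connector creation request used in this article."""
    config = {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "172.21.17.38",
        "database.port": "3306",
        "database.user": "user",
        "database.password": "pass",
        "database.server.id": "184055",
        "database.server.name": "dbserver1",
        "database.include.list": "opdb",
        "database.history.kafka.bootstrap.servers": "kafka:9071",
        "database.history.kafka.topic": "dbhistory.opdb",
        "table.include.list": "opdb.menus",
    }
    # The same credentials are applied to both history clients.
    for prefix in ("database.history.consumer.", "database.history.producer."):
        config.update(sasl_plain_client_props(prefix, kafka_user, kafka_password))
    config["include.schema.changes"] = "true"
    return {"name": "mysql-connector", "config": config}


request_body = build_connector_request("user", "password")
print(json.dumps(request_body, indent=2))
```

The printed JSON can be saved to a file and passed to curl with -d @file instead of quoting it inline.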
>curl -H "Accept:application/json" https://connectors.test-ocp-f83035ec98caa9e1c443f354e8208d2f-0000.jp-tok.containers.appdomain.cloud:443/connectors/mysql-connector/status -k |jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 245 100 245 0 0 641 0 --:--:-- --:--:-- --:--:-- 641
{
"name": "mysql-connector",
"connector": {
"state": "RUNNING",
"worker_id": "connectors-1.connectors.confluent-namespace.svc:9083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connectors-1.connectors.confluent-namespace.svc:9083"
}
],
"type": "source"
}
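A status response like the one above can be checked in a script rather than by eye. This is a minimal sketch, assuming the JSON has already been parsed into a dict; connector_is_healthy is an illustrative helper, not part of Kafka Connect.

```python
def connector_is_healthy(status):
    """True when the connector and every one of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    # A connector with no tasks is treated as unhealthy here.
    tasks_ok = bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)
    return connector_ok and tasks_ok


# The status response shown above, as a Python dict.
status = {
    "name": "mysql-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connectors-1.connectors.confluent-namespace.svc:9083",
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "connectors-1.connectors.confluent-namespace.svc:9083",
        }
    ],
    "type": "source",
}

print("healthy" if connector_is_healthy(status) else "not healthy")
```

Checking the tasks as well as the connector matters because the connector itself can report RUNNING while one of its tasks has failed.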
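Checking the data on the topics
Confirm that the connector has sent the MySQL data to Kafka topics.
First, check the topics created on Kafka.
The data has been sent to the dbserver1.opdb.menus topic.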
[
{
"topic": "dbserver1.opdb.menus",
"partition": 0,
"offset": 0,
"timestamp": 1639280629029,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"before": null,
"after": {
"id": 1,
"name": "curry"
},
"source": {
"version": "1.6.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1639280628022,
"snapshot": "true",
"db": "opdb",
"sequence": null,
"table": "menus",
"server_id": 0,
"gtid": null,
"file": "binlog.000002",
"pos": 3264,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": 1639280628024,
"transaction": null
}
},
{
"topic": "dbserver1.opdb.menus",
"partition": 0,
"offset": 1,
"timestamp": 1639280629030,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"before": null,
"after": {
"id": 2,
"name": "hamburger"
},
"source": {
"version": "1.6.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1639280628026,
"snapshot": "true",
"db": "opdb",
"sequence": null,
"table": "menus",
"server_id": 0,
"gtid": null,
"file": "binlog.000002",
"pos": 3264,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": 1639280628026,
"transaction": null
}
}
]
Offset 0 holds the first row of the menus table, and offset 1 holds the second row.
When a row is added to the menus table, a new message is also sent to the topic.
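The topic name seen here follows from the connector configuration: with Debezium 1.6, row-level change events go to a topic named after database.server.name, the database, and the table, and schema change events (enabled by include.schema.changes) go to a topic named after database.server.name alone. A small sketch of that naming rule, with illustrative function names:

```python
def change_topic(server_name, database, table):
    """Topic that receives row-level change events for one table."""
    return f"{server_name}.{database}.{table}"


def schema_change_topic(server_name):
    """Topic that receives DDL events when include.schema.changes is true."""
    return server_name


print(change_topic("dbserver1", "opdb", "menus"))  # dbserver1.opdb.menus
print(schema_change_topic("dbserver1"))            # dbserver1
```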
INSERT INTO menus (id, name) VALUES (3, 'katsudon');
{
"topic": "dbserver1.opdb.menus",
"partition": 0,
"offset": 2,
"timestamp": 1639280629030,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"before": null,
"after": {
"id": 3,
"name": "katsudon"
},
"source": {
"version": "1.6.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1639280628026,
"snapshot": "true",
"db": "opdb",
"sequence": null,
"table": "menus",
"server_id": 0,
"gtid": null,
"file": "binlog.000002",
"pos": 3264,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": 1639280628026,
"transaction": null
}
}
When data in the table is updated, the following message is sent to the topic.
UPDATE menus SET name='tendon' WHERE id=3;
[
{
"topic": "dbserver1.opdb.menus",
"partition": 0,
"offset": 4,
"timestamp": 1639530256773,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"before": {
"id": 3,
"name": "katsudon"
},
"after": {
"id": 3,
"name": "tendon"
},
"source": {
"version": "1.6.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1639530256000,
"snapshot": "false",
"db": "opdb",
"sequence": null,
"table": "menus",
"server_id": 1,
"gtid": null,
"file": "binlog.000002",
"pos": 3486,
"row": 0,
"thread": null,
"query": null
},
"op": "u",
"ts_ms": 1639530256515,
"transaction": null
}
}
]
before holds the row as it was before the update, and after holds the row after the update.
A delete produces the following message.
DELETE FROM menus WHERE id=4;
[
{
"topic": "dbserver1.opdb.menus",
"partition": 0,
"offset": 5,
"timestamp": 1639530687264,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"before": {
"id": 4,
"name": "gyoza"
},
"after": null,
"source": {
"version": "1.6.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1639530687000,
"snapshot": "false",
"db": "opdb",
"sequence": null,
"table": "menus",
"server_id": 1,
"gtid": null,
"file": "binlog.000002",
"pos": 3794,
"row": 0,
"thread": null,
"query": null
},
"op": "d",
"ts_ms": 1639530687221,
"transaction": null
}
}
]
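The messages above differ mainly in the op field and in which of before and after is populated: "r" is a snapshot read, "c" a create (insert), "u" an update, and "d" a delete. The following sketch shows how a consumer might interpret the value part of such a message; the helper names and the trimmed sample values are illustrative only.

```python
OPERATIONS = {"r": "snapshot read", "c": "insert", "u": "update", "d": "delete"}


def changed_columns(value):
    """Map each column whose value differs to an (old, new) pair.

    A missing side (before is null for reads and inserts, after is null
    for deletes) is treated as a row with no columns.
    """
    before = value.get("before") or {}
    after = value.get("after") or {}
    columns = list(before) + [name for name in after if name not in before]
    return {
        name: (before.get(name), after.get(name))
        for name in columns
        if before.get(name) != after.get(name)
    }


def describe(value):
    """One-line description of a change event value."""
    operation = OPERATIONS.get(value["op"], f"unknown op {value['op']!r}")
    return f"{operation}: {changed_columns(value)}"


# Values trimmed from the update and delete messages shown above.
update_value = {
    "before": {"id": 3, "name": "katsudon"},
    "after": {"id": 3, "name": "tendon"},
    "op": "u",
}
delete_value = {"before": {"id": 4, "name": "gyoza"}, "after": None, "op": "d"}

print(describe(update_value))
print(describe(delete_value))
```

For the update only the name column is reported, because id is the same on both sides; for the delete every column is reported with None as its new value.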
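The following messages are stored in the dbhistory.opdb topic, which is the topic named by the connector's database.history.kafka.topic property.
The connector uses this topic to store the database schema history.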
[
{
"topic": "dbhistory.opdb",
"partition": 0,
"offset": 0,
"timestamp": 1639280627942,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"source": {
"server": "dbserver1"
},
"position": {
"ts_sec": 1639280627,
"file": "binlog.000002",
"pos": 3264,
"snapshot": true
},
"databaseName": "",
"ddl": "SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci",
"tableChanges": []
}
},
{
"topic": "dbhistory.opdb",
"partition": 0,
"offset": 1,
"timestamp": 1639280627970,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"source": {
"server": "dbserver1"
},
"position": {
"ts_sec": 1639280627,
"file": "binlog.000002",
"pos": 3264,
"snapshot": true
},
"databaseName": "opdb",
"ddl": "DROP TABLE IF EXISTS `opdb`.`menus`",
"tableChanges": []
}
},
{
"topic": "dbhistory.opdb",
"partition": 0,
"offset": 2,
"timestamp": 1639280627973,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"source": {
"server": "dbserver1"
},
"position": {
"ts_sec": 1639280627,
"file": "binlog.000002",
"pos": 3264,
"snapshot": true
},
"databaseName": "opdb",
"ddl": "DROP DATABASE IF EXISTS `opdb`",
"tableChanges": []
}
},
{
"topic": "dbhistory.opdb",
"partition": 0,
"offset": 3,
"timestamp": 1639280627975,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"source": {
"server": "dbserver1"
},
"position": {
"ts_sec": 1639280627,
"file": "binlog.000002",
"pos": 3264,
"snapshot": true
},
"databaseName": "opdb",
"ddl": "CREATE DATABASE `opdb` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci",
"tableChanges": []
}
},
{
"topic": "dbhistory.opdb",
"partition": 0,
"offset": 4,
"timestamp": 1639280627981,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"source": {
"server": "dbserver1"
},
"position": {
"ts_sec": 1639280627,
"file": "binlog.000002",
"pos": 3264,
"snapshot": true
},
"databaseName": "opdb",
"ddl": "USE `opdb`",
"tableChanges": []
}
},
{
"topic": "dbhistory.opdb",
"partition": 0,
"offset": 5,
"timestamp": 1639280627998,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"source": {
"server": "dbserver1"
},
"position": {
"ts_sec": 1639280627,
"file": "binlog.000002",
"pos": 3264,
"snapshot": true
},
"databaseName": "opdb",
"ddl": "CREATE TABLE `menus` (\n `id` int DEFAULT NULL,\n `name` varchar(100) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
"tableChanges": [
{
"type": "CREATE",
"id": "\"opdb\".\"menus\"",
"table": {
"defaultCharsetName": "utf8mb4",
"primaryKeyColumnNames": [],
"columns": [
{
"name": "id",
"jdbcType": 4,
"typeName": "INT",
"typeExpression": "INT",
"charsetName": null,
"position": 1,
"optional": true,
"autoIncremented": false,
"generated": false
},
{
"name": "name",
"jdbcType": 12,
"typeName": "VARCHAR",
"typeExpression": "VARCHAR",
"charsetName": "utf8mb4",
"length": 100,
"position": 2,
"optional": true,
"autoIncremented": false,
"generated": false
}
]
}
}
]
}
}
]
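Read in offset order, the history records above amount to the DDL needed to rebuild the schema as the connector saw it during the snapshot. The sketch below pulls that information out of such records; the function names are illustrative, the sample is deliberately listed out of order, and the CREATE TABLE text is abbreviated with "..." rather than copied in full.

```python
def ddl_in_order(records):
    """Return the DDL statements sorted by their offset in the history topic."""
    ordered = sorted(records, key=lambda record: record["offset"])
    return [record["value"]["ddl"] for record in ordered]


def created_tables(records):
    """Return the ids of tables reported as CREATE in tableChanges."""
    tables = []
    for record in sorted(records, key=lambda record: record["offset"]):
        for change in record["value"].get("tableChanges", []):
            if change.get("type") == "CREATE":
                tables.append(change["id"])
    return tables


# Trimmed copies of offsets 3 to 5 from the dbhistory.opdb messages above.
history = [
    {"offset": 4, "value": {"ddl": "USE `opdb`", "tableChanges": []}},
    {
        "offset": 3,
        "value": {
            "ddl": "CREATE DATABASE `opdb` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci",
            "tableChanges": [],
        },
    },
    {
        "offset": 5,
        "value": {
            "ddl": "CREATE TABLE `menus` (...)",  # abbreviated
            "tableChanges": [{"type": "CREATE", "id": '"opdb"."menus"'}],
        },
    },
]

for statement in ddl_in_order(history):
    print(statement)
print(created_tables(history))
```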
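Conclusion
I had originally planned to also build a Sink Connector and replicate the database to another destination, but for various reasons the OCP environment is currently unavailable, so I will pick this up again once the environment is back.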