Trying out the Debezium connector to integrate Confluent Platform and MySQL configured on OpenShift

Introduction

In this post, we use Debezium's MySQL Source Connector to link data between MySQL and a Confluent Platform deployment configured on OpenShift.

Prerequisites

    • OpenShift Container Platform (OCP) 4.7 configured on IBM Cloud® Virtual Private Cloud (VPC)
    • Confluent Platform 6.0.0 configured on OCP (using Confluent Operator 1.7.0)
    • MySQL 8.0 configured on OCP

References

Debezium MySQL connector configuration example
Debezium MySQL Source Connector Configuration Properties – Database History Parameters
Example of processing MySQL table updates on Kafka with ksql

Verifying the setup

Confirm that Confluent Platform is up and running.

>oc get all -n confluent-namespace
I0912 23:39:24.059883   37555 request.go:621] Throttling request took 1.090165589s, request: GET:https://c100-e.jp-tok.containers.cloud.ibm.com:31330/apis/packages.operators.coreos.com/v1?timeout=32s
NAME                               READY   STATUS    RESTARTS   AGE
pod/cc-operator-6ffb5f5489-b6l6r   1/1     Running   0          13h
pod/connectors-0                   1/1     Running   0          13h
pod/connectors-1                   1/1     Running   0          13h
pod/controlcenter-0                1/1     Running   0          13h
pod/kafka-0                        1/1     Running   0          13h
pod/kafka-1                        1/1     Running   0          13h
pod/kafka-2                        1/1     Running   0          13h
pod/ksql-0                         1/1     Running   0          13h
pod/ksql-1                         1/1     Running   0          13h
pod/replicator-0                   1/1     Running   0          13h
pod/replicator-1                   1/1     Running   0          13h
pod/schemaregistry-0               1/1     Running   0          13h
pod/zookeeper-0                    1/1     Running   0          13h
pod/zookeeper-1                    1/1     Running   0          13h
pod/zookeeper-2                    1/1     Running   0          13h
(remaining output omitted)

The Kafka brokers' auto.create.topics.enable property is set to true.
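In this environment the brokers were deployed with Confluent Operator, so one way to apply such a broker property is through the operator's configOverrides mechanism in the Helm values. This is a sketch; check the exact keys against the documentation for the operator version you use:

```yaml
kafka:
  configOverrides:
    server:
      - auto.create.topics.enable=true
```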

Confirm that MySQL is running.

>oc get all -n shoheim

I0912 23:02:28.265544   36803 request.go:621] Throttling request took 1.148324247s, request: GET:https://c100-e.jp-tok.containers.cloud.ibm.com:31330/apis/monitoring.operator.ibm.com/v1alpha1?timeout=32s
NAME                 READY   STATUS      RESTARTS   AGE
pod/mysql-1-deploy   0/1     Completed   0          45s
pod/mysql-1-z54sl    1/1     Running     0          38s
(remaining output omitted)

※ For the user privileges and binlog-related settings, refer to:
Debezium connector for MySQL – Setting up MySQL
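For reference, the minimal grants the Debezium documentation lists for the connector user look like the following. The user name and password here are placeholders; the server must also have row-based binary logging enabled (binlog_format=ROW):

```sql
-- Sketch based on the Debezium MySQL connector documentation.
-- 'user' / 'pass' are placeholders; adjust the host and credentials.
CREATE USER 'user'@'%' IDENTIFIED BY 'pass';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'user'@'%';
FLUSH PRIVILEGES;
```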

The following database and table are created in MySQL.

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| opdb               |
+--------------------+

mysql> SHOW TABLES;
+----------------+
| Tables_in_opdb |
+----------------+
| menus          |
+----------------+
1 row in set (0.00 sec)

mysql> SELECT * from menus;
+------+-----------+
| id   | name      |
+------+-----------+
|    1 | curry     |
|    2 | hamburger |
+------+-----------+
2 rows in set (0.00 sec)
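For reference, this layout can be reproduced with DDL like the following (the column definitions match the CREATE TABLE statement that appears later in the schema-history topic):

```sql
CREATE DATABASE opdb;
USE opdb;
CREATE TABLE menus (
  id   INT,
  name VARCHAR(100)
);
INSERT INTO menus (id, name) VALUES (1, 'curry'), (2, 'hamburger');
```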

Configuring the MySQL Source Connector

1. Installing the MySQL Connector plugin

Use the Confluent Hub client (the confluent-hub command) to install the Debezium MySQL Connector plugin on the connectors pods.

>oc exec -it connectors-0 /bin/bash
bash-4.4$ confluent-hub install debezium/debezium-connector-mysql:1.6.0
The component can be installed in any of the following Confluent Platform installations: 
  1. / (installed rpm/deb package) 
  2. / (where this tool is installed) 
Choose one of these to continue the installation (1-2): 2
Do you want to install this into /usr/share/confluent-hub-components? (yN) y


Component's license: 
Apache 2.0 
https://github.com/debezium/debezium/blob/master/LICENSE.txt 
I agree to the software license agreement (yN) y

You are about to install 'debezium-connector-mysql' from Debezium Community, as published on Confluent Hub. 
Do you want to continue? (yN) y

Downloading component Debezium MySQL CDC Connector 1.6.0, provided by Debezium Community from Confluent Hub and installing into /usr/share/confluent-hub-components 
Detected Worker's configs: 
  1. Standard: /etc/kafka/connect-distributed.properties 
  2. Standard: /etc/kafka/connect-standalone.properties 
  3. Standard: /etc/schema-registry/connect-avro-distributed.properties 
  4. Standard: /etc/schema-registry/connect-avro-standalone.properties 
  5. Used by Connect process with PID : /opt/confluent/etc/connect/connect.properties 
Do you want to update all detected configs? (yN) y

Adding installation directory to plugin path in the following files: 
  /etc/kafka/connect-distributed.properties 
  /etc/kafka/connect-standalone.properties 
  /etc/schema-registry/connect-avro-distributed.properties 
  /etc/schema-registry/connect-avro-standalone.properties 
  /opt/confluent/etc/connect/connect.properties 

Completed 

Here, the plugin is installed into the /usr/share/confluent-hub-components directory. For this reason, a persistent volume mounted at that directory was configured on the connector pods beforehand.

Check what was installed.

bash-4.4$ cd /usr/share/confluent-hub-components
bash-4.4$ ls -ltr
total 0
drwxrwxrwx. 5 1001 root 63 Oct 25 02:26 debezium-debezium-connector-mysql
bash-4.4$ ls -l debezium-debezium-connector-mysql
total 8
drwxrwxrwx. 2 1001 root   59 Oct 25 02:26 assets
drwxrwxrwx. 2 1001 root  175 Oct 25 02:26 doc
drwxrwxrwx. 2 1001 root 4096 Oct 25 02:26 lib
-rw-rw-rw-. 1 1001 root 2868 Oct 25 02:26 manifest.json
bash-4.4$ ls -l debezium-debezium-connector-mysql/lib
total 9624
-rw-rw-rw-. 1 1001 root  337864 Oct 25 02:26 antlr4-runtime-4.8.jar
-rw-rw-rw-. 1 1001 root   20743 Oct 25 02:26 debezium-api-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root  376353 Oct 25 02:26 debezium-connector-mysql-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root  866648 Oct 25 02:26 debezium-core-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root 2777855 Oct 25 02:26 debezium-ddl-parser-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root    4617 Oct 25 02:26 failureaccess-1.0.1.jar
-rw-rw-rw-. 1 1001 root 2858426 Oct 25 02:26 guava-30.0-jre.jar
-rw-rw-rw-. 1 1001 root  192762 Oct 25 02:26 mysql-binlog-connector-java-0.25.1.jar
-rw-rw-rw-. 1 1001 root 2397321 Oct 25 02:26 mysql-connector-java-8.0.21.jar

After restarting the pods, check the plugins via REST from the local machine.

>curl -H "Accept:application/json" https://connectors.test-ocp-f83035ec98caa9e1c443f354e8208d2f-0000.jp-tok.containers.appdomain.cloud:443/connector-plugins -k |jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   597  100   597    0     0    711      0 --:--:-- --:--:-- --:--:--   710
[
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "1.6.0.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "6.1.0-ce"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "6.1.0-ce"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]

io.debezium.connector.mysql.MySqlConnector is now listed. (The other plugins are installed by default.)

(Screenshot: Connector_2021-10-26-11-06-24.png)

2. Creating the MySQL Source Connector

The connector can also be configured from Control Center (C3), but here we create it via REST from the local machine.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" -k https://connectors.test-ocp-f83035ec98caa9e1c443f354e8208d2f-0000.jp-tok.containers.appdomain.cloud:443/connectors/ -d '{ "name": "mysql-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "172.21.17.38", "database.port": "3306", "database.user": "user", "database.password": "pass", "database.server.id": "184055", "database.server.name": "dbserver1", "database.include.list": "opdb", "database.history.kafka.bootstrap.servers": "kafka:9071", "database.history.kafka.topic": "dbhistory.opdb", "table.include.list": "opdb.menus", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "PLAIN", "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "PLAIN", "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";", "include.schema.changes": "true" } }'

The data payload (the -d argument), formatted for readability:

{
    "name": "mysql-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "172.21.17.38",
        "database.port": "3306",
        "database.user": "user",
        "database.password": "pass",
        "database.server.id": "184055",
        "database.server.name": "dbserver1",
        "database.include.list": "opdb",
        "database.history.kafka.bootstrap.servers": "kafka:9071",
        "database.history.kafka.topic": "dbhistory.opdb",
        "table.include.list": "opdb.menus",
        "database.history.consumer.security.protocol": "SASL_SSL",
        "database.history.consumer.sasl.mechanism": "PLAIN",
        "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";",
        "database.history.producer.security.protocol": "SASL_SSL",
        "database.history.producer.sasl.mechanism": "PLAIN",
        "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";",
        "include.schema.changes": "true"
    }
}

For the properties, refer to the following:

MySQL connector configuration example
※ Properties may change between connector versions, so refer to the documentation for the version you use. Version 1.6 is used here.

The basic properties are covered in the link above; in addition to those, properties for connecting to the Kafka brokers with SASL/PLAIN (database.history.producer.xxx and database.history.consumer.xxx) are added here.

Debezium MySQL Source Connector Configuration Properties – Database History Parameters

Confirm that the created source connector is running successfully.

>curl -H "Accept:application/json" https://connectors.test-ocp-f83035ec98caa9e1c443f354e8208d2f-0000.jp-tok.containers.appdomain.cloud:443/connectors/mysql-connector/status -k |jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   245  100   245    0     0    641      0 --:--:-- --:--:-- --:--:--   641
{
  "name": "mysql-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connectors-1.connectors.confluent-namespace.svc:9083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connectors-1.connectors.confluent-namespace.svc:9083"
    }
  ],
  "type": "source"
}
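When scripting health checks, the JSON returned by the status endpoint can be evaluated programmatically. Below is a minimal sketch in Python; the response body is inlined here from the output above, while in practice you would fetch it from the /connectors/<name>/status endpoint over HTTPS:

```python
import json

def all_running(status_body):
    """Return True if the connector and all of its tasks report RUNNING."""
    status = json.loads(status_body)
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status["tasks"]]
    return all(state == "RUNNING" for state in states)

# Status response captured from /connectors/mysql-connector/status above.
body = """{
  "name": "mysql-connector",
  "connector": {"state": "RUNNING",
                "worker_id": "connectors-1.connectors.confluent-namespace.svc:9083"},
  "tasks": [{"id": 0, "state": "RUNNING",
             "worker_id": "connectors-1.connectors.confluent-namespace.svc:9083"}],
  "type": "source"
}"""
print(all_running(body))  # True
```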

Checking the data on the topics

Confirm that MySQL data has been sent to Kafka topics by the created connector.

First, check the topics created on Kafka.

(Screenshot: Connector_2021-12-12-12-58-36.png)

Data has been sent to the dbserver1.opdb.menus topic.

[
    {
        "topic": "dbserver1.opdb.menus",
        "partition": 0,
        "offset": 0,
        "timestamp": 1639280629029,
        "timestampType": "CREATE_TIME",
        "headers": [],
        "key": null,
        "value": {
            "before": null,
            "after": {
                "id": 1,
                "name": "curry"
            },
            "source": {
                "version": "1.6.0.Final",
                "connector": "mysql",
                "name": "dbserver1",
                "ts_ms": 1639280628022,
                "snapshot": "true",
                "db": "opdb",
                "sequence": null,
                "table": "menus",
                "server_id": 0,
                "gtid": null,
                "file": "binlog.000002",
                "pos": 3264,
                "row": 0,
                "thread": null,
                "query": null
            },
            "op": "r",
            "ts_ms": 1639280628024,
            "transaction": null
        }
    },
    {
        "topic": "dbserver1.opdb.menus",
        "partition": 0,
        "offset": 1,
        "timestamp": 1639280629030,
        "timestampType": "CREATE_TIME",
        "headers": [],
        "key": null,
        "value": {
            "before": null,
            "after": {
                "id": 2,
                "name": "hamburger"
            },
            "source": {
                "version": "1.6.0.Final",
                "connector": "mysql",
                "name": "dbserver1",
                "ts_ms": 1639280628026,
                "snapshot": "true",
                "db": "opdb",
                "sequence": null,
                "table": "menus",
                "server_id": 0,
                "gtid": null,
                "file": "binlog.000002",
                "pos": 3264,
                "row": 0,
                "thread": null,
                "query": null
            },
            "op": "r",
            "ts_ms": 1639280628026,
            "transaction": null
        }
    }
]

The first row of the menus table appears at offset 0, and the second row at offset 1.

When a row is added to the menus table, a new message is sent to the topic as well.

INSERT INTO menus (id, name) VALUES (3, 'katsudon');
    {
        "topic": "dbserver1.opdb.menus",
        "partition": 0,
        "offset": 2,
        "timestamp": 1639280629030,
        "timestampType": "CREATE_TIME",
        "headers": [],
        "key": null,
        "value": {
            "before": null,
            "after": {
                "id": 3,
                "name": "katsudon"
            },
            "source": {
                "version": "1.6.0.Final",
                "connector": "mysql",
                "name": "dbserver1",
                "ts_ms": 1639280628026,
                "snapshot": "true",
                "db": "opdb",
                "sequence": null,
                "table": "menus",
                "server_id": 0,
                "gtid": null,
                "file": "binlog.000002",
                "pos": 3264,
                "row": 0,
                "thread": null,
                "query": null
            },
            "op": "r",
            "ts_ms": 1639280628026,
            "transaction": null
        }
    }

When a row in the table is updated, a message like the following is sent to the topic.

UPDATE menus SET name='tendon' WHERE id=3;
[
    {
        "topic": "dbserver1.opdb.menus",
        "partition": 0,
        "offset": 4,
        "timestamp": 1639530256773,
        "timestampType": "CREATE_TIME",
        "headers": [],
        "key": null,
        "value": {
            "before": {
                "id": 3,
                "name": "katsudon"
            },
            "after": {
                "id": 3,
                "name": "tendon"
            },
            "source": {
                "version": "1.6.0.Final",
                "connector": "mysql",
                "name": "dbserver1",
                "ts_ms": 1639530256000,
                "snapshot": "false",
                "db": "opdb",
                "sequence": null,
                "table": "menus",
                "server_id": 1,
                "gtid": null,
                "file": "binlog.000002",
                "pos": 3486,
                "row": 0,
                "thread": null,
                "query": null
            },
            "op": "u",
            "ts_ms": 1639530256515,
            "transaction": null
        }
    }
]

before holds the data prior to the update, and after holds the data after the update.

For a delete, the message looks like the following.

DELETE FROM menus WHERE id=4;
[
    {
        "topic": "dbserver1.opdb.menus",
        "partition": 0,
        "offset": 5,
        "timestamp": 1639530687264,
        "timestampType": "CREATE_TIME",
        "headers": [],
        "key": null,
        "value": {
            "before": {
                "id": 4,
                "name": "gyoza"
            },
            "after": null,
            "source": {
                "version": "1.6.0.Final",
                "connector": "mysql",
                "name": "dbserver1",
                "ts_ms": 1639530687000,
                "snapshot": "false",
                "db": "opdb",
                "sequence": null,
                "table": "menus",
                "server_id": 1,
                "gtid": null,
                "file": "binlog.000002",
                "pos": 3794,
                "row": 0,
                "thread": null,
                "query": null
            },
            "op": "d",
            "ts_ms": 1639530687221,
            "transaction": null
        }
    }
]
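Putting the op codes together (per Debezium's event semantics: r = snapshot read, c = create/insert, u = update, d = delete), the change events can be replayed against a local copy of the table. Below is a minimal sketch, assuming the events are plain dicts shaped like the message values above:

```python
def apply_event(state, value):
    """Replay one Debezium change-event value into a dict keyed by id."""
    op = value["op"]
    if op in ("r", "c", "u"):      # snapshot read, insert, or update: take "after"
        row = value["after"]
        state[row["id"]] = row["name"]
    elif op == "d":                # delete: only "before" is populated
        state.pop(value["before"]["id"], None)
    return state

state = {}
events = [
    {"op": "r", "before": None, "after": {"id": 1, "name": "curry"}},
    {"op": "c", "before": None, "after": {"id": 3, "name": "katsudon"}},
    {"op": "u", "before": {"id": 3, "name": "katsudon"},
     "after": {"id": 3, "name": "tendon"}},
    {"op": "d", "before": {"id": 3, "name": "tendon"}, "after": None},
]
for event in events:
    apply_event(state, event)
print(state)  # {1: 'curry'}
```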

The messages below are stored in the dbhistory.opdb topic, which is specified in the connector's database.history.kafka.topic property.
The connector uses this topic to record the database schema history.

[
    {
        "topic": "dbhistory.opdb",
        "partition": 0,
        "offset": 0,
        "timestamp": 1639280627942,
        "timestampType": "CREATE_TIME",
        "headers": [],
        "key": null,
        "value": {
            "source": {
                "server": "dbserver1"
            },
            "position": {
                "ts_sec": 1639280627,
                "file": "binlog.000002",
                "pos": 3264,
                "snapshot": true
            },
            "databaseName": "",
            "ddl": "SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci",
            "tableChanges": []
        }
    },
    {
        "topic": "dbhistory.opdb",
        "partition": 0,
        "offset": 1,
        "timestamp": 1639280627970,
        "timestampType": "CREATE_TIME",
        "headers": [],
        "key": null,
        "value": {
            "source": {
                "server": "dbserver1"
            },
            "position": {
                "ts_sec": 1639280627,
                "file": "binlog.000002",
                "pos": 3264,
                "snapshot": true
            },
            "databaseName": "opdb",
            "ddl": "DROP TABLE IF EXISTS `opdb`.`menus`",
            "tableChanges": []
        }
    },
    {
        "topic": "dbhistory.opdb",
        "partition": 0,
        "offset": 2,
        "timestamp": 1639280627973,
        "timestampType": "CREATE_TIME",
        "headers": [],
        "key": null,
        "value": {
            "source": {
                "server": "dbserver1"
            },
            "position": {
                "ts_sec": 1639280627,
                "file": "binlog.000002",
                "pos": 3264,
                "snapshot": true
            },
            "databaseName": "opdb",
            "ddl": "DROP DATABASE IF EXISTS `opdb`",
            "tableChanges": []
        }
    },
    {
        "topic": "dbhistory.opdb",
        "partition": 0,
        "offset": 3,
        "timestamp": 1639280627975,
        "timestampType": "CREATE_TIME",
        "headers": [],
        "key": null,
        "value": {
            "source": {
                "server": "dbserver1"
            },
            "position": {
                "ts_sec": 1639280627,
                "file": "binlog.000002",
                "pos": 3264,
                "snapshot": true
            },
            "databaseName": "opdb",
            "ddl": "CREATE DATABASE `opdb` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci",
            "tableChanges": []
        }
    },
    {
        "topic": "dbhistory.opdb",
        "partition": 0,
        "offset": 4,
        "timestamp": 1639280627981,
        "timestampType": "CREATE_TIME",
        "headers": [],
        "key": null,
        "value": {
            "source": {
                "server": "dbserver1"
            },
            "position": {
                "ts_sec": 1639280627,
                "file": "binlog.000002",
                "pos": 3264,
                "snapshot": true
            },
            "databaseName": "opdb",
            "ddl": "USE `opdb`",
            "tableChanges": []
        }
    },
    {
        "topic": "dbhistory.opdb",
        "partition": 0,
        "offset": 5,
        "timestamp": 1639280627998,
        "timestampType": "CREATE_TIME",
        "headers": [],
        "key": null,
        "value": {
            "source": {
                "server": "dbserver1"
            },
            "position": {
                "ts_sec": 1639280627,
                "file": "binlog.000002",
                "pos": 3264,
                "snapshot": true
            },
            "databaseName": "opdb",
            "ddl": "CREATE TABLE `menus` (\n  `id` int DEFAULT NULL,\n  `name` varchar(100) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
            "tableChanges": [
                {
                    "type": "CREATE",
                    "id": "\"opdb\".\"menus\"",
                    "table": {
                        "defaultCharsetName": "utf8mb4",
                        "primaryKeyColumnNames": [],
                        "columns": [
                            {
                                "name": "id",
                                "jdbcType": 4,
                                "typeName": "INT",
                                "typeExpression": "INT",
                                "charsetName": null,
                                "position": 1,
                                "optional": true,
                                "autoIncremented": false,
                                "generated": false
                            },
                            {
                                "name": "name",
                                "jdbcType": 12,
                                "typeName": "VARCHAR",
                                "typeExpression": "VARCHAR",
                                "charsetName": "utf8mb4",
                                "length": 100,
                                "position": 2,
                                "optional": true,
                                "autoIncremented": false,
                                "generated": false
                            }
                        ]
                    }
                }
            ]
        }
    }
]

Closing

I had originally planned to go on to build a Sink Connector and replicate the data to another database, but the OCP environment became unavailable for various reasons, so I will pick this up again once the environment is restored.
