使用 Oracle Streaming Service 和 Autonomous DB 的 Kafka Connect 进行动态协同工作
首先
尝试了在Oracle Blogs上介绍的”使用Kafka Connect与Oracle流媒体服务和自主数据库”,由于遇到一些问题,现在将内容总结如下:当数据被插入到Autonomous DB的TEST表中时,该数据将自动通知到Streaming作为消息。
获取认证令牌
请根据此参考在OCI控制台上获取身份验证令牌。(省略)
创建虚拟机实例
在本文中,因为我们将使用已经创建的Kafka Connect Docker映像,所以需要安装Docker的虚拟机实例。为了方便起见,这次我使用了Oracle Developer Image,但您可以随意预配虚拟机实例。(省略部分)
创建自主数据库
接下来,我们将部署Autonomous DB。
请参考以下步骤:“1. 通过OCI控制台部署ATP”。
自主数据库的设置
接下来,创建用于测试与Autonomous DB集成的模式(schema)和表,并获取钱包的资格证书。
登錄後,執行以下操作,創建一個從Kafka connect訪問的用戶。
CREATE USER connectdemo IDENTIFIED BY "Str0ngP@$$word1234";
GRANT CONNECT, RESOURCE TO connectdemo;
GRANT UNLIMITED TABLESPACE TO connectdemo;
接下来执行以下SQL语句,创建一个用于测试的数据表。
CREATE TABLE TEST
(
ID NUMBER(10,0) GENERATED BY DEFAULT ON NULL AS IDENTITY,
USERNAME VARCHAR2(50) NOT NULL,
FIRST_NAME VARCHAR2(50) NOT NULL,
MIDDLE_NAME VARCHAR2(50),
LAST_NAME VARCHAR2(50) NOT NULL,
AGE NUMBER(5,0) DEFAULT 0 NOT NULL,
CREATED_ON TIMESTAMP(9) NOT NULL,
CONSTRAINT TEST_PK PRIMARY KEY
(
ID
)
ENABLE
);
因为之后会使用,我会把它保存在某个地方。
创建Streaming Pool
我們接下來要創建 “Streaming Pool”。
在OCI的控制台上,從漢堡菜單中選擇 “分析 – 流式處理”,然後轉到 “Streaming” 頁面。
在流媒体池的名称处输入”demo-stream-pool”,然后按下扩展选项的显示按钮,在其中勾选”自动创建主题”,最后按下”创建”按钮。
创建Kafka连接配置
配置必要的文件(如jdbc驱动程序等)
一旦达到这个程度后,我们将进行Kafka Connect的设置。
通过Terminal访问之前创建的VM实例。
这次我们使用Teraterm。
一旦访问成功,执行以下命令来创建一个项目所需的目录。
sudo mkdir -p /projects/connect-demo/drivers
sudo mkdir -p /projects/connect-demo/kafka-jdbc/connector
sudo mkdir -p /projects/connect-demo/wallet
sudo chmod -R 777 /projects/
接下来,我们将安排驱动程序和钱包。
Oracle JDBC 驱动程序
从Oracle JDBC驱动程序下载并将其放置在/projects/connect-demo/drivers目录中。
*本次使用的是ojdbc10-full.tar.gz。
Kafka JDBC 连接器
从 Kafka JDBC Connector 的官方网站下载并将文件放置在 /projects/connect-demo/kafka-jdbc/connector 目录下,并解压缩文件。本次操作所下载的文件为 confluentinc-kafka-connect-jdbc-10.0.1.zip。
ATP 钱包
请将从ATP控制台下载的钱包文件放置在/projects/connect-demo/wallet目录下,并进行解压。
※本次操作指的是解压文件Wallet_tfOKEATPDB.zip。
当完成必要文件的配置后,将会呈现如下所示的结构。
projects
└ connect-demo
├ drivers
│ └ ojdbc10-full.tar.gz
├ kafka-jdbc
│ └ connector
│ ├ confluentinc-kafka-connect-jdbc-10.0.1.zip
│ └ confluentinc-kafka-connect-jdbc-10.0.1
│ ├ assets
│ │ ├ confluent.png
│ │ └ jdbc.jpg
│ ├ doc
│ │ ├ LICENSE
│ │ ├ LICENSE-xmlparserv2-19.7.0.0.txt
│ │ ├ licenses.html
│ │ ├ NOTICE
│ │ ├ README.md
│ │ ├ version.txt
│ │ └ licenses
│ │ ├ LICENSE-kafka-connect-jdbc-10.0.0-SNAPSHOT.txt
│ │ ├ LICENSE-mssql-jdbc-8.4.1.jre8.txt
│ │ ├ LICENSE-ojdbc8-19.7.0.0.txt
│ │ ├ LICENSE-ons-19.7.0.0.txt
│ │ ├ LICENSE-oraclepki-19.7.0.0.txt
│ │ ├ LICENSE-orai18n-19.7.0.0.txt
│ │ ├ LICENSE-osdt_cert-19.7.0.0.txt
│ │ ├ LICENSE-osdt_core-19.7.0.0.txt
│ │ ├ LICENSE-postgresql-42.2.10.txt
│ │ ├ LICENSE-simplefan-19.7.0.0.txt
│ │ ├ LICENSE-sqlite-jdbc-3.25.2.txt
│ │ ├ LICENSE-ucp-19.7.0.0.txt
│ │ └ LICENSE-xdb-19.7.0.0.txt
│ ├ etc
│ │ ├ sink-quickstart-sqlite.properties
│ │ └ source-quickstart-sqlite.properties
│ └ lib
│ ├ common-utils-6.0.0.jar
│ ├ jtds-1.3.1.jar
│ ├ kafka-connect-jdbc-10.0.1.jar
│ ├ mssql-jdbc-8.4.1.jre8.jar
│ ├ ojdbc8-19.7.0.0.jar
│ ├ ojdbc8-production-19.7.0.0.pom
│ ├ ons-19.7.0.0.jar
│ ├ oraclepki-19.7.0.0.jar
│ ├ orai18n-19.7.0.0.jar
│ ├ osdt_cert-19.7.0.0.jar
│ ├ osdt_core-19.7.0.0.jar
│ ├ postgresql-42.2.10.jar
│ ├ simplefan-19.7.0.0.jar
│ ├ slf4j-api-1.7.30.jar
│ ├ sqlite-jdbc-3.25.2.jar
│ ├ ucp-19.7.0.0.jar
│ ├ xdb-19.7.0.0.jar
│ ├ xmlparserv2-19.7.0.0.jar
│ └ manifest.json
└ wallet
├ cwallet.sso
├ ewallet.p12
├ keystore.jks
├ ojdbc.properties
├ README
├ sqlnet.ora
├ tnsnames.ora
├ truststore.jks
└ Wallet_tfOKEATPDB.zip
启动Kafka Connect
当达到这一步时,我们会创建配置文件并启动Kafka Connect。
首先,我们需要创建以下文件。
请将括号部分替换为您的实际值。
group.id=connect-demo-group
bootstrap.servers=<streamPoolBootstrapServer>
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<streamPoolId>" password="<authToken>";
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<streamPoolId>" password="<authToken>";
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<streamPoolId>" password="<authToken>";
config.storage.replication.factor=1
config.storage.partitions=1
config.storage.topic=<connectConfigOCID>-config
status.storage.replication.factor=1
status.storage.partitions=1
status.storage.topic=<connectConfigOCID>-status
offset.storage.replication.factor=1
offset.storage.partitions=1
offset.storage.topic=<connectConfigOCID>-offset
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
task.shutdown.graceful.timeout.ms=10000
接下来,我们将创建一个Dockerfile文件。
FROM debezium/connect:0.10
USER root
RUN mkdir /wallet
USER kafka
COPY drivers/* /kafka/libs/
COPY kafka-jdbc/connector/confluentinc-kafka-connect-jdbc-10.0.1/lib/* /kafka/libs/
COPY wallet/* /wallet/
接下来,我们将创建一个用于启动Docker镜像的Shell脚本。请用您实际的值来替换<括弧>中的部分。
#!/usr/bin/env bash
CONNECT_CONFIG_ID=<connectConfigOCID>
CONFIG_STORAGE_TOPIC=$CONNECT_CONFIG_ID-config
OFFSET_STORAGE_TOPIC=$CONNECT_CONFIG_ID-offset
STATUS_STORAGE_TOPIC=$CONNECT_CONFIG_ID-status
docker build -t connect .
docker run -it --rm --name connect -p 8083:8083 \
-e GROUP_ID=connect-demo-group \
-e BOOTSTRAP_SERVERS=<streamPoolBootstrapServer> \
-e CONFIG_STORAGE_TOPIC=$CONFIG_STORAGE_TOPIC \
-e OFFSET_STORAGE_TOPIC=$OFFSET_STORAGE_TOPIC \
-e STATUS_STORAGE_TOPIC=$STATUS_STORAGE_TOPIC \
-v `pwd -P`/connect-distributed.properties:/kafka/config.orig/connect-distributed.properties \
connect
完成后,执行以下操作:
cd /projects/connect-demo
chmod +x connect-demo.sh
./connect-demo.sh
如果没有错误,将如下所示在控制台上显示。
[org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig]
2020-12-30 01:17:29,915 INFO || 尝试打开到Oracle的第一个连接 [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2020-12-30 01:17:32,323 INFO || [Worker clientId=connect-1, groupId=connect-demo-group] 完成启动连接器和任务 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-12-30 01:17:32,427 INFO || 找到分区{protocol=1, table=ADMIN.TEST}的偏移量{{protocol=1, table=ADMIN.TEST}={incrementing=1}, {table=TEST}=null} [io.confluent.connect.jdbc.source.JdbcSourceTask]
2020-12-30 01:17:32,432 INFO || 已启动JDBC源任务 [io.confluent.connect.jdbc.source.JdbcSourceTask]
2020-12-30 01:17:32,433 INFO || WorkerSourceTask{id=oss-atp-connector-0} 源任务已完成初始化和启动 [org.apache.kafka.connect.runtime.WorkerSourceTask]
2020-12-30 01:17:32,440 INFO || 开始使用SQL查询:SELECT * FROM “ADMIN”.”TEST” WHERE “ADMIN”.”TEST”.”ID” > ? ORDER BY “ADMIN”.”TEST”.”ID” ASC [io.confluent.connect.jdbc.source.TableQuerier]
这样就可以启动Kafka connect的实例,并能够通过REST API创建连接器。
为了通过REST API创建连接器,需要创建一个json格式的配置文件。
请创建以下文件。
将”value”的值更改为您自己的值。(如果参考了此处,可以是tfokeatpdb)
{
"name": "oss-atp-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "100",
"connection.url": "jdbc:oracle:thin:@<dbname>_high?TNS_ADMIN=/wallet",
"connection.user": "coannectdemo",
"connection.password": "Str0ngP@$$word1234",
"mode": "incrementing",
"incrementing.column.name": "ID",
"topic.prefix": "demo-stream-",
"table.whitelist": "TEST",
"numeric.mapping": "best_fit"
}
}
使用REST API将该配置文件以POST方式发送,创建Source Connector。
curl -iX POST -H “Accept:application/json” -H “Content-Type:application/json” -d @connector-config.json http://localhost:8083/connectors
我們可以通過以下命令來確認是否已經創建了oss-atp-connector連接器:
“`
curl -i http://localhost:8083/connectors
HTTP/1.1 200 OK
Date: Wed, 30 Dec 2020 01:20:45 GMT
Content-Type: application/json
Content-Length: 21
Server: Jetty(9.4.18.v20190429)
“`
[“oss-atp-connector”] 可以转化为:[“阿里云对象存储-应用跟踪器连接器”]
顺便提一下,如果要删除可以使用以下命令进行删除。
使用curl -i -X DELETE http://localhost:8083/connectors/[connector-name]
考试
準備已經完成了。
那麼,我們來進行測試吧。
我将登录 SQLDeveloper 并尝试向 TEST 表插入数据。执行以下 SQL 语句:
插入到 TEST 表 (username, first_name, middle_name, last_name, age, created_on) 的值为 (‘taro’, ‘taro’, null, ‘tanaka’, 10, sysdate);
提交事务。
请提供参考链接
使用Kafka Connect与Oracle Streaming Service和Autonomous DB
上述内容。