试试安装Confluent Platform(社区版),并尝试使用Kafka Connect
首先
这个内容大约有80%是一样的。区别在于使用了Community Component版本。
- Confluent PlatformをインストールしてKafka Connectを試してみる
Confluent Platform是Confluent公司提供的以Apache Kafka为核心的平台。除了Apache Kafka之外,还包括Schema Registry、Rest Proxy以及运维工具等。
有商业版和社区版两种版本,各个许可证的组件差异如下所示。
商用版可使用时间为30天,在过期后将无法使用商用功能。然而,据说如果Kafka代理仅为1个节点,则可无限期使用。
我打算安装Community版,并尝试使用Kafka Connect(我真正想做的事情)将Oracle数据传送到Kafka。
获取 Confluent 平台的下载
首先,从以下的“下载页面”链接下载Confluent Platform的模块。
- https://docs.confluent.io/5.3.1/installation/installing_cp/index.html#zip-and-tar-archives
当单击链接后,将跳转到以下页面,选择“TAR存档”进行下载。
这次下载的版本是 v5.3.1,并且文件名是「confluent-community-5.3.1-2.12.tar.gz」。
安装Confluent Platform。
本次安装Confluent Platform,将参考以下页面进行操作。
由于只是想尝试Kafka Connect,因此将以本地模式(?)构建一个用于开发的独立模式运行。
- https://docs.confluent.io/current/quickstart/index.html
在安装之前,请按照以下步骤安装OpenJDK8。
# yum install java-1.8.0-openjdk-devel
# java -version
openjdk version "1.8.0_222"
OpenJDK Runtime Environment (build 1.8.0_222-b10)
OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)
我正在设置JAVA_HOME并将Java路径添加到PATH中。
# echo "export JAVA_HOME=$(readlink -e $(which java)|sed 's:/bin/java::')" > /etc/profile.d/java.sh
# echo "PATH=\$PATH:\$JAVA_HOME/bin" >> /etc/profile.d/java.sh
# source /etc/profile
接下来在/opt目录下解压confluent-community-5.3.1-2.12.tar.gz。
# tar xvzf /tmp/confluent-community-5.3.1-2.12.tar.gz -C /opt
安装Confluent CLI
为了在单独的环境中运行,并使用confluent local命令,您需要安装Confluent CLI。
# curl -L https://cnfl.io/cli | sh -s -- -b /opt/confluent-5.3.1/bin
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
0 162 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0
100 10113 100 10113 0 0 6247 0 0:00:01 0:00:01 --:--:-- 9875k
confluentinc/cli info checking S3 for latest tag
confluentinc/cli info found version: latest for latest/linux/amd64
confluentinc/cli info installed /opt/confluent-5.3.1/bin/confluent
confluentinc/cli info please ensure /opt/confluent-5.3.1/bin is in your PATH
将Confluent bin目录添加到PATH中。
将Confluent bin目录的安装位置添加到PATH中。
# echo "export PATH=/opt/confluent-5.3.1/bin:$PATH" >> /etc/profile.d/confluent.sh
# source /etc/profile
启动Confluent Platform.
使用confluent local start命令启动Confluent Platform。
该命令将启动Confluent Platform的所有组件,包括Kafka、Kafka Connect和KSQL等。
需要注意的是,由于不是商业版,因此无法使用Confluent Control Center。
$ confluent local start
The local commands are intended for a single-node development environment
only, NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
安装Oracle JDBC驱动器
从Oracle官网下载Oracle JDBC驱动程序(ojdbc8.jar),然后按照以下方式进行存储。
- /opt/confluent-5.3.1/share/java/kafka-connect-jdbc/ojdbc8.jar
接下来重新启动Kafka连接。
# confluent local stop connect
The local commands are intended for a single-node development environment
only, NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
Stopping connect
connect is [DOWN]
# confluent local start connect
The local commands are intended for a single-node development environment
only, NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
zookeeper is already running. Try restarting if needed
kafka is already running. Try restarting if needed
schema-registry is already running. Try restarting if needed
Starting connect
connect is [UP]
创建源表
创建一个Oracle数据库中的表来加载数据。
我们将创建一个名为”emtest”的表,如下所示。
“no”列将用于存储连续的数据并成为通过Kafka Connect逐步加载数据的键值。
CREATE TABLE emtest
(
no NUMBER(3,0),
name VARCHAR2(50),
update_at TIMESTAMP(0)
);
创建Kafka Connector。
使用JdbcSourceConnector,将数据从Oracle加载到Kafka进行配置。
JdbcSourceConnector是一个用于通过JDBC从数据库加载数据的连接器,可以在Oracle之外的PostgreSQL等数据库中使用。
首先,创建JdbcSourceConnector的配置文件。
您可以在任何地方创建该文件,但我选择了类似的位置(/opt/confluent-5.3.1/etc/kafka-connect-jdbc)进行创建。
# cd /opt/confluent-5.3.1/etc/kafka-connect-jdbc
# vi source-oracle.properties
設定文件的內容如下所示。
{
"name": "TestOracle",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@192.168.10.232:1521:testdb",
"connection.user": "hr",
"connection.password": "hr",
"table.whitelist": "EMTEST",
"mode": "incrementing",
"incrementing.column.name": "NO",
"validate.non.null": "false",
"topic.prefix": "test."
}
}
mode属性被设置为”incrementing”,以便加载增量数据。
incrementing.column.name属性用于指定判断增量的键名。
topic.prefix属性用于指定主题名的前缀。在这里,我们指定为”test.”,主题名将会自动创建为”test.EMTEST”,其中EMTEST是根据表名生成的。
如果您想要在Kafka中加载的消息以默认的Avro格式进行转换。如果要将其转换为String,则需要在转换器中指定“org.apache.kafka.connect.storage.StringConverter”。
使用以下命令将创建的配置文件注册到Kafka Connect。
# curl -X POST -H "Content-Type: application/json" --data @source-oracle.json http://localhost:8083/connectors
{"name":"TestOracle","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","connection.url":"jdbc:oracle:thin:@192.168.10.232:1521:testdb","connection.user":"hr","connection.password":"hr","table.whitelist":"EMTEST","mode":"incrementing","incrementing.column.name":"NO","validate.non.null":"false","topic.prefix":"test.","name":"TestOracle"},"tasks":[],"type":"source"}
为了查看在这个位置发布给卡夫卡的消息,可以使用kafka-avro-console-consumer命令。如果不是Avro格式的消息,则可以使用kafka-console-consumer命令。
$ confluent local consume test.EMTEST -- --value-format avro --from-beginning
在这里将数据插入到EMTEST表中,然后可以从Kafka中获取存储的消息并显示在控制台上。
$ confluent local consume test.EMTEST -- --value-format avro --from-beginning
{"NO":{"bytes":"\u0015"},"NAME":{"string":"CCCC"},"UPDATE_AT":{"long":1569248672000}}
查看日志(参考)
据说可以使用以下命令查看Connect的日志。
confluent local log connect
结束Confluent
要终止Confluent,请使用confluent local stop命令。
$ confluent local stop
The local commands are intended for a single-node development environment
only, NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]
使用「confluent local destroy」似乎会破坏整个环境(因为我没有使用过,所以不太清楚)。
请提供以下选项的中文同义词:
-
- Confluent Platform Quick Start
-
- Easy Ways to Generate Test Data in Kafka
- Introduction to Kafka Connectors