试试安装Confluent Platform(社区版),并尝试使用Kafka Connect

首先

这个内容大约有80%是一样的。区别在于使用了Community Component版本。

    Confluent PlatformをインストールしてKafka Connectを試してみる

Confluent Platform是Confluent公司提供的以Apache Kafka为核心的平台。除了Apache Kafka之外,还包括Schema Registry、Rest Proxy以及运维工具等。

有商业版和社区版两种版本,各个许可证的组件差异如下所示。

20190329100752.png

商用版可使用时间为30天,在过期后将无法使用商用功能。然而,据说如果Kafka代理仅为1个节点,则可无限期使用。

我打算安装Community版,并尝试使用Kafka Connect(我真正想做的事情)将Oracle数据传送到Kafka。

获取 Confluent 平台的下载

首先,从以下的“下载页面”链接下载Confluent Platform的模块。

    https://docs.confluent.io/5.3.1/installation/installing_cp/index.html#zip-and-tar-archives

当单击链接后,将跳转到以下页面,选择“TAR存档”进行下载。

image.png

这次下载的版本是 v5.3.1,并且文件名是「confluent-community-5.3.1-2.12.tar.gz」。

安装Confluent Platform。

本次安装Confluent Platform,将参考以下页面进行操作。
由于只是想尝试Kafka Connect,因此将以本地模式(?)构建一个用于开发的独立模式运行。

    https://docs.confluent.io/current/quickstart/index.html

在安装之前,请按照以下步骤安装OpenJDK8。

# yum install java-1.8.0-openjdk-devel

# java -version
openjdk version "1.8.0_222"
OpenJDK Runtime Environment (build 1.8.0_222-b10)
OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)

我正在设置JAVA_HOME并将Java路径添加到PATH中。

# echo "export JAVA_HOME=$(readlink -e $(which java)|sed 's:/bin/java::')" > /etc/profile.d/java.sh
# echo "PATH=\$PATH:\$JAVA_HOME/bin" >> /etc/profile.d/java.sh
# source /etc/profile

接下来在/opt目录下解压confluent-community-5.3.1-2.12.tar.gz。

# tar xvzf /tmp/confluent-community-5.3.1-2.12.tar.gz -C /opt

安装Confluent CLI

为了在单独的环境中运行,并使用confluent local命令,您需要安装Confluent CLI。

# curl -L https://cnfl.io/cli | sh -s -- -b /opt/confluent-5.3.1/bin
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0   162    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 10113  100 10113    0     0   6247      0  0:00:01  0:00:01 --:--:-- 9875k
confluentinc/cli info checking S3 for latest tag
confluentinc/cli info found version: latest for latest/linux/amd64
confluentinc/cli info installed /opt/confluent-5.3.1/bin/confluent
confluentinc/cli info please ensure /opt/confluent-5.3.1/bin is in your PATH

将Confluent bin目录添加到PATH中。

将Confluent bin目录的安装位置添加到PATH中。

# echo "export PATH=/opt/confluent-5.3.1/bin:$PATH" >> /etc/profile.d/confluent.sh
# source /etc/profile

启动Confluent Platform.

使用confluent local start命令启动Confluent Platform。
该命令将启动Confluent Platform的所有组件,包括Kafka、Kafka Connect和KSQL等。
需要注意的是,由于不是商业版,因此无法使用Confluent Control Center。

$ confluent local start
    The local commands are intended for a single-node development environment
    only, NOT for production usage. https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]

安装Oracle JDBC驱动器

从Oracle官网下载Oracle JDBC驱动程序(ojdbc8.jar),然后按照以下方式进行存储。

    /opt/confluent-5.3.1/share/java/kafka-connect-jdbc/ojdbc8.jar

接下来重新启动Kafka连接。

# confluent local stop connect
    The local commands are intended for a single-node development environment
    only, NOT for production usage. https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
Stopping connect
connect is [DOWN]
# confluent local start connect
    The local commands are intended for a single-node development environment
    only, NOT for production usage. https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
zookeeper is already running. Try restarting if needed
kafka is already running. Try restarting if needed
schema-registry is already running. Try restarting if needed
Starting connect
connect is [UP]

创建源表

创建一个Oracle数据库中的表来加载数据。
我们将创建一个名为”emtest”的表,如下所示。
“no”列将用于存储连续的数据并成为通过Kafka Connect逐步加载数据的键值。

CREATE TABLE emtest
 (
 no NUMBER(3,0),
 name VARCHAR2(50),
 update_at TIMESTAMP(0)
 );

创建Kafka Connector。

使用JdbcSourceConnector,将数据从Oracle加载到Kafka进行配置。
JdbcSourceConnector是一个用于通过JDBC从数据库加载数据的连接器,可以在Oracle之外的PostgreSQL等数据库中使用。

首先,创建JdbcSourceConnector的配置文件。
您可以在任何地方创建该文件,但我选择了类似的位置(/opt/confluent-5.3.1/etc/kafka-connect-jdbc)进行创建。

# cd /opt/confluent-5.3.1/etc/kafka-connect-jdbc
# vi source-oracle.properties

設定文件的內容如下所示。

{
  "name": "TestOracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:@192.168.10.232:1521:testdb",
    "connection.user": "hr",
    "connection.password": "hr",
    "table.whitelist": "EMTEST",
    "mode": "incrementing",
    "incrementing.column.name": "NO",
    "validate.non.null": "false",
    "topic.prefix": "test."
  }
}

mode属性被设置为”incrementing”,以便加载增量数据。
incrementing.column.name属性用于指定判断增量的键名。
topic.prefix属性用于指定主题名的前缀。在这里,我们指定为”test.”,主题名将会自动创建为”test.EMTEST”,其中EMTEST是根据表名生成的。

如果您想要在Kafka中加载的消息以默认的Avro格式进行转换。如果要将其转换为String,则需要在转换器中指定“org.apache.kafka.connect.storage.StringConverter”。

使用以下命令将创建的配置文件注册到Kafka Connect。

# curl -X POST -H "Content-Type: application/json" --data @source-oracle.json http://localhost:8083/connectors
{"name":"TestOracle","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","connection.url":"jdbc:oracle:thin:@192.168.10.232:1521:testdb","connection.user":"hr","connection.password":"hr","table.whitelist":"EMTEST","mode":"incrementing","incrementing.column.name":"NO","validate.non.null":"false","topic.prefix":"test.","name":"TestOracle"},"tasks":[],"type":"source"}

为了查看在这个位置发布给卡夫卡的消息,可以使用kafka-avro-console-consumer命令。如果不是Avro格式的消息,则可以使用kafka-console-consumer命令。

$ confluent local consume test.EMTEST -- --value-format avro --from-beginning

在这里将数据插入到EMTEST表中,然后可以从Kafka中获取存储的消息并显示在控制台上。

$ confluent local consume test.EMTEST -- --value-format avro --from-beginning

{"NO":{"bytes":"\u0015"},"NAME":{"string":"CCCC"},"UPDATE_AT":{"long":1569248672000}}

查看日志(参考)

据说可以使用以下命令查看Connect的日志。

confluent local log connect

结束Confluent

要终止Confluent,请使用confluent local stop命令。

$ confluent local stop
    The local commands are intended for a single-node development environment
    only, NOT for production usage. https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]

使用「confluent local destroy」似乎会破坏整个环境(因为我没有使用过,所以不太清楚)。

请提供以下选项的中文同义词:

    • Confluent Platform Quick Start

 

    • Easy Ways to Generate Test Data in Kafka

 

    Introduction to Kafka Connectors