数据集成服务:介绍和实验Kafka-Connect
我将介绍自Apache Kafka 0.9版本开始内置的Kafka Connect。
卡夫卡连接
请先查看下面的图表,Kafka Connect是用于在Kafka和周边系统之间进行流数据交换的通信规范、库和工具。
连接器有两种类型,一种用于从周边系统获取数据到Kafka作为源,另一种用于将数据从Kafka发送到周边系统作为接收器。数据流是单向的。已经实现了数十种连接器,并且支持各种不同的周边系统。当然,您也可以自己创建连接器。
卡夫卡中的数据格式基本上是使用Avro。连接器会统一将周边系统特定的数据格式转换为Avro,这样就可以确保适用于任何源头和目标组合的情况下运行。
Avro拥有Schema Registry,可以对模式更改进行支持。
连接器有独立模式和分布模式。在分布模式下,通过在多台服务器上进行分布式处理,确保了可扩展性和容错性。
Kafka Connect的运行时具备了REST API,可以进行多个连接器的启动/停止/状态管理。
Kafka还管理分区偏移量,并且在恢复故障时确保数据没有重复或丢失的机制存在。
引入
Kafka是Apache的一个项目,但开发的主导权在Confluent手中。在这里,我们会使用Confluent Platform来引入Kafka。
环境: 由于Ubuntu14的限制,版本较旧…
我会遵循这里的文档。Scala的版本为2.11。
http://docs.confluent.io/3.0.0/installation.html#installation-apt
包安装
wget -qO - http://packages.confluent.io/deb/3.0/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/3.0 stable main"
sudo apt-get update && sudo apt-get install confluent-platform-2.11
安装Oracle Java 1.7
如果你使用的是最新版本的Ubuntu,那么这个步骤应该是不必要的。
sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java7-installer
sudo apt-get install oracle-java7-set-default
添加confluent用户。
sudo adduser confluent
创建各种目录
mkdir /var/log/kafka
mkdir /var/lib/zookeeper
mkdir /var/lib/kafka
chown -R confluent:confluent /var/log/kafka /var/lib/zookeeper /var/lib/kafka
第三方连接器的jar文件以kafka-connect-XXXX的名称保存在/usr/share/java目录下。当通过Kafka Connect的REST API发出启动指令时,它会被自动识别和加载。
启动Zookeeper、Kafka和Schema-Registry。
su - confluent
/usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties &
/usr/bin/kafka-server-start /etc/kafka/server.properties &
/usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties &
启动Kafka-Connect
以分布式模式启动。
connect-distributed /etc/schema-registry/connect-avro-distributed.properties
当我尝试调用REST API时,连接器列表为空。这很正常,因为我还没有运行任何内容。
$ curl http://localhost:8083/connectors | jq .
[]
尝试将syslog保存到Elasticsearch中的实验。
使用syslog源连接器和elasticsearch接收连接器,将syslog的内容保存到Elasticsearch中。请额外安装2.x版本的Elasticsearch。
设置syslog
Ubuntu14使用rsyslog来将日志写入到TCP套接字的5514端口。
将`*.* @@localhost:5514`这一行添加到`/etc/rsyslog.d/50-default.conf`文件的开头,并重新启动rsyslog。
service rsyslog restart
syslog源连接器 zhì jiē)
将syslog源以如下方式启动。连接器将在端口5514上接收日志。
curl -H "Content-Type: application/json" -X POST http://localhost:8083/connectors -d '
{
"name": "tcpsyslog",
"config": {
"tasks.max":1,
"connector.class":"io.confluent.kafka.connect.syslog.source.TCPSyslogSourceConnector",
"kafka.topic":"syslog-tcp",
"syslog.port":5514,
"syslog.reverse.dns.remote.ip": true
}
}
'
Elasticsearch连接器
我们也会启动Elasticsearch Sink。
curl -H "Content-Type: application/json" -X POST http://localhost:8083/connectors -d '
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": 1,
"topics": "syslog-tcp",
"key.ignore": true,
"connection.url": "http://localhost:9200",
"type.name": "kafka-connect"
}
}
'
确认动作
查看连接器列表,可以看到tcpsyslog和elasticsearch-sink连接器。
$ curl http://localhost:8083/connectors | jq .
[
"elasticsearch-sink",
"tcpsyslog"
]
过一段时间,syslog应该会写入一些内容,所以我们去检查一下ES吧。
$ curl http://localhost:9200/_cat/indices
red open syslog-tcp 5 1
某个Kafka主题创建了与syslog-tcp索引相同的索引!让我们去读取一下。
$ curl http://localhost:9200/syslog-tcp/_search?pretty
...
{
"_source": {
"hostname": "localhost",
"remote_address": "localhost/127.0.0.1:51631",
"charset": "UTF-8",
"message": "vagrant-ubuntu-trusty-64 kernel: [16583.344413] Hardware name: innotek GmbH VirtualBox/VirtualBox, BIOS VirtualBox 12/01/2006",
"level": 4,
"host": "vagrant-ubuntu-trusty-64",
"facility": 0,
"date": 1478654589000
},
"_score": 1,
"_id": "syslog-tcp+0+75",
"_type": "kafka-connect",
"_index": "syslog-tcp"
}
...
…噢,内含了一份像是内核日志的东西!成功了。管道很快就建立完成了。
总结:数据集成作为服务
我已经介绍了Kafka-Connect并进行了简单的实验。通过连接器,实现了”兴趣的分离”,使系统实现了松耦合。这是向数据集成服务迈出的重要一步,只要将数据发送到Kafka中,后续就可以按照需求进行处理。
下次我想介绍Kafka-Streams。Kafka-Streams与Kafka-Connect不同,可以实现双向通信。将它们结合起来,可以构建一个非常强大且灵活的流水线。
买一送一
Confluent公司认可的连接器
社区开发的连接器