数据集成服务:介绍和实验Kafka-Connect

我将介绍自Apache Kafka 0.9版本开始内置的Kafka Connect。

卡夫卡连接

请先查看下面的图表,Kafka Connect是用于在Kafka和周边系统之间进行流数据交换的通信规范、库和工具。

kafka-connect.png

连接器有两种类型,一种用于从周边系统获取数据到Kafka作为源,另一种用于将数据从Kafka发送到周边系统作为接收器。数据流是单向的。已经实现了数十种连接器,并且支持各种不同的周边系统。当然,您也可以自己创建连接器。

卡夫卡中的数据格式基本上是使用Avro。连接器会统一将周边系统特定的数据格式转换为Avro,这样就可以确保适用于任何源头和目标组合的情况下运行。

Avro拥有Schema Registry,可以对模式更改进行支持。

连接器有独立模式和分布模式。在分布模式下,通过在多台服务器上进行分布式处理,确保了可扩展性和容错性。

Kafka Connect的运行时具备了REST API,可以进行多个连接器的启动/停止/状态管理。

Kafka还管理分区偏移量,并且在恢复故障时确保数据没有重复或丢失的机制存在。

引入

Kafka是Apache的一个项目,但开发的主导权在Confluent手中。在这里,我们会使用Confluent Platform来引入Kafka。

环境: 由于Ubuntu14的限制,版本较旧…

我会遵循这里的文档。Scala的版本为2.11。
http://docs.confluent.io/3.0.0/installation.html#installation-apt

包安装

wget -qO - http://packages.confluent.io/deb/3.0/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/3.0 stable main"
sudo apt-get update && sudo apt-get install confluent-platform-2.11

安装Oracle Java 1.7

如果你使用的是最新版本的Ubuntu,那么这个步骤应该是不必要的。

sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java7-installer
sudo apt-get install oracle-java7-set-default

添加confluent用户。

sudo adduser confluent

创建各种目录

mkdir /var/log/kafka
mkdir /var/lib/zookeeper
mkdir /var/lib/kafka
chown -R confluent:confluent /var/log/kafka /var/lib/zookeeper /var/lib/kafka

第三方连接器的jar文件以kafka-connect-XXXX的名称保存在/usr/share/java目录下。当通过Kafka Connect的REST API发出启动指令时,它会被自动识别和加载。

启动Zookeeper、Kafka和Schema-Registry。

su - confluent
/usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties &
/usr/bin/kafka-server-start /etc/kafka/server.properties &
/usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties &

启动Kafka-Connect

以分布式模式启动。

connect-distributed /etc/schema-registry/connect-avro-distributed.properties

当我尝试调用REST API时,连接器列表为空。这很正常,因为我还没有运行任何内容。

$ curl http://localhost:8083/connectors | jq .
[]

尝试将syslog保存到Elasticsearch中的实验。

使用syslog源连接器和elasticsearch接收连接器,将syslog的内容保存到Elasticsearch中。请额外安装2.x版本的Elasticsearch。

设置syslog

Ubuntu14使用rsyslog来将日志写入到TCP套接字的5514端口。

将`*.* @@localhost:5514`这一行添加到`/etc/rsyslog.d/50-default.conf`文件的开头,并重新启动rsyslog。

service rsyslog restart

syslog源连接器 zhì jiē)

将syslog源以如下方式启动。连接器将在端口5514上接收日志。

curl -H "Content-Type: application/json" -X POST http://localhost:8083/connectors -d '
{
    "name": "tcpsyslog",
    "config": {
        "tasks.max":1,
        "connector.class":"io.confluent.kafka.connect.syslog.source.TCPSyslogSourceConnector",
        "kafka.topic":"syslog-tcp",
        "syslog.port":5514,
        "syslog.reverse.dns.remote.ip": true
    }
}
'

Elasticsearch连接器

我们也会启动Elasticsearch Sink。

curl -H "Content-Type: application/json" -X POST http://localhost:8083/connectors -d '
{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": 1,
        "topics": "syslog-tcp",
        "key.ignore": true,
        "connection.url": "http://localhost:9200",
        "type.name": "kafka-connect"
    }
}
'

确认动作

查看连接器列表,可以看到tcpsyslog和elasticsearch-sink连接器。

$ curl http://localhost:8083/connectors | jq .
[
  "elasticsearch-sink",
  "tcpsyslog"
]

过一段时间,syslog应该会写入一些内容,所以我们去检查一下ES吧。

$ curl http://localhost:9200/_cat/indices
red open syslog-tcp    5 1

某个Kafka主题创建了与syslog-tcp索引相同的索引!让我们去读取一下。

$ curl http://localhost:9200/syslog-tcp/_search?pretty
  ...
  {
    "_source": {
      "hostname": "localhost",
      "remote_address": "localhost/127.0.0.1:51631",
      "charset": "UTF-8",
      "message": "vagrant-ubuntu-trusty-64 kernel: [16583.344413] Hardware name: innotek GmbH VirtualBox/VirtualBox, BIOS VirtualBox 12/01/2006",
      "level": 4,
      "host": "vagrant-ubuntu-trusty-64",
      "facility": 0,
      "date": 1478654589000
    },
    "_score": 1,
    "_id": "syslog-tcp+0+75",
    "_type": "kafka-connect",
    "_index": "syslog-tcp"
  }
  ...

…噢,内含了一份像是内核日志的东西!成功了。管道很快就建立完成了。

总结:数据集成作为服务

我已经介绍了Kafka-Connect并进行了简单的实验。通过连接器,实现了”兴趣的分离”,使系统实现了松耦合。这是向数据集成服务迈出的重要一步,只要将数据发送到Kafka中,后续就可以按照需求进行处理。

下次我想介绍Kafka-Streams。Kafka-Streams与Kafka-Connect不同,可以实现双向通信。将它们结合起来,可以构建一个非常强大且灵活的流水线。

买一送一

Confluent公司认可的连接器

コネクタソース・シンクタグ開発元ダウンロードHDFSSinkHDFS, Hadoop, HiveConfluentConfluentJDBCSourceJDBC, MySQLConfluentConfluentJDBCSinkJDBC, MySQLConfluentConfluentAttunitySourceCDCAttunityAttunityCouchbaseSourceCouchbase, NoSQLCouchbaseCouchbaseDataStaxSinkCassandra, DataStaxData MountaineerData MountaineerElasticsearchSinksearch, Elastic, log, analyticsConfluentConfluetGoldenGateSourceCDC, OracleOracleCommunityJustOneSinkPostgresJustOneJustOneStriimSourceCDC, MS SQLServer, Oracle, MySQLStriimStriimSyncsort DMXSourceDB2, IMS, VSAM, CICSSyncsortSyncsortSyncsort DMXSinkDB2, IMS, VSAM, CICSSyncsortSyncsortVerticaSourceVerticaHP EnterpriseHP EnterpriseVerticaSinkVerticaHP EnterpriseHP EnterpriseVoltDBSinkVoltDB, NewSQLVoltDBVoltDB

社区开发的连接器

コネクタソース・シンクタグ開発元ダウンロードApache IgniteSourceFile SystemCommunityCommunityApache IgniteSinkFile SystemCommunityCommunityBloomberg TickerSourceApplication feedCommunityCommunityCassandraSourceCassandraCommunityCommunity 1CassandraSinkCassandraCommunityCommunityDynamoDB?Dynamo, NoSQLCommunityCommunityElasticsearchSinkElastic, search, log, analyticsCommunityCommunity 1 Community 2 Community 3FTPSourceFile SystemCommunityCommunityGoogle PubSubSourceMessagingCommunityCommunityGoogle PubSubSinkMessagingCommunityCommunityHazelcastSinkDatastore, In-memoryCommunityCommunityHbaseSinkHbase, NoSQLCommunityCommunity 1 Community 2InfluxDBSinkDatastore, Time-seriesCommunityCommunityInternet Relay ChatSourceApplication feedCommunityCommunityJenkinsSourceApplication feedCommunityCommunityJMSSinkMessagingCommunityCommunityKuduSinkKuduCommunityCommunityMixpanelSourceanalyticsCommunityCommunityMongoDBSourceMongo, MongoDB, NoSQLCommunityCommunityMongoDB CDC – DebeziumSourceMongoDB, CDCCommunityCommunityMQTTSourceMQTT, messagingCommunityCommunityMySQL CDC – DebeziumSourceMySQL, CDC, OracleCommunityCommunityRedisSinkDatastore, MessagingCommunityCommunityRethinkDBSourceNoSQL, StreamingCommunityCommunityRethinkDBSinkNoSQL, StreamingCommunityCommunitySalesforceSourceDatastoreCommunityCommunitySolrSinksearch, solrCommunityCommunity 1 Community 2SolrSourcesearch, solrCommunityCommunitySplunkSinkDatastore, logCommunityCommunitySplunkSourceDatastore, logCommunityCommunityspooldirSourceFile SystemCommunityCommunitySQSSourceAWS, SQS, MessagingCommunityCommunitySyslogSourcelogCommunityCommunityS3SinkAWS, S3, logCommunityCommunity 1 Community 2TwitterSourceApplication feedCommunityCommunity 1 Community 2TwitterSinkApplication feedCommunityCommunity
广告
将在 10 秒后关闭
bannerAds