使用Python、SensorTag、Kafka、Spark Streaming进行流数据处理 – 第四部分:使用Kafka Connect将数据输出到MongoDB
Kafka Connect是一种将数据库、KVS等外部系统与Kafka进行连接和协作的机制。它是用于可扩展的流处理的DataPipeline工具。我们刚好将SensorTag的数据格式更改为Apache Avro。Kafka Connect UI默认使用Avro格式。我们将尝试通过Kafka主题将SensorTag的数据通过Sink(输出)到MongoDB。
卡夫卡连接界面
通常,我们可以使用Kafka Connect的CLI或REST API来进行连接器的配置。而通过Kafka Connect UI,我们可以使用编辑器来编写连接器的配置和KCQL,并可以执行连接器。
连接器
肯定创业契机
KCQL(Kafka Connect Query Language)可以像SQL一样,用来描述Kafka Connect中Source和Sink的配置。例如,如果要将Kafka的topic传输到MongoDB中,可以按照文档中的示例进行如下描述。
name=mongo-sink-orders
connector.class=com.datamountaineer.streamreactor.connect.mongodb.sink.MongoSinkConnector
tasks.max=1
topics=orders-topic
connect.mongo.sink.kcql=INSERT INTO orders SELECT * FROM orders-topic
connect.mongo.database=connect
connect.mongo.connection=mongodb://localhost:27017
connect.mongo.sink.batch.size=10
用法
使用fast-data-dev的docker-compose.yml构建的Kafka集群,我们将添加MongoDB进行数据协同测试。
- SensorTag -> Kafka -> Kafka Connect -> MongoDB
MongoDB是一种非关系型数据库管理系统。
在使用fast-data-dev构建的Kafka集群的docker-compose.yml中添加MongoDB服务。
version: '2'
services:
kafka-stack:
image: landoop/fast-data-dev:latest
environment:
- FORWARDLOGS=0
- RUNTESTS=0
- ADV_HOST=<fast-data-devのIPアドレス>
ports:
- 3030:3030
- 9092:9092
- 2181:2181
- 8081:8081
connect-node-1:
image: landoop/fast-data-dev-connect-cluster:latest
depends_on:
- kafka-stack
environment:
- ID=01
- BS=kafka-stack:9092
- ZK=kafka-stack:2181
- SR=http://kafka-stack:8081
connect-node-2:
image: landoop/fast-data-dev-connect-cluster:latest
depends_on:
- kafka-stack
environment:
- ID=01
- BS=kafka-stack:9092
- ZK=kafka-stack:2181
- SR=http://kafka-stack:8081
connect-node-3:
image: landoop/fast-data-dev-connect-cluster:latest
depends_on:
- kafka-stack
environment:
- ID=01
- BS=kafka-stack:9092
- ZK=kafka-stack:2181
- SR=http://kafka-stack:8081
connect-ui:
image: landoop/kafka-connect-ui:latest
depends_on:
- connect-node-1
environment:
- CONNECT_URL=http://connect-node-1:8083
ports:
- 8000:8000
mongo:
image: mongo
ports:
- 27017:27017
volumes:
- mongo_data:/data/db
volumes:
mongo_data:
driver: local
我将启动MongoDB服务。
$ docker-compose up -d mongo
Kafka Connect 用户界面
打开Kafka Connect UI页面,点击NEW按钮,然后从Sink选项中选择MongoDB。在Create New Connector画面的编辑器中按照以下方式进行描述,并点击Create按钮。
name=MongoSinkConnector
connector.class=com.datamountaineer.streamreactor.connect.mongodb.sink.MongoSinkConnector
topics=sensortag-avro
tasks.max=1
connect.mongo.database=connect-db
connect.mongo.connection=mongodb://mongo:27017
connect.mongo.sink.kcql=INSERT INTO sensortag SELECT * FROM sensortag-avro
从样本中进行了以下更改。
- connect.mongo.database
<MongoDBのデータベース名>
- connect.mongo.connection
mongodb://<MongoDBのサービス名>:27017
- connect.mongo.sink.kcql
INSERT INTO <MongoDBのコレクション名> SELECT * FROM <Kafkaのトピック名>
树莓派3代
我们将使用SSH连接到Raspberry Pi 3。我们将使用之前创建的Avro格式的Python脚本将数据发送到Kafka。
from bluepy.sensortag import SensorTag
import sys
import time
import calendar
import requests
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
def main():
argvs = sys.argv
argc = len(argvs)
if (argc != 2):
print 'Usage: # python {0} bd_address'.format(argvs[0])
quit()
bid = argvs[1]
print('Connecting to ' + bid)
timeout = 10.0
tag = SensorTag(bid)
tag.IRtemperature.enable()
tag.humidity.enable()
time.sleep(1.0)
get_schema_req_data = requests.get(
"http://<fast-data-devのIPアドレス>:8081/subjects/sensortag-avro-value/versions/latest")
get_schema_req_data.raise_for_status()
schema_string = get_schema_req_data.json()['schema']
value_schema = avro.loads(schema_string)
avroProducer = AvroProducer({
'api.version.request':True,
'bootstrap.servers': '<fast-data-devのIPアドレス>:9092',
'schema.registry.url': '<fast-data-devのIPアドレス>:8081'
}, default_value_schema=value_schema)
while True:
tAmb, tObj = tag.IRtemperature.read()
humidity, rh = tag.humidity.read()
value = {
"bid" : bid,
"time" : calendar.timegm(time.gmtime()),
"ambient": tAmb,
"objecttemp": tObj,
"humidity": humidity,
"rh": rh
}
avroProducer.produce(topic='sensortag-avro', value=value)
avroProducer.flush()
print(value)
time.sleep(timeout)
tag.disconnect()
del tag
if __name__ == '__main__':
main()
将SensorTag的BD地址传递给Python脚本并执行。每10秒发送Avro格式的数据到Kafka的sensortag-avro主题。
$ python avro_producer_sensortag.py <SensorTagのBDアドレス>
Connecting to B0:B4:48:BE:5E:00
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1501541405, 'humidity': 26.9708251953125, 'objecttemp': 21.8125, 'ambient': 26.78125, 'rh': 73.62060546875}
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1501541416, 'humidity': 26.990966796875, 'objecttemp': 22.625, 'ambient': 26.8125, 'rh': 73.52294921875}
MongoDB => 莽谷数据库
我进入MongoDB容器,使用Kafka Connect UI连接到已设置的数据库。通过KCQL中的INSERT INTO指定的集合(sensortag),通过Kafka输出了SensorTag的数据。
$ docker-compose exec mongo mongo connect-db
> show collections;
sensortag
> db.sensortag.find()
{ "_id" : ObjectId("597fb4f4a7b11b00636cfc13"), "bid" : "B0:B4:48:BE:5E:00", "time" : NumberLong(1501541619), "ambient" : 26.96875, "objecttemp" : 22.9375, "humidity" : 27.152099609375, "rh" : 73.52294921875 }
{ "_id" : ObjectId("597fb4ffa7b11b00636cfc14"), "bid" : "B0:B4:48:BE:5E:00", "time" : NumberLong(1501541630), "ambient" : 26.96875, "objecttemp" : 22.9375, "humidity" : 27.1722412109375, "rh" : 73.431396484375 }
{ "_id" : ObjectId("597fb50ba7b11b00636cfc15"), "bid" : "B0:B4:48:BE:5E:00", "time" : NumberLong(1501541642), "ambient" : 27, "objecttemp" : 23.15625, "humidity" : 27.18231201171875, "rh" : 73.431396484375 }