使用Python、SensorTag、Kafka、Spark Streaming进行流数据处理 – 第四部分:使用Kafka Connect将数据输出到MongoDB

Kafka Connect是一种将数据库、KVS等外部系统与Kafka进行连接和协作的机制。它是用于可扩展的流处理的DataPipeline工具。我们刚好将SensorTag的数据格式更改为Apache Avro。Kafka Connect UI默认使用Avro格式。我们将尝试通过Kafka主题将SensorTag的数据通过Sink(输出)到MongoDB。

卡夫卡连接界面

通常,我们可以使用Kafka Connect的CLI或REST API来进行连接器的配置。而通过Kafka Connect UI,我们可以使用编辑器来编写连接器的配置和KCQL,并可以执行连接器。

连接器

kafka-connect-1.png

肯定创业契机

KCQL(Kafka Connect Query Language)可以像SQL一样,用来描述Kafka Connect中Source和Sink的配置。例如,如果要将Kafka的topic传输到MongoDB中,可以按照文档中的示例进行如下描述。

name=mongo-sink-orders
connector.class=com.datamountaineer.streamreactor.connect.mongodb.sink.MongoSinkConnector
tasks.max=1
topics=orders-topic
connect.mongo.sink.kcql=INSERT INTO orders SELECT * FROM orders-topic
connect.mongo.database=connect
connect.mongo.connection=mongodb://localhost:27017
connect.mongo.sink.batch.size=10

用法

使用fast-data-dev的docker-compose.yml构建的Kafka集群,我们将添加MongoDB进行数据协同测试。

    SensorTag -> Kafka -> Kafka Connect -> MongoDB

MongoDB是一种非关系型数据库管理系统。

在使用fast-data-dev构建的Kafka集群的docker-compose.yml中添加MongoDB服务。

version: '2'
services:
  kafka-stack:
    image: landoop/fast-data-dev:latest
    environment:
      - FORWARDLOGS=0
      - RUNTESTS=0
      - ADV_HOST=<fast-data-devのIPアドレス>
    ports:
      - 3030:3030
      - 9092:9092
      - 2181:2181
      - 8081:8081
  connect-node-1:
    image: landoop/fast-data-dev-connect-cluster:latest
    depends_on:
      - kafka-stack
    environment:
      - ID=01
      - BS=kafka-stack:9092
      - ZK=kafka-stack:2181
      - SR=http://kafka-stack:8081
  connect-node-2:
    image: landoop/fast-data-dev-connect-cluster:latest
    depends_on:
      - kafka-stack
    environment:
      - ID=01
      - BS=kafka-stack:9092
      - ZK=kafka-stack:2181
      - SR=http://kafka-stack:8081
  connect-node-3:
    image: landoop/fast-data-dev-connect-cluster:latest
    depends_on:
      - kafka-stack
    environment:
      - ID=01
      - BS=kafka-stack:9092
      - ZK=kafka-stack:2181
      - SR=http://kafka-stack:8081
  connect-ui:
    image: landoop/kafka-connect-ui:latest
    depends_on:
      - connect-node-1
    environment:
      - CONNECT_URL=http://connect-node-1:8083
    ports:
      - 8000:8000
  mongo:
    image: mongo
    ports:
      - 27017:27017
    volumes:
      - mongo_data:/data/db
volumes:
  mongo_data:
    driver: local

我将启动MongoDB服务。

$ docker-compose up -d mongo

Kafka Connect 用户界面

打开Kafka Connect UI页面,点击NEW按钮,然后从Sink选项中选择MongoDB。在Create New Connector画面的编辑器中按照以下方式进行描述,并点击Create按钮。

name=MongoSinkConnector
connector.class=com.datamountaineer.streamreactor.connect.mongodb.sink.MongoSinkConnector
topics=sensortag-avro
tasks.max=1
connect.mongo.database=connect-db
connect.mongo.connection=mongodb://mongo:27017
connect.mongo.sink.kcql=INSERT INTO sensortag SELECT * FROM sensortag-avro

从样本中进行了以下更改。

    connect.mongo.database
<MongoDBのデータベース名>
    connect.mongo.connection
mongodb://<MongoDBのサービス名>:27017
    connect.mongo.sink.kcql
INSERT INTO <MongoDBのコレクション名> SELECT * FROM <Kafkaのトピック名>
kafka-connect-2.png

树莓派3代

我们将使用SSH连接到Raspberry Pi 3。我们将使用之前创建的Avro格式的Python脚本将数据发送到Kafka。

from bluepy.sensortag import SensorTag
import sys
import time
import calendar
import requests
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

def main():
    argvs = sys.argv
    argc = len(argvs)
    if (argc != 2):
        print 'Usage: # python {0} bd_address'.format(argvs[0])
        quit()
    bid = argvs[1]
    print('Connecting to ' + bid)

    timeout = 10.0

    tag = SensorTag(bid)
    tag.IRtemperature.enable()
    tag.humidity.enable()

    time.sleep(1.0)

    get_schema_req_data = requests.get(
        "http://<fast-data-devのIPアドレス>:8081/subjects/sensortag-avro-value/versions/latest")
    get_schema_req_data.raise_for_status()

    schema_string = get_schema_req_data.json()['schema']
    value_schema = avro.loads(schema_string)

    avroProducer = AvroProducer({
        'api.version.request':True,
        'bootstrap.servers': '<fast-data-devのIPアドレス>:9092',
        'schema.registry.url': '<fast-data-devのIPアドレス>:8081'
    }, default_value_schema=value_schema)

    while True:
        tAmb, tObj = tag.IRtemperature.read()
        humidity, rh = tag.humidity.read()

        value = {
            "bid" : bid,
            "time" : calendar.timegm(time.gmtime()),
            "ambient": tAmb,
            "objecttemp": tObj,
            "humidity": humidity,
            "rh": rh
        }

        avroProducer.produce(topic='sensortag-avro', value=value)
        avroProducer.flush()
        print(value)
        time.sleep(timeout)

    tag.disconnect()
    del tag

if __name__ == '__main__':
    main()

将SensorTag的BD地址传递给Python脚本并执行。每10秒发送Avro格式的数据到Kafka的sensortag-avro主题。

$ python avro_producer_sensortag.py <SensorTagのBDアドレス>
Connecting to B0:B4:48:BE:5E:00
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1501541405, 'humidity': 26.9708251953125, 'objecttemp': 21.8125, 'ambient': 26.78125, 'rh': 73.62060546875}
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1501541416, 'humidity': 26.990966796875, 'objecttemp': 22.625, 'ambient': 26.8125, 'rh': 73.52294921875}

MongoDB => 莽谷数据库

我进入MongoDB容器,使用Kafka Connect UI连接到已设置的数据库。通过KCQL中的INSERT INTO指定的集合(sensortag),通过Kafka输出了SensorTag的数据。

$ docker-compose exec mongo mongo connect-db
> show collections;
sensortag
> db.sensortag.find()
{ "_id" : ObjectId("597fb4f4a7b11b00636cfc13"), "bid" : "B0:B4:48:BE:5E:00", "time" : NumberLong(1501541619), "ambient" : 26.96875, "objecttemp" : 22.9375, "humidity" : 27.152099609375, "rh" : 73.52294921875 }
{ "_id" : ObjectId("597fb4ffa7b11b00636cfc14"), "bid" : "B0:B4:48:BE:5E:00", "time" : NumberLong(1501541630), "ambient" : 26.96875, "objecttemp" : 22.9375, "humidity" : 27.1722412109375, "rh" : 73.431396484375 }
{ "_id" : ObjectId("597fb50ba7b11b00636cfc15"), "bid" : "B0:B4:48:BE:5E:00", "time" : NumberLong(1501541642), "ambient" : 27, "objecttemp" : 23.15625, "humidity" : 27.18231201171875, "rh" : 73.431396484375 }
广告
将在 10 秒后关闭
bannerAds