Python、SensorTag、Kafka、Spark Streaming的流处理 – 第3部分:Apache Avro和Schema Registry

Landoop提供的fast-data-dev的Docker镜像包含了Confluent开源的Schema Registry和Schema Registry UI Web工具。上一次我们将SensorTag的数据以JSON格式发送到Kafka,现在我们也试试Apache Avro格式。Apache Avro提供了通过序列化和与语言无关的模式来进行数据交换的机制。Schema Registry是用于集中管理Avro模式的存储,可以通过REST API进行操作。

模式注册表

可以利用本地的Avro模式文件来序列化数据,但通过Schema Registry进行集中管理可以使反序列化Avro消息的一方可以参考共享的数据格式。

模式注册表界面

从fast-data-dev的主页面点击SCHEMAS,将打开Schema Registry UI页面。点击左上角的NEW按钮,将启动一个描述Avro模式的编辑器。

schema-registry.png

你可以使用Schema Registry UI的编辑器编写Avro模式。在保存之前,你可以验证编写的JSON是否符合正确的格式。

表單的Subject Name,如果是value schema,據說應該寫成主題名-value。從SensorTag以Avro格式傳送的主題名稱是sensortag-avro,所以在這種情況下,應該是sensortag-avro-value。在Schema的字段中,用JSON格式記述SensorTag用的Avro Schema。

{
  "type": "record",
  "name": "SensorAvroValue",
  "fields": [
    {
      "name": "bid",
      "type": "string"
    },
    {
      "name": "time",
      "type": "long"
    },
    {
      "name": "ambient",
      "type": "double"
    },
    {
      "name": "objecttemp",
      "type": "double"
    },
    {
      "name": "humidity",
      "type": "double"
    },
    {
      "name": "rh",
      "type": "double"
    }
  ]
}

树莓派3代

这一次,我们将使用支持Avro格式的confluent-kafka-python作为Raspberry Pi 3的Kafka Python客户端,而上一次使用的是kafka-python。

安装librdkafka。

为了安装confluent-kafka-python,需要librdkafka的头文件。请先构建librdkafka并更新共享库信息。

$ sudo apt-get update && sudo apt-get install git build-essential -y
$ git clone https://github.com/edenhill/librdkafka.git
$ cd librdkafka
$ ./configure 
$ make && sudo make install
$ sudo ldconfig

安装 confluent-kafka

如果使用 Avro 格式,需要 Python 的头文件。在使用 Avro 格式时,pip 包的名称是 confluent-kafka[avro]。如果不需要 Avro,只需使用 confluent-kafka。

$ sudo apt-get update && sudo apt-get install python-dev -y
$ sudo pip install confluent-kafka[avro]

阿弗罗生产者

根据官方的confluent-kafka-python页面上的代码参考,编写Avro Producer。官方示例中使用本地的模式文件。由于似乎没有实现从Schema Registry获取模式的功能,所以我们需要通过Schema Registry的REST API直接获取模式作为字符串,并多花一些时间来实施。

from bluepy.sensortag import SensorTag
import sys
import time
import calendar
import requests
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

def main():
    argvs = sys.argv
    argc = len(argvs)
    if (argc != 2):
        print 'Usage: # python {0} bd_address'.format(argvs[0])
        quit()
    bid = argvs[1]
    print('Connecting to ' + bid)

    timeout = 10.0

    tag = SensorTag(bid)
    tag.IRtemperature.enable()
    tag.humidity.enable()

    time.sleep(1.0)

    get_schema_req_data = requests.get(
        "http://<fast-data-devのIPアドレス>:8081/subjects/sensortag-avro-value/versions/latest")
    get_schema_req_data.raise_for_status()

    schema_string = get_schema_req_data.json()['schema']
    value_schema = avro.loads(schema_string)

    avroProducer = AvroProducer({
        'api.version.request':True,
        'bootstrap.servers': '<fast-data-devのIPアドレス>:9092',
        'schema.registry.url': '<fast-data-devのIPアドレス>:8081'
    }, default_value_schema=value_schema)

    while True:
        tAmb, tObj = tag.IRtemperature.read()
        humidity, rh = tag.humidity.read()

        value = {
            "bid" : bid,
            "time" : calendar.timegm(time.gmtime()),
            "ambient": tAmb,
            "objecttemp": tObj,
            "humidity": humidity,
            "rh": rh
        }

        avroProducer.produce(topic='sensortag-avro', value=value)
        avroProducer.flush()
        print(value)
        time.sleep(timeout)

    tag.disconnect()
    del tag

if __name__ == '__main__':
    main()

使用 hcitool 来确认 SensorTag 的 BD 地址。

$ sudo hcitool lescan
LE Scan ...
...
B0:B4:48:BE:5E:00 CC2650 SensorTag
...

使用BD地址作为参数来执行Python脚本。

$ python avro_producer_sensortag.py <SensorTagのBDアドレス>

我們將開始將以下類似的日誌輸出發送到Kafka代理器。

{'bid': 'B0:B4:48:BE:5E:00', 'time': 1501495463, 'humidity': 27.04132080078125, 'objecttemp': 22.5, 'ambient': 26.84375, 'rh': 69.05517578125}
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1501495475, 'humidity': 27.02117919921875, 'objecttemp': 22.75, 'ambient': 26.84375, 'rh': 69.05517578125}
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1501495486, 'humidity': 27.04132080078125, 'objecttemp': 22.96875, 'ambient': 26.84375, 'rh': 69.05517578125}

请用多种方式解释这个概念。

Avro 消费者

Avro Consumer的代码直接使用了confluent-kafka-python库中的示例。

import requests
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

c = AvroConsumer({
    'api.version.request':True,
    'bootstrap.servers': '<fast-data-devのIPアドレス>:9092',
    'group.id': 'raspiavro',
    'schema.registry.url': 'http://<fast-data-devのIPアドレス>:8081'})
c.subscribe(['sensortag-avro'])

running = True
while running:
    try:
        msg = c.poll(10)
        print(msg)
        if msg:
            if not msg.error():
                print(msg.value())
            elif msg.error().code() != KafkaError._PARTITION_EOF:
                print(msg.error())
                running = False
    except SerializerError as e:
        print("Message deserialization failed for %s: %s" % (msg, e))
        running = False

c.close()

执行编写好的Python脚本。

$ python avro_consumer_sensortag.py

在示例中,我们使用10秒间隔进行轮询。如果时间不匹配,将无法获取数据,因此会返回None。

<cimpl.Message object at 0x7655de88>
<cimpl.Message object at 0x764ee6f0>
{u'bid': u'B0:B4:48:BE:5E:00', u'time': 1501495204L, u'humidity': 27.27294921875, u'objecttemp': 22.78125, u'ambient': 27.09375, u'rh': 69.671630859375}
<cimpl.Message object at 0x7655de88>
None
<cimpl.Message object at 0x7655de88>
{u'bid': u'B0:B4:48:BE:5E:00', u'time': 1501495215L, u'humidity': 27.26287841796875, u'objecttemp': 22.9375, u'ambient': 27.09375, u'rh': 69.671630859375}
<cimpl.Message object at 0x747caa98>

kafka-avro-console-consumer 的中文翻译:卡夫卡Avro控制台消费者。

最后,我们也可以通过服务器端的kafka-avro-console-consumer命令来获取消息。

$ docker-compose exec kafka-stack \
  kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic sensortag-avro

这里也可以获取SensorTag的数据。

{"bid":"B0:B4:48:BE:5E:00","time":1501495384,"ambient":26.9375,"objecttemp":22.96875,"humidity":27.11181640625,"rh":69.05517578125}
{"bid":"B0:B4:48:BE:5E:00","time":1501495396,"ambient":26.90625,"objecttemp":22.6875,"humidity":27.0916748046875,"rh":69.05517578125}
广告
将在 10 秒后关闭
bannerAds