Python、SensorTag、Kafka、Spark Streaming的流处理 – 第3部分:Apache Avro和Schema Registry
Landoop提供的fast-data-dev的Docker镜像包含了Confluent开源的Schema Registry和Schema Registry UI Web工具。上一次我们将SensorTag的数据以JSON格式发送到Kafka,现在我们也试试Apache Avro格式。Apache Avro提供了通过序列化和与语言无关的模式来进行数据交换的机制。Schema Registry是用于集中管理Avro模式的存储,可以通过REST API进行操作。
模式注册表
可以利用本地的Avro模式文件来序列化数据,但通过Schema Registry进行集中管理可以使反序列化Avro消息的一方可以参考共享的数据格式。
模式注册表界面
从fast-data-dev的主页面点击SCHEMAS,将打开Schema Registry UI页面。点击左上角的NEW按钮,将启动一个描述Avro模式的编辑器。
你可以使用Schema Registry UI的编辑器编写Avro模式。在保存之前,你可以验证编写的JSON是否符合正确的格式。
表單的Subject Name,如果是value schema,據說應該寫成主題名-value。從SensorTag以Avro格式傳送的主題名稱是sensortag-avro,所以在這種情況下,應該是sensortag-avro-value。在Schema的字段中,用JSON格式記述SensorTag用的Avro Schema。
{
"type": "record",
"name": "SensorAvroValue",
"fields": [
{
"name": "bid",
"type": "string"
},
{
"name": "time",
"type": "long"
},
{
"name": "ambient",
"type": "double"
},
{
"name": "objecttemp",
"type": "double"
},
{
"name": "humidity",
"type": "double"
},
{
"name": "rh",
"type": "double"
}
]
}
树莓派3代
这一次,我们将使用支持Avro格式的confluent-kafka-python作为Raspberry Pi 3的Kafka Python客户端,而上一次使用的是kafka-python。
安装librdkafka。
为了安装confluent-kafka-python,需要librdkafka的头文件。请先构建librdkafka并更新共享库信息。
$ sudo apt-get update && sudo apt-get install git build-essential -y
$ git clone https://github.com/edenhill/librdkafka.git
$ cd librdkafka
$ ./configure
$ make && sudo make install
$ sudo ldconfig
安装 confluent-kafka
如果使用 Avro 格式,需要 Python 的头文件。在使用 Avro 格式时,pip 包的名称是 confluent-kafka[avro]。如果不需要 Avro,只需使用 confluent-kafka。
$ sudo apt-get update && sudo apt-get install python-dev -y
$ sudo pip install confluent-kafka[avro]
阿弗罗生产者
根据官方的confluent-kafka-python页面上的代码参考,编写Avro Producer。官方示例中使用本地的模式文件。由于似乎没有实现从Schema Registry获取模式的功能,所以我们需要通过Schema Registry的REST API直接获取模式作为字符串,并多花一些时间来实施。
from bluepy.sensortag import SensorTag
import sys
import time
import calendar
import requests
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
def main():
argvs = sys.argv
argc = len(argvs)
if (argc != 2):
print 'Usage: # python {0} bd_address'.format(argvs[0])
quit()
bid = argvs[1]
print('Connecting to ' + bid)
timeout = 10.0
tag = SensorTag(bid)
tag.IRtemperature.enable()
tag.humidity.enable()
time.sleep(1.0)
get_schema_req_data = requests.get(
"http://<fast-data-devのIPアドレス>:8081/subjects/sensortag-avro-value/versions/latest")
get_schema_req_data.raise_for_status()
schema_string = get_schema_req_data.json()['schema']
value_schema = avro.loads(schema_string)
avroProducer = AvroProducer({
'api.version.request':True,
'bootstrap.servers': '<fast-data-devのIPアドレス>:9092',
'schema.registry.url': '<fast-data-devのIPアドレス>:8081'
}, default_value_schema=value_schema)
while True:
tAmb, tObj = tag.IRtemperature.read()
humidity, rh = tag.humidity.read()
value = {
"bid" : bid,
"time" : calendar.timegm(time.gmtime()),
"ambient": tAmb,
"objecttemp": tObj,
"humidity": humidity,
"rh": rh
}
avroProducer.produce(topic='sensortag-avro', value=value)
avroProducer.flush()
print(value)
time.sleep(timeout)
tag.disconnect()
del tag
if __name__ == '__main__':
main()
使用 hcitool 来确认 SensorTag 的 BD 地址。
$ sudo hcitool lescan
LE Scan ...
...
B0:B4:48:BE:5E:00 CC2650 SensorTag
...
使用BD地址作为参数来执行Python脚本。
$ python avro_producer_sensortag.py <SensorTagのBDアドレス>
我們將開始將以下類似的日誌輸出發送到Kafka代理器。
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1501495463, 'humidity': 27.04132080078125, 'objecttemp': 22.5, 'ambient': 26.84375, 'rh': 69.05517578125}
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1501495475, 'humidity': 27.02117919921875, 'objecttemp': 22.75, 'ambient': 26.84375, 'rh': 69.05517578125}
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1501495486, 'humidity': 27.04132080078125, 'objecttemp': 22.96875, 'ambient': 26.84375, 'rh': 69.05517578125}
请用多种方式解释这个概念。
Avro 消费者
Avro Consumer的代码直接使用了confluent-kafka-python库中的示例。
import requests
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
c = AvroConsumer({
'api.version.request':True,
'bootstrap.servers': '<fast-data-devのIPアドレス>:9092',
'group.id': 'raspiavro',
'schema.registry.url': 'http://<fast-data-devのIPアドレス>:8081'})
c.subscribe(['sensortag-avro'])
running = True
while running:
try:
msg = c.poll(10)
print(msg)
if msg:
if not msg.error():
print(msg.value())
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False
except SerializerError as e:
print("Message deserialization failed for %s: %s" % (msg, e))
running = False
c.close()
执行编写好的Python脚本。
$ python avro_consumer_sensortag.py
在示例中,我们使用10秒间隔进行轮询。如果时间不匹配,将无法获取数据,因此会返回None。
<cimpl.Message object at 0x7655de88>
<cimpl.Message object at 0x764ee6f0>
{u'bid': u'B0:B4:48:BE:5E:00', u'time': 1501495204L, u'humidity': 27.27294921875, u'objecttemp': 22.78125, u'ambient': 27.09375, u'rh': 69.671630859375}
<cimpl.Message object at 0x7655de88>
None
<cimpl.Message object at 0x7655de88>
{u'bid': u'B0:B4:48:BE:5E:00', u'time': 1501495215L, u'humidity': 27.26287841796875, u'objecttemp': 22.9375, u'ambient': 27.09375, u'rh': 69.671630859375}
<cimpl.Message object at 0x747caa98>
kafka-avro-console-consumer 的中文翻译:卡夫卡Avro控制台消费者。
最后,我们也可以通过服务器端的kafka-avro-console-consumer命令来获取消息。
$ docker-compose exec kafka-stack \
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--topic sensortag-avro
这里也可以获取SensorTag的数据。
{"bid":"B0:B4:48:BE:5E:00","time":1501495384,"ambient":26.9375,"objecttemp":22.96875,"humidity":27.11181640625,"rh":69.05517578125}
{"bid":"B0:B4:48:BE:5E:00","time":1501495396,"ambient":26.90625,"objecttemp":22.6875,"humidity":27.0916748046875,"rh":69.05517578125}