我尝试使用「Confluent + RabbitMQ」来进行物联网数据的流式处理
第二步:通过RabbitMQ中间件的代理来确认数据接收
总结
用以下的三个步骤逐步解释上述内容。这次我们来解释第二步。
第一步:在Docker容器环境中构建Confluent Platform。
第二步:通过RabbitMQ代理确认接收到的数据。
第三步:确认接收到经过流处理后的数据。
本地环境
macOS 大狮山 11.3
Python 3.8.3
Docker 版本 20.10.7,构建编号为 f0df350(CPU:8,内存:10GB,交换空间:1GB)
确认对RabbitMQ的访问
IoT数据生成Python程序
- 将数据发送到RabbitMQ的程序如下所示。
import random
import json
import time
from datetime import date, datetime
import argparse
import string
from faker.factory import Factory
import pika
# ダミーデータ作成のための Faker の使用
Faker = Factory.create
fake = Faker()
fake = Faker("ja_JP")
# IoT機器のダミーセクション(大文字アルファベットを定義)
section = string.ascii_uppercase
# IoT機器で送信JSONデータの作成
def iot_json_data(count, proc):
iot_items = json.dumps({
'items': [{
'id': i, # id
'time': generate_time(), # データ生成時間
'proc': proc, # データ生成プロセス名
'section': random.choice(section), # IoT機器セクション
'iot_num': fake.zipcode(), # IoT機器番号
'iot_state': fake.prefecture(), # IoT設置場所
'vol_1': random.uniform(100, 200), # IoT値−1
'vol_2': random.uniform(50, 90) # IoT値−2
}
for i in range(count)
]
}, ensure_ascii=False).encode('utf-8')
return iot_items
# IoT機器で計測されたダミーデータの生成時間
def generate_time():
dt_time = datetime.now()
gtime = json_trans_date(dt_time)
return gtime
# date, datetimeの変換関数
def json_trans_date(obj):
# 日付型を文字列に変換
if isinstance(obj, (datetime, date)):
return obj.isoformat()
# 上記以外は対象外.
raise TypeError ("Type %s not serializable" % type(obj))
# メイン : ターミナル出力用
def tm_main(count, proc, wait):
print('ターミナル 出力')
iotjsondata = iot_json_data(count, proc)
json_dict = json.loads(iotjsondata)
# pprint.pprint(json_dict)
for item in json_dict['items']:
print(item)
time.sleep(wait)
# メイン : IoTHub 出力用
def mq_main(count, proc, wait):
print('IoTHub 出力')
iotjsondata = iot_json_data(count, proc)
json_dict = json.loads(iotjsondata)
channel, properties = rabbitmq_init()
for i, item in enumerate(json_dict['items']):
message = json.dumps(item).encode('utf-8')
try:
print("Sending message: {}".format(message))
properties.message_id = str(i)
channel.basic_publish(exchange='', routing_key='IoTHub', body=message, properties=properties)
time.sleep(wait)
except KeyboardInterrupt:
print ( "MQTT Client Interrupt Stopped" )
break
# MQTT(RabbitMQ) 接続定義
def rabbitmq_init():
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='IoTHub')
properties = pika.BasicProperties(content_type='application/json', delivery_mode=1, priority=1, content_encoding='utf-8')
return channel, properties
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='IoT機器のなんちゃってダミーデータの生成')
parser.add_argument('--count', type=int, default=10, help='データ作成件数')
parser.add_argument('--proc', type=str, default='111', help='データ作成プロセス名')
parser.add_argument('--mode', type=str, default='tm', help='tm(ターミナル出力)/ mq(MQTT出力)')
parser.add_argument('--wait', type=float, default=1, help='データWait時間(x.x秒)')
args = parser.parse_args()
start = time.time()
if (args.mode == 'mq'):
mq_main(args.count, args.proc, args.wait)
else :
tm_main(args.count, args.proc, args.wait)
making_time = time.time() - start
print("")
print(f"データ作成件数:{args.count}")
print("データ作成時間:{0}".format(making_time) + " [sec]")
print("")
确认程序的参数信息。
$ python IoTSampleData-v5.py -h
usage: IoTSampleData-v5.py [-h] [--count COUNT] [--proc PROC] [--mode MODE] [--wait WAIT]
IoT機器のなんちゃってダミーデータの生成
optional arguments:
-h, --help show this help message and exit
--count COUNT データ作成件数
--proc PROC データ作成プロセス名
--mode MODE tm(ターミナル出力)/ mq(MQTT出力)
--wait WAIT データWait時間(x.x秒)
要将数据发送到 RabbitMQ,需要使用参数“–mode mq”。
将从IoT数据生成程序发送数据到RabbitMQ。
-
- 由于在上述程序中定义了”routing_key=’IoTHub'”,所以当执行此程序时,将自动在RabbitMQ中创建名为”IoTHub”的队列。该队列将积累数据。
- 执行程序(数据发送至MQ,数据发送数量:5条,数据发送间隔:1秒)。
$ python IoTSampleData-v5.py --mode mq --count 5 --wait 1
IoTHub 出力
Sending message: b'{"id": 0, "time": "2021-07-14T08:21:52.210384", "proc": "111", "section": "E", "iot_num": "752-9500", "iot_state": "\\u6803\\u6728\\u770c", "vol_1": 125.1396008658748, "vol_2": 64.62129087138834}'
Sending message: b'{"id": 1, "time": "2021-07-14T08:21:52.210467", "proc": "111", "section": "K", "iot_num": "810-7586", "iot_state": "\\u5cf6\\u6839\\u770c", "vol_1": 107.10688686955699, "vol_2": 78.66865000214042}'
Sending message: b'{"id": 2, "time": "2021-07-14T08:21:52.210487", "proc": "111", "section": "R", "iot_num": "399-6917", "iot_state": "\\u9999\\u5ddd\\u770c", "vol_1": 180.24083536198077, "vol_2": 79.81508156740844}'
Sending message: b'{"id": 3, "time": "2021-07-14T08:21:52.210505", "proc": "111", "section": "G", "iot_num": "152-1350", "iot_state": "\\u6ecb\\u8cc0\\u770c", "vol_1": 188.35767194882712, "vol_2": 80.33187715056255}'
Sending message: b'{"id": 4, "time": "2021-07-14T08:21:52.210519", "proc": "111", "section": "N", "iot_num": "842-1028", "iot_state": "\\u6ecb\\u8cc0\\u770c", "vol_1": 170.42473551436927, "vol_2": 52.295427164150915}'
データ作成件数:5
データ作成時間:5.029109001159668 [sec]
创建数据接收主题
创建连接器
カテゴリ入力項目値Commontasks.max1Connectionrabbitmq.hostrabbitmqConnectionrabbitmq.usernameguestConnectionrabbitmq.passwordguestConnectionrabbitmq.port5672Sourcekafka.topictopic_201Sourcerabbitmq.queueIoTHub
在话题中确认数据接收
通过IoT数据生成程序将数据发送,并通过RabbitMQ通过代理商的“topic_201”接收数据,我们确认了这个功能。
进行流媒体处理的数据接收确认
カテゴリ入力項目値CommonValue converter classorg.apache.kafka.connect.converters.ByteArrayConverter
4. 运行物联网数据生成程序并发送新的数据。
$ python IoTSampleData-v5.py --mode mq --count 5 --wait 1
IoTHub 出力
Sending message: b'{"id": 0, "time": "2021-07-14T14:07:30.776174", "proc": "111", "section": "L", "iot_num": "319-1762", "iot_state": "\\u5cf6\\u6839\\u770c", "vol_1": 157.6113944081462, "vol_2": 71.87482721564037}'
Sending message: b'{"id": 1, "time": "2021-07-14T14:07:30.776255", "proc": "111", "section": "E", "iot_num": "606-3548", "iot_state": "\\u5927\\u5206\\u770c", "vol_1": 101.4282144148702, "vol_2": 54.41472842473606}'
Sending message: b'{"id": 2, "time": "2021-07-14T14:07:30.776276", "proc": "111", "section": "J", "iot_num": "846-2110", "iot_state": "\\u9ce5\\u53d6\\u770c", "vol_1": 162.78211638795443, "vol_2": 76.92165187757502}'
Sending message: b'{"id": 3, "time": "2021-07-14T14:07:30.776294", "proc": "111", "section": "Y", "iot_num": "196-8432", "iot_state": "\\u798f\\u4e95\\u770c", "vol_1": 118.78173693261178, "vol_2": 74.39179273564206}'
Sending message: b'{"id": 4, "time": "2021-07-14T14:07:30.776309", "proc": "111", "section": "K", "iot_num": "881-3972", "iot_state": "\\u798f\\u5cf6\\u770c", "vol_1": 198.90396115859022, "vol_2": 59.419668368865416}'
データ作成件数:5
データ作成時間:5.025936841964722 [sec]
通过IoT数据生成程序发送数据,并通过RabbitMQ在Broker的”topic_201″接收到数据。同时,我们也确认能够按照所需的每个项目取得值,以便进行流处理。
在接下来的步骤中,我们将尝试对”topic_201″的数据进行流处理(查询处理),并检查是否能够提取数据。
准备迈向下一个阶段
在执行以下命令之前,请删除“topic_201”的所有数据。请执行以下命令。
$ docker exec -it broker /bin/bash
[appuser@broker ~]$
[appuser@broker ~]$ /bin/kafka-topics --zookeeper zookeeper:2181 --delete --topic topic_201
Topic topic_201 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[appuser@broker ~]$ exit
$
这个课题的步骤信息
第一步:在Docker容器环境中构建Confluent平台。
第二步:通过RabbitMQ代理确认接收到的数据。
第三步:确认流处理后的数据接收情况。