我尝试使用「Confluent + RabbitMQ」来进行物联网数据的流式处理

第二步:通过RabbitMQ中间件的代理来确认数据接收

总结

image.png

用以下的三个步骤逐步解释上述内容。这次我们来解释第二步。
第一步:在Docker容器环境中构建Confluent Platform。
第二步:通过RabbitMQ代理确认接收到的数据。
第三步:确认接收到经过流处理后的数据。

本地环境

macOS 大狮山 11.3
Python 3.8.3
Docker 版本 20.10.7,构建编号为 f0df350(CPU:8,内存:10GB,交换空间:1GB)

确认对RabbitMQ的访问

image.png

IoT数据生成Python程序

    将数据发送到RabbitMQ的程序如下所示。
import random
import json
import time
from datetime import date, datetime
import argparse
import string
from faker.factory import Factory
import pika

# ダミーデータ作成のための Faker の使用
Faker = Factory.create
fake = Faker()
fake = Faker("ja_JP")

# IoT機器のダミーセクション(大文字アルファベットを定義)
section = string.ascii_uppercase

# IoT機器で送信JSONデータの作成
def iot_json_data(count, proc):
    iot_items = json.dumps({
        'items': [{
            'id': i,                            # id
            'time': generate_time(),            # データ生成時間
            'proc': proc,                       # データ生成プロセス名
            'section': random.choice(section),  # IoT機器セクション
            'iot_num': fake.zipcode(),          # IoT機器番号
            'iot_state': fake.prefecture(),     # IoT設置場所
            'vol_1': random.uniform(100, 200),  # IoT値−1
            'vol_2': random.uniform(50, 90)     # IoT値−2
            } 
            for i in range(count)
        ]
    }, ensure_ascii=False).encode('utf-8')
    return iot_items


# IoT機器で計測されたダミーデータの生成時間
def generate_time():
    dt_time = datetime.now()
    gtime = json_trans_date(dt_time)
    return gtime

# date, datetimeの変換関数
def json_trans_date(obj):
    # 日付型を文字列に変換
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    # 上記以外は対象外.
    raise TypeError ("Type %s not serializable" % type(obj))


# メイン : ターミナル出力用
def tm_main(count, proc, wait):
    print('ターミナル 出力')
    iotjsondata = iot_json_data(count, proc)
    json_dict = json.loads(iotjsondata)

    # pprint.pprint(json_dict)
    for item in json_dict['items']:
        print(item)
        time.sleep(wait)


# メイン : IoTHub 出力用
def mq_main(count, proc, wait):
    print('IoTHub 出力')
    iotjsondata = iot_json_data(count, proc)
    json_dict = json.loads(iotjsondata)

    channel, properties = rabbitmq_init()
    for i, item in enumerate(json_dict['items']):
        message = json.dumps(item).encode('utf-8')

        try:
            print("Sending message: {}".format(message))
            properties.message_id = str(i)
            channel.basic_publish(exchange='', routing_key='IoTHub', body=message, properties=properties)
            time.sleep(wait)
        except KeyboardInterrupt:
            print ( "MQTT Client Interrupt Stopped" )
            break

# MQTT(RabbitMQ) 接続定義
def rabbitmq_init():
    pika_param = pika.ConnectionParameters('localhost')
    connection = pika.BlockingConnection(pika_param)
    channel = connection.channel()
    channel.queue_declare(queue='IoTHub')
    properties = pika.BasicProperties(content_type='application/json', delivery_mode=1, priority=1, content_encoding='utf-8')

    return channel, properties


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='IoT機器のなんちゃってダミーデータの生成')
    parser.add_argument('--count', type=int, default=10, help='データ作成件数')
    parser.add_argument('--proc', type=str, default='111', help='データ作成プロセス名')
    parser.add_argument('--mode', type=str, default='tm', help='tm(ターミナル出力)/ mq(MQTT出力)')
    parser.add_argument('--wait', type=float, default=1, help='データWait時間(x.x秒)')
    args = parser.parse_args()

    start = time.time()

    if (args.mode == 'mq'): 
        mq_main(args.count, args.proc, args.wait)
    else :
        tm_main(args.count, args.proc, args.wait)

    making_time = time.time() - start

    print("")
    print(f"データ作成件数:{args.count}")
    print("データ作成時間:{0}".format(making_time) + " [sec]")
    print("")

确认程序的参数信息。

$ python IoTSampleData-v5.py -h

usage: IoTSampleData-v5.py [-h] [--count COUNT] [--proc PROC] [--mode MODE] [--wait WAIT]

IoT機器のなんちゃってダミーデータの生成

optional arguments:
  -h, --help     show this help message and exit
  --count COUNT  データ作成件数
  --proc PROC    データ作成プロセス名
  --mode MODE    tm(ターミナル出力)/ mq(MQTT出力)
  --wait WAIT    データWait時間(x.x秒)

要将数据发送到 RabbitMQ,需要使用参数“–mode mq”。

将从IoT数据生成程序发送数据到RabbitMQ。

    1. 由于在上述程序中定义了”routing_key=’IoTHub'”,所以当执行此程序时,将自动在RabbitMQ中创建名为”IoTHub”的队列。该队列将积累数据。

 

    执行程序(数据发送至MQ,数据发送数量:5条,数据发送间隔:1秒)。
$ python IoTSampleData-v5.py --mode mq --count 5 --wait 1

IoTHub 出力
Sending message: b'{"id": 0, "time": "2021-07-14T08:21:52.210384", "proc": "111", "section": "E", "iot_num": "752-9500", "iot_state": "\\u6803\\u6728\\u770c", "vol_1": 125.1396008658748, "vol_2": 64.62129087138834}'
Sending message: b'{"id": 1, "time": "2021-07-14T08:21:52.210467", "proc": "111", "section": "K", "iot_num": "810-7586", "iot_state": "\\u5cf6\\u6839\\u770c", "vol_1": 107.10688686955699, "vol_2": 78.66865000214042}'
Sending message: b'{"id": 2, "time": "2021-07-14T08:21:52.210487", "proc": "111", "section": "R", "iot_num": "399-6917", "iot_state": "\\u9999\\u5ddd\\u770c", "vol_1": 180.24083536198077, "vol_2": 79.81508156740844}'
Sending message: b'{"id": 3, "time": "2021-07-14T08:21:52.210505", "proc": "111", "section": "G", "iot_num": "152-1350", "iot_state": "\\u6ecb\\u8cc0\\u770c", "vol_1": 188.35767194882712, "vol_2": 80.33187715056255}'
Sending message: b'{"id": 4, "time": "2021-07-14T08:21:52.210519", "proc": "111", "section": "N", "iot_num": "842-1028", "iot_state": "\\u6ecb\\u8cc0\\u770c", "vol_1": 170.42473551436927, "vol_2": 52.295427164150915}'

データ作成件数:5
データ作成時間:5.029109001159668 [sec]
image.png

创建数据接收主题

image.png

创建连接器

image.png
カテゴリ入力項目値Commontasks.max1Connectionrabbitmq.hostrabbitmqConnectionrabbitmq.usernameguestConnectionrabbitmq.passwordguestConnectionrabbitmq.port5672Sourcekafka.topictopic_201Sourcerabbitmq.queueIoTHub
image.png

在话题中确认数据接收

image.png

通过IoT数据生成程序将数据发送,并通过RabbitMQ通过代理商的“topic_201”接收数据,我们确认了这个功能。

进行流媒体处理的数据接收确认

image.png
カテゴリ入力項目値CommonValue converter classorg.apache.kafka.connect.converters.ByteArrayConverter
image.png

4. 运行物联网数据生成程序并发送新的数据。

$ python IoTSampleData-v5.py --mode mq --count 5 --wait 1

IoTHub 出力
Sending message: b'{"id": 0, "time": "2021-07-14T14:07:30.776174", "proc": "111", "section": "L", "iot_num": "319-1762", "iot_state": "\\u5cf6\\u6839\\u770c", "vol_1": 157.6113944081462, "vol_2": 71.87482721564037}'
Sending message: b'{"id": 1, "time": "2021-07-14T14:07:30.776255", "proc": "111", "section": "E", "iot_num": "606-3548", "iot_state": "\\u5927\\u5206\\u770c", "vol_1": 101.4282144148702, "vol_2": 54.41472842473606}'
Sending message: b'{"id": 2, "time": "2021-07-14T14:07:30.776276", "proc": "111", "section": "J", "iot_num": "846-2110", "iot_state": "\\u9ce5\\u53d6\\u770c", "vol_1": 162.78211638795443, "vol_2": 76.92165187757502}'
Sending message: b'{"id": 3, "time": "2021-07-14T14:07:30.776294", "proc": "111", "section": "Y", "iot_num": "196-8432", "iot_state": "\\u798f\\u4e95\\u770c", "vol_1": 118.78173693261178, "vol_2": 74.39179273564206}'
Sending message: b'{"id": 4, "time": "2021-07-14T14:07:30.776309", "proc": "111", "section": "K", "iot_num": "881-3972", "iot_state": "\\u798f\\u5cf6\\u770c", "vol_1": 198.90396115859022, "vol_2": 59.419668368865416}'

データ作成件数:5
データ作成時間:5.025936841964722 [sec]
image.png

通过IoT数据生成程序发送数据,并通过RabbitMQ在Broker的”topic_201″接收到数据。同时,我们也确认能够按照所需的每个项目取得值,以便进行流处理。
在接下来的步骤中,我们将尝试对”topic_201″的数据进行流处理(查询处理),并检查是否能够提取数据。

准备迈向下一个阶段

在执行以下命令之前,请删除“topic_201”的所有数据。请执行以下命令。

$ docker exec -it broker /bin/bash

[appuser@broker ~]$ 
[appuser@broker ~]$ /bin/kafka-topics --zookeeper zookeeper:2181 --delete --topic topic_201
Topic topic_201 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[appuser@broker ~]$ exit
$

这个课题的步骤信息

第一步:在Docker容器环境中构建Confluent平台。
第二步:通过RabbitMQ代理确认接收到的数据。
第三步:确认流处理后的数据接收情况。

广告
将在 10 秒后关闭
bannerAds