我在本地的Docker环境中快速地运行了 Kafka 第7次回顾

概述

受到 Kafka 的吸引,一名只懂得基础设施的软件工程师购买了新的 MacBook Pro,并参考了在 Qiita 等网站上发布的前辈们的文章,分步骤记录了在自己的电脑上进行 Kafka 的操作确认的过程,并将其分为数次进行了描述。关于 Kafka 的概述,请参考此文章。

Kafka-7.png

执行环境

MacOS Big Sur 11.1
Docker 版本 20.10.2,构建 2291f61
Python 3.8.3

在消费者设置中接收消息的选项

为了接收数据,连接到消费者并转到程序所在的目录。

$ docker exec -it iottopicdata_ktp_1 /bin/bash
root@c23123e17068:/app#
root@c23123e17068:/app# cd opt
root@c23123e17068:/app/opt#

运行IoTTopicData-v1.py,并设置数据接收配置(终端输出)。

root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode tm
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
ターミナル 出力

目前没有任何提示出现,但仍在等待从生产者那里接收消息。

启动两个Producer容器

为了发送数据,您需要打开另一个终端,并启动两个生产者。之后,请检查这两个容器是否已经启动。

$ docker-compose up -d --scale iot=2
Creating iotsampledata_iot_2 ... done

$ docker-compose ps
       Name           Command   State   Ports
---------------------------------------------
iotsampledata_iot_1   python3   Up           
iotsampledata_iot_2   python3   Up           

从两个制片人发送的信息

为了同时运行两个容器程序,我们将创建一个名为「script_iot.sh」的脚本文件。
我们将设置每个容器发送 500 条数据(参数:–count 500)。
我们还将设置一个参数(参数:–proc 1111 or 2222)以区分是从哪个容器发送的数据。

#!/bin/zsh
docker exec iotsampledata_iot_1 python /app/opt/IoTSampleData-v2.py --mode kf --proc 1111 --count 500 &
docker exec iotsampledata_iot_2 python /app/opt/IoTSampleData-v2.py --mode kf --proc 2222 --count 500

我将运行这个脚本。

$ ./iot_multi.sh
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7f1e5b861ad0>

データ作成件数:500
データ作成時間:0.21667098999023438 [sec]

Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7f165d9c8450>

データ作成件数:500
データ作成時間:0.22739458084106445 [sec]

消费者接收到由生产者发送的数据并显示在提示符中。

root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode tm
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
ターミナル 出力
  中略
topic-11:0:766: key=b'2021/02/06' value={'SECTION': 'E', 'TIME': '2021-02-06T01:36:51.211665', 'PROC': '1111', 'IOT_NUM': '832-6134', 'IOT_STATE': '茨城県', 'VOL_1': 179.96703574789623, 'VOL_2': 80.56843267707185}
topic-11:0:767: key=b'2021/02/06' value={'SECTION': 'E', 'TIME': '2021-02-06T01:36:51.211852', 'PROC': '1111', 'IOT_NUM': '673-3797', 'IOT_STATE': '福井県', 'VOL_1': 147.72401039767132, 'VOL_2': 85.72532419479631}
topic-11:0:768: key=b'2021/02/06' value={'SECTION': 'E', 'TIME': '2021-02-06T01:36:51.216597', 'PROC': '2222', 'IOT_NUM': '503-2390', 'IOT_STATE': '千葉県', 'VOL_1': 152.7394266108338, 'VOL_2': 57.05013141176613}
topic-11:0:769: key=b'2021/02/06' value={'SECTION': 'C', 'TIME': '2021-02-06T01:36:51.216701', 'PROC': '2222', 'IOT_NUM': '516-4680', 'IOT_STATE': '青森県', 'VOL_1': 108.40957664102798, 'VOL_2': 87.16620915206302}
topic-11:0:770: key=b'2021/02/06' value={'SECTION': 'E', 'TIME': '2021-02-06T01:36:51.217158', 'PROC': '2222', 'IOT_NUM': '461-3098', 'IOT_STATE': '熊本県', 'VOL_1': 156.59169261796896, 'VOL_2': 84.65261961696028}
topic-11:0:771: key=b'2021/02/06' value={'SECTION': 'C', 'TIME': '2021-02-06T01:36:51.217178', 'PROC': '2222', 'IOT_NUM': '715-8459', 'IOT_STATE': '宮城県', 'VOL_1': 170.13111876865736, 'VOL_2': 63.72228869990629}
topic-11:0:772: key=b'2021/02/06' value={'SECTION': 'C', 'TIME': '2021-02-06T01:36:51.217307', 'PROC': '2222', 'IOT_NUM': '153-1059', 'IOT_STATE': '石川県', 'VOL_1': 109.43435107461053, 'VOL_2': 71.26109660837363}
topic-11:0:773: key=b'2021/02/06' value={'SECTION': 'W', 'TIME': '2021-02-06T01:36:51.212614', 'PROC': '1111', 'IOT_NUM': '019-2207', 'IOT_STATE': '長崎県', 'VOL_1': 136.0842636605171, 'VOL_2': 51.32787199610963}
topic-11:0:774: key=b'2021/02/06' value={'SECTION': 'W', 'TIME': '2021-02-06T01:36:51.212961', 'PROC': '1111', 'IOT_NUM': '783-1782', 'IOT_STATE': '宮崎県', 'VOL_1': 123.00662563145016, 'VOL_2': 59.17796383111264}
 後略

通过 topic-01 → Ksql → topic-11 的路径,可以确认在两个Producer容器上生成的Python程序的数据可以通过Consumer上的Python程序接收到(通过检查PROC键的值,可以看到数据在各自的容器中被接收)。

最后

我在本地的Docker环境中迅速地运行了Kafka,总共进行了7次尝试。虽然没有包括模式注册表和连接器等内容,但我已确认了Kafka的基本运行情况。

第1回:在本地的Docker环境中运行Kafka基本组件
第2回:通过Broker,确认能够从Kafka的Producer发送的消息被Consumer接收
第3回:通过Broker,确认Producer上的Python程序生成的数据能够被Consumer接收
第4回:确认Producer上生成的数据通过topic-01并经过KSQL(topic01_stream1 → topic01_stream2)进行流式抽取处理
第5回:通过topic-01 → Ksql → topic-11,确认Producer上生成的数据可以被Consumer上的Python程序接收
第6回:通过topic-01 → Ksql → topic-11,确认Producer上生成的数据可以通过Consumer上的Python程序写入S3
第7回:通过topic-01 → Ksql → topic-11,确认两个Producer容器生成的各自数据可以被Consumer上的Python程序接收

请提供相关信息。

根据以下信息作为参考,我非常感谢您:
Kafka Docker教程
从Kafka到KSQL,通过Docker轻松搭建环境

广告
将在 10 秒后关闭
bannerAds