我在本地的Docker环境中快速运行了Kafka。第六次尝试
总结
受到卡夫卡的吸引,只能在手头的电脑上尝试运行一下了…然后,只懂得基础架构的系统工程师购买了一台新的 MacBookPro,并参考了在 Qiita 等上面发布的前辈们的文章,将操作确认的步骤分成了几次记载。
关于卡夫卡的概述,请参阅这篇文章。
执行环境
macOS Big Sur 11.1:
苹果操作系统大苏尔11.1版本
Docker version 20.10.2, build 2291f61:
Docker 版本20.10.2,构建号2291f61
Python 3.8.3:
Python 3.8.3版本
创建消费者容器。
将S3的凭证信息定义在“.env”文件中。
KEY_ID=xxxxxxxxxxxxxxxxxx
KEY_VALUE=zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
REGION=ap-northeast-1
用于创建Python程序运行容器的目录结构如下。
$ tree
.
├── .env
├── Dockerfile
├── docker-compose.yml
├── opt
│ └── IoTTopicData-v1.py
└── requirements.txt
docker-compose.yml的内容如下所示。
由于是本地的Docker环境,所以Python程序(IoTTopicData-v1.py)没有使用COPY,而是使用了volumes。
version: '3'
services:
iot:
build: .
working_dir: '/app/'
tty: true
volumes:
- ./opt:/app/opt
environment:
AWS_ACCESS_KEY_ID: ${KEY_ID}
AWS_SECRET_ACCESS_KEY: ${KEY_VALUE}
AWS_DEFAULT_REGION: ${REGION}
networks:
default:
external:
name: iot_network
DockerFile的内容如下所示。
最后一行的”requirements.txt”是用于分别定义Python程序所需函数的。
FROM python:3.7.5-slim
USER root
RUN apt-get update
RUN apt-get -y install locales && localedef -f UTF-8 -i ja_JP ja_JP.UTF-8
ENV LANG ja_JP.UTF-8
ENV LANGUAGE ja_JP:ja
ENV LC_ALL ja_JP.UTF-8
ENV TZ JST-9
ENV TERM xterm
RUN apt-get install -y vim less
RUN pip install --upgrade pip
RUN pip install --upgrade setuptools
RUN pip install -r requirements.txt
以下是 requirements.txt 的内容。
其中定义了在 Python 程序中要导入的必要函数。
boto3
kafka-python
创建和确认Consumer容器
构建并启动定义的容器。
$ docker-compose up -d
前略
Creating iottopicdata_ktp_1 ... done
我将确认启动。
$ docker-compose ps
Name Command State Ports
--------------------------------------------
iottopicdata_ktp_1 python3 Up
在Consumer上运行的程序
拡展後的Python程式如下。每五個數據為一組,將其寫入S3存儲桶「boto3-cloudian」中。寫入時的文件名將遵循IoTSelectDataFromKafkaTopic/年/月/日/時的層次結構目錄,其檔名格式為年月日時分秒.毫秒.json。
import json
import time
import argparse
import boto3
import pprint
from datetime import datetime
from kafka import KafkaProducer
from kafka import KafkaConsumer
BUCKET_NAME = 'boto3-cloudian'
# ターミナル出力用
def topic_to_tm(consumer):
print('ターミナル 出力')
# Read data from kafka
try :
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
except KeyboardInterrupt :
print('\r\n Output to Terminal - interrupted!')
return
# Cloudian/S3 出力用
def topic_to_s3(consumer):
print('Cloudian/S3 出力')
# Read data from kafka
try :
i = 1
buffer = []
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
buffer.append(message.value)
if i == 5 :
# pprint.pprint (buffer)
write_to_s3(buffer)
buffer.clear()
i = 1
else :
i += 1
except KeyboardInterrupt :
print('\r\n Output to ObjectStorage - interrupted!')
return
# Cloudian/S3のオブジェクトへのデータ書き込み
def write_to_s3(buffer):
# Create S3 Object key
object_key = get_object_key()
print(object_key)
# client = boto3.client('s3', endpoint_url='http://s3-pic.networld.local') # Cloudianへのアクセス時
client = boto3.client('s3') # S3へのアクセス時
client.put_object(
Bucket=BUCKET_NAME,
Key=object_key,
Body=json.dumps(buffer,ensure_ascii=False).encode('utf-8')
)
# Cloudian/S3のオブジェクトキー(ファイル)名の生成
def get_object_key():
#S3階層可変(日時)値
variable_key = 'IoTSelectDataFromKafkaTopic/' + datetime.now().strftime('%Y/%m/%d/%H/')
# S3に作成するオブジェクト名指定
key_name = datetime.now().strftime('%Y%m%d%H%M%S.%f') + '.json'
object_key = variable_key + key_name
return object_key
# Kafka Topic からのデータ受取
def get_kafka_topic():
# Initialize consumer variable and set property for JSON decode
consumer = KafkaConsumer ('topic-11',
bootstrap_servers = ['broker:29092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
print(consumer)
return consumer
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='KafkaからのIoT機器のなんちゃってStreamデータの取得')
parser.add_argument('--mode', type=str, default='tm', help='tm(ターミナル出力)/ s3(オブジェクトストレージ出力)')
args = parser.parse_args()
start = time.time()
consumer = get_kafka_topic()
if (args.mode == 's3'):
topic_to_s3(consumer)
else :
topic_to_tm(consumer)
making_time = time.time() - start
print("")
print("Streamデータ取得待機時間:{0}".format(making_time) + " [sec]")
print("")
在Consumer上的消息接收设置。
为了接收数据,连接到消费者并移动到程序所在的目录。
还要检查AWS的凭证信息是否正确。
$ docker exec -it iottopicdata_ktp_1 /bin/bash
root@c23123e17068:/app#
root@c23123e17068:/app# cd opt
root@c23123e17068:/app/opt#
root@c23123e17068:/app/opt# env | grep AWS
AWS_DEFAULT_REGION=ap-northeast-1
AWS_SECRET_ACCESS_KEY=zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
AWS_ACCESS_KEY_ID=xxxxxxxxxxxxxxxxxx
我們將執行 IoTTopicData-v1.py 並將來自 topic-11 的數據寫入 S3 的設置。
root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode s3
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
Cloudian/S3 出力
↑ 目前没有显示任何提示,但正在等待从生产者那里接收消息。
向制片人发出的消息发送
为了发送数据,打开一个新的终端连接到生产者并切换到程序所在的目录。
$ docker exec -it iotsampledata_iot_1 /bin/bash
root@4e4f79c219e1:/app#
root@4e4f79c219e1:/app# cd opt
root@4e4f79c219e1:/app/opt#
运行IoTSampleData-v2.py并发送生成的数据(100条)。
root@4e4f79c219e1:/app/opt# python IoTSampleData-v2.py --mode kf --count 100
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7fb0125b5e90>
データ作成件数:100
データ作成時間:0.13501501083374023 [sec]
在上述操作执行后,topic-11的数据将会以以下方式输出到消费者的终端上:
root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode s3
<kafka.consumer.group.KafkaConsumer object at 0x7fee27e25650>
Cloudian/S3 出力
topic-11:0:2: key=b'2021/02/05' value={'SECTION': 'C', 'TIME': '2021-02-05T23:22:38.974127', 'PROC': '111', 'IOT_NUM': '537-7989', 'IOT_STATE': '岡山県', 'VOL_1': 136.47748912214027, 'VOL_2': 89.4598706488899}
topic-11:0:3: key=b'2021/02/05' value={'SECTION': 'W', 'TIME': '2021-02-05T23:22:38.974149', 'PROC': '111', 'IOT_NUM': '424-8856', 'IOT_STATE': '富山県', 'VOL_1': 123.0593268339469, 'VOL_2': 78.46322776492022}
topic-11:0:4: key=b'2021/02/05' value={'SECTION': 'W', 'TIME': '2021-02-05T23:22:38.974710', 'PROC': '111', 'IOT_NUM': '116-0745', 'IOT_STATE': '山梨県', 'VOL_1': 116.11217771503935, 'VOL_2': 74.09635644937616}
topic-11:0:5: key=b'2021/02/05' value={'SECTION': 'C', 'TIME': '2021-02-05T23:22:38.974862', 'PROC': '111', 'IOT_NUM': '843-7881', 'IOT_STATE': '奈良県', 'VOL_1': 108.3586171123569, 'VOL_2': 62.99783651862181}
topic-11:0:6: key=b'2021/02/05' value={'SECTION': 'C', 'TIME': '2021-02-05T23:22:38.974949', 'PROC': '111', 'IOT_NUM': '310-1209', 'IOT_STATE': '熊本県', 'VOL_1': 182.1592585797406, 'VOL_2': 86.23441031130653}
IoTSelectDataFromKafkaTopic/2021/02/05/23/20210205232239.222971.json
中略
IoTSelectDataFromKafkaTopic/2021/02/05/23/20210205232239.584630.json
中略
IoTSelectDataFromKafkaTopic/2021/02/05/23/20210205232239.717087.json
後略
通过Producer上的Python程序生成的数据经过topic-01 → Ksql → topic-11的处理,我们确认可以通过Consumer上的Python程序将其写入S3。
下一次的事项
下一次(第7次)我们将考虑从多个物联网设备中拿取数据,我们将启动多个生产者容器来确认相同的事情。
第一篇:在本地的Docker环境中运行Kafka的基本组件。
第二篇:确认通过Kafka的Producer发送的消息经过Broker传递后能够被Consumer接收到。
第三篇:确认通过Producer端的Python程序生成的数据经过Broker传递后能够被Consumer接收到。
第四篇:确认经过Producer生成的数据通过topic-01,经过KSQL(topic01_stream1 → topic01_stream2)进行流式提取处理的结果。
第五篇:确认经过Producer生成的数据通过topic-01 → Ksql → topic-11传递后能够被Consumer端的Python程序接收到。
第六篇:确认经过Producer生成的数据通过topic-01 → Ksql → topic-11传递后能够被Consumer端的Python程序写入到S3中。
第七篇:确认通过两个Producer容器生成的各自数据经过topic-01 → Ksql → topic-11传递后能够被Consumer端的Python程序接收到。
参考信息
根据以下信息作为参考,我非常感谢。
Kafka Docker教程
从Kafka到KSQL,使用Docker进行简单环境配置。