我在本地的Docker环境中快速运行了Kafka。第六次尝试

总结

受到卡夫卡的吸引,只能在手头的电脑上尝试运行一下了…然后,只懂得基础架构的系统工程师购买了一台新的 MacBookPro,并参考了在 Qiita 等上面发布的前辈们的文章,将操作确认的步骤分成了几次记载。
关于卡夫卡的概述,请参阅这篇文章。

Kafka-6.png

执行环境

macOS Big Sur 11.1:
苹果操作系统大苏尔11.1版本

Docker version 20.10.2, build 2291f61:
Docker 版本20.10.2,构建号2291f61

Python 3.8.3:
Python 3.8.3版本

创建消费者容器。

将S3的凭证信息定义在“.env”文件中。

KEY_ID=xxxxxxxxxxxxxxxxxx
KEY_VALUE=zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
REGION=ap-northeast-1

用于创建Python程序运行容器的目录结构如下。

$ tree
.
├── .env
├── Dockerfile
├── docker-compose.yml
├── opt
│   └── IoTTopicData-v1.py
└── requirements.txt

docker-compose.yml的内容如下所示。
由于是本地的Docker环境,所以Python程序(IoTTopicData-v1.py)没有使用COPY,而是使用了volumes。

version: '3'
services:
  iot:
    build: .
    working_dir: '/app/'
    tty: true
    volumes:
      - ./opt:/app/opt
    environment:
      AWS_ACCESS_KEY_ID: ${KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${KEY_VALUE}
      AWS_DEFAULT_REGION: ${REGION}

networks:
  default:
    external:
      name: iot_network

DockerFile的内容如下所示。
最后一行的”requirements.txt”是用于分别定义Python程序所需函数的。

FROM python:3.7.5-slim
USER root

RUN apt-get update
RUN apt-get -y install locales && localedef -f UTF-8 -i ja_JP ja_JP.UTF-8

ENV LANG ja_JP.UTF-8
ENV LANGUAGE ja_JP:ja
ENV LC_ALL ja_JP.UTF-8
ENV TZ JST-9
ENV TERM xterm

RUN apt-get install -y vim less
RUN pip install --upgrade pip
RUN pip install --upgrade setuptools
RUN pip install -r requirements.txt

以下是 requirements.txt 的内容。
其中定义了在 Python 程序中要导入的必要函数。

boto3
kafka-python

创建和确认Consumer容器

构建并启动定义的容器。

$ docker-compose up -d
    前略
Creating iottopicdata_ktp_1 ... done

我将确认启动。

$ docker-compose ps
       Name          Command   State   Ports
--------------------------------------------
iottopicdata_ktp_1   python3   Up           

在Consumer上运行的程序

拡展後的Python程式如下。每五個數據為一組,將其寫入S3存儲桶「boto3-cloudian」中。寫入時的文件名將遵循IoTSelectDataFromKafkaTopic/年/月/日/時的層次結構目錄,其檔名格式為年月日時分秒.毫秒.json。

import json
import time
import argparse
import boto3
import pprint
from datetime import datetime
from kafka import KafkaProducer
from kafka import KafkaConsumer

BUCKET_NAME = 'boto3-cloudian'


# ターミナル出力用
def topic_to_tm(consumer):
    print('ターミナル 出力')

    # Read data from kafka
    try :
        for message in consumer:
            print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                message.offset, message.key,
                                                message.value))
    except KeyboardInterrupt :
        print('\r\n Output to Terminal - interrupted!')
        return

# Cloudian/S3 出力用
def topic_to_s3(consumer):
    print('Cloudian/S3 出力')

    # Read data from kafka
    try :
        i = 1
        buffer = []
        for message in consumer:
            print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                            message.offset, message.key,
                                            message.value))
            buffer.append(message.value)

            if i == 5 :
                # pprint.pprint (buffer)
                write_to_s3(buffer)
                buffer.clear()
                i = 1
            else :
                i += 1
    except KeyboardInterrupt :
        print('\r\n Output to ObjectStorage - interrupted!')
        return

# Cloudian/S3のオブジェクトへのデータ書き込み
def write_to_s3(buffer):
    # Create S3 Object key
    object_key = get_object_key()
    print(object_key)

    # client = boto3.client('s3', endpoint_url='http://s3-pic.networld.local')    # Cloudianへのアクセス時
    client = boto3.client('s3')                                                 # S3へのアクセス時
    client.put_object(
        Bucket=BUCKET_NAME,
        Key=object_key,
        Body=json.dumps(buffer,ensure_ascii=False).encode('utf-8')
    )


# Cloudian/S3のオブジェクトキー(ファイル)名の生成
def get_object_key():
    #S3階層可変(日時)値
    variable_key = 'IoTSelectDataFromKafkaTopic/' + datetime.now().strftime('%Y/%m/%d/%H/')
    # S3に作成するオブジェクト名指定
    key_name = datetime.now().strftime('%Y%m%d%H%M%S.%f') + '.json'

    object_key = variable_key + key_name
    return object_key


# Kafka Topic からのデータ受取
def get_kafka_topic():
    # Initialize consumer variable and set property for JSON decode
    consumer = KafkaConsumer ('topic-11',
                        bootstrap_servers = ['broker:29092'],
                        value_deserializer=lambda m: json.loads(m.decode('utf-8')))

    print(consumer)
    return consumer


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='KafkaからのIoT機器のなんちゃってStreamデータの取得')
    parser.add_argument('--mode', type=str, default='tm', help='tm(ターミナル出力)/ s3(オブジェクトストレージ出力)')
    args = parser.parse_args()

    start = time.time()

    consumer = get_kafka_topic()

    if (args.mode == 's3'): 
        topic_to_s3(consumer)
    else :
        topic_to_tm(consumer)

    making_time = time.time() - start

    print("")
    print("Streamデータ取得待機時間:{0}".format(making_time) + " [sec]")
    print("")

在Consumer上的消息接收设置。

为了接收数据,连接到消费者并移动到程序所在的目录。
还要检查AWS的凭证信息是否正确。

$ docker exec -it iottopicdata_ktp_1 /bin/bash
root@c23123e17068:/app#
root@c23123e17068:/app# cd opt
root@c23123e17068:/app/opt#

root@c23123e17068:/app/opt# env | grep AWS
AWS_DEFAULT_REGION=ap-northeast-1
AWS_SECRET_ACCESS_KEY=zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
AWS_ACCESS_KEY_ID=xxxxxxxxxxxxxxxxxx

我們將執行 IoTTopicData-v1.py 並將來自 topic-11 的數據寫入 S3 的設置。

root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode s3
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
Cloudian/S3 出力

↑ 目前没有显示任何提示,但正在等待从生产者那里接收消息。

向制片人发出的消息发送

为了发送数据,打开一个新的终端连接到生产者并切换到程序所在的目录。

$ docker exec -it iotsampledata_iot_1 /bin/bash
root@4e4f79c219e1:/app#
root@4e4f79c219e1:/app# cd opt
root@4e4f79c219e1:/app/opt#

运行IoTSampleData-v2.py并发送生成的数据(100条)。

root@4e4f79c219e1:/app/opt# python IoTSampleData-v2.py --mode kf --count 100
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7fb0125b5e90>

データ作成件数:100
データ作成時間:0.13501501083374023 [sec]

在上述操作执行后,topic-11的数据将会以以下方式输出到消费者的终端上:

root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode s3
<kafka.consumer.group.KafkaConsumer object at 0x7fee27e25650>
Cloudian/S3 出力
topic-11:0:2: key=b'2021/02/05' value={'SECTION': 'C', 'TIME': '2021-02-05T23:22:38.974127', 'PROC': '111', 'IOT_NUM': '537-7989', 'IOT_STATE': '岡山県', 'VOL_1': 136.47748912214027, 'VOL_2': 89.4598706488899}
topic-11:0:3: key=b'2021/02/05' value={'SECTION': 'W', 'TIME': '2021-02-05T23:22:38.974149', 'PROC': '111', 'IOT_NUM': '424-8856', 'IOT_STATE': '富山県', 'VOL_1': 123.0593268339469, 'VOL_2': 78.46322776492022}
topic-11:0:4: key=b'2021/02/05' value={'SECTION': 'W', 'TIME': '2021-02-05T23:22:38.974710', 'PROC': '111', 'IOT_NUM': '116-0745', 'IOT_STATE': '山梨県', 'VOL_1': 116.11217771503935, 'VOL_2': 74.09635644937616}
topic-11:0:5: key=b'2021/02/05' value={'SECTION': 'C', 'TIME': '2021-02-05T23:22:38.974862', 'PROC': '111', 'IOT_NUM': '843-7881', 'IOT_STATE': '奈良県', 'VOL_1': 108.3586171123569, 'VOL_2': 62.99783651862181}
topic-11:0:6: key=b'2021/02/05' value={'SECTION': 'C', 'TIME': '2021-02-05T23:22:38.974949', 'PROC': '111', 'IOT_NUM': '310-1209', 'IOT_STATE': '熊本県', 'VOL_1': 182.1592585797406, 'VOL_2': 86.23441031130653}
IoTSelectDataFromKafkaTopic/2021/02/05/23/20210205232239.222971.json
  中略
IoTSelectDataFromKafkaTopic/2021/02/05/23/20210205232239.584630.json
  中略
IoTSelectDataFromKafkaTopic/2021/02/05/23/20210205232239.717087.json
  後略
Kafka-6_s3.png

通过Producer上的Python程序生成的数据经过topic-01 → Ksql → topic-11的处理,我们确认可以通过Consumer上的Python程序将其写入S3。

下一次的事项

下一次(第7次)我们将考虑从多个物联网设备中拿取数据,我们将启动多个生产者容器来确认相同的事情。

第一篇:在本地的Docker环境中运行Kafka的基本组件。
第二篇:确认通过Kafka的Producer发送的消息经过Broker传递后能够被Consumer接收到。
第三篇:确认通过Producer端的Python程序生成的数据经过Broker传递后能够被Consumer接收到。
第四篇:确认经过Producer生成的数据通过topic-01,经过KSQL(topic01_stream1 → topic01_stream2)进行流式提取处理的结果。
第五篇:确认经过Producer生成的数据通过topic-01 → Ksql → topic-11传递后能够被Consumer端的Python程序接收到。
第六篇:确认经过Producer生成的数据通过topic-01 → Ksql → topic-11传递后能够被Consumer端的Python程序写入到S3中。
第七篇:确认通过两个Producer容器生成的各自数据经过topic-01 → Ksql → topic-11传递后能够被Consumer端的Python程序接收到。

参考信息

根据以下信息作为参考,我非常感谢。
Kafka Docker教程
从Kafka到KSQL,使用Docker进行简单环境配置。

广告
将在 10 秒后关闭
bannerAds