我在本地的Docker环境中轻松地运行了Kafka。第三次尝试

总结
简报

因为被 Kafka 吸引,只知道基础设施的软件工程师决定在自己手头的电脑上试运行一下。他购买了一台全新的 MacBookPro,并参考了 Qiita 等网站上前辈们发布的文章,将操作步骤分成几部分进行说明,以便确认 Kafka 的运行情况。关于 Kafka 的概要,请参考这篇文章。

kafka-3.png

执行环境

macOS Big Sur 11.1
Docker版本20.10.2,构建2291f61
Python 3.8.3

最开始

为了创建一个新的主题(主题-01),我们将连接到经纪人。创建后,我们将确认其内容。

$ docker exec -it broker /bin/bash
root@broker:/#

root@broker:/# kafka-topics --bootstrap-server broker:9092 --create --topic topic-01 --partitions 3 replication-factor 1
Created topic topic-01

root@broker:/# kafka-topics --bootstrap-server broker:9092 --describe --topic topic-01
Topic: topic-01 PartitionCount: 3   ReplicationFactor: 1    Configs: 
    Topic: topic-01 Partition: 0    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic-01 Partition: 1    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic-01 Partition: 2    Leader: 1   Replicas: 1 Isr: 1

创建Producer容器

我将创建一个新的Producer容器。以下是用于创建Python程序容器的目录结构。

$ tree
.
├── Dockerfile
├── docker-compose.yml
├── opt
│   └── IoTSampleData-v2.py
└── requirements.txt

docker-compose.yml的内容如下。
由于是本地的Docker环境,所以Python程序(IoTSampleData-v2.py)没有使用COPY,而是使用了volumes。

version: '3'
services:
  iot:
    build: .
    working_dir: '/app/'
    tty: true
    volumes:
      - ./opt:/app/opt

networks:
  default:
    external:
      name: iot_network

以下是DockerFile。
最后一行的「requirements.txt」将在Python程序中单独定义所需的函数。

FROM python:3.7.5-slim
USER root

RUN apt-get update
RUN apt-get -y install locales && localedef -f UTF-8 -i ja_JP ja_JP.UTF-8

ENV LANG ja_JP.UTF-8
ENV LANGUAGE ja_JP:ja
ENV LC_ALL ja_JP.UTF-8
ENV TZ JST-9
ENV TERM xterm

RUN apt-get install -y vim less
RUN pip install --upgrade pip
RUN pip install --upgrade setuptools
RUN pip install -r requirements.txt

下面是requirements.txt的内容。
它定义了在Python程序中import所需的函数。

faker
kafka-python

创建和确认Producer容器

构建并启动定义好的容器。

$ docker-compose up -d
    前略
Creating iotsampledata_iot_1 ... done

我们将确认启动。

$ docker-compose ps
       Name           Command   State   Ports
---------------------------------------------
iotsampledata_iot_1   python3   Up           

检查所有正在运行的容器。

$ docker ps
CONTAINER ID   IMAGE                             COMMAND                  CREATED         STATUS         PORTS                                                    NAMES
4e4f79c219e1   iotsampledata_iot                 "python3"                3 minutes ago   Up 3 minutes                                                            iotsampledata_iot_1
37e2e1f360f5   confluentinc/cp-kafka:5.5.1       "/bin/sh"                2 hours ago     Up 2 hours     9092/tcp                                                 cli
78d0a02910fe   confluentinc/cp-kafka:5.5.1       "/etc/confluent/dock…"   2 hours ago     Up 2 hours     0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp         broker
ad55284f0174   confluentinc/cp-zookeeper:5.5.1   "/etc/confluent/dock…"   2 hours ago     Up 2 hours     2181/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32181->32181/tcp   zookeeper

消费者的消息接收设置

我会连接到 CLI。

$ docker exec -it cli /bin/bash
root@cli:/#

作为消费者,我会进行消息接收设置。

root@cli:/# kafka-console-consumer --bootstrap-server broker:29092 --topic topic-01 --group G1 --from-beginning

目前没有显示任何提示,但正在等待从Producer接收信息。

从制作人发送的消息传递

以下是在生产环境中运行的数据生成程序。
请查看程序中的”items”以确认生成的数据的列结构。
这是该程序的修改版。

import random
import json
import time
from datetime import date, datetime
from collections import OrderedDict
import argparse
import string
import pprint
from faker.factory import Factory
from kafka import KafkaProducer
from kafka import KafkaConsumer

# ダミーデータ作成のための Faker の使用
Faker = Factory.create
fake = Faker()
fake = Faker("ja_JP")

# IoT機器のダミーセクション(小文字アルファベットを定義)
section = string.ascii_uppercase


# IoT機器で送信JSONデータの作成
def iot_json_data(count, proc):
    iot_items = json.dumps({
        'items': [{
            'id': i,                            # id
            'time': generate_time(),            # データ生成時間
            'proc': proc,                       # データ生成プロセス  :プログラム実行時のパラメータ
            'section': random.choice(section),  # IoT機器セクション  :A-Z文字をランダムに割当
            'iot_num': fake.zipcode(),          # IoT機器番号    :郵便番号をランダムに割当
            'iot_state': fake.prefecture(),     # IoT設置場所    :都道府県名をランダムに割当
            'vol_1': random.uniform(100, 200),  # IoT値−1       :100-200の間の値をランダムに割当(小数点以下、14桁)
            'vol_2': random.uniform(50, 90)     # IoT値−2      :50-90の間の値をランダムに割当(小数点以下、14桁)
            } 
            for i in range(count)
        ]
    }, ensure_ascii=False).encode('utf-8')
    return iot_items


# IoT機器で計測されたダミーデータの生成時間
def generate_time():
    dt_time = datetime.now()
    gtime = json_trans_date(dt_time)
    return gtime

# date, datetimeの変換関数
def json_trans_date(obj):
    # 日付型を文字列に変換
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    # 上記以外は対象外.
    raise TypeError ("Type %s not serializable" % type(obj))


# メイン : ターミナル出力用
def tm_main(count, proc):
    print('ターミナル 出力')
    iotjsondata = iot_json_data(count, proc)
    json_dict = json.loads(iotjsondata)
    pprint.pprint(json_dict)


# メイン : Kafka出力用
def kf_main(count, proc):
    print('Kafka 出力')
    iotjsondata = iot_json_data(count, proc)
    json_dict = json.loads(iotjsondata)

    producer = KafkaProducer(bootstrap_servers=['broker:29092'])
    date = datetime.now().strftime("%Y/%m/%d")

    for item in json_dict['items']:
        # print(item)
        # result = producer.send('topic-01', json.dumps(item).encode('utf-8'))
        result = producer.send('topic-01', key=date.encode('utf-8'), value=json.dumps(item).encode('utf-8'))

    print(result)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='IoT機器のなんちゃってダミーデータの生成')
    parser.add_argument('--count', type=int, default=10, help='データ作成件数')
    parser.add_argument('--proc', type=str, default='111', help='データ作成プロセス名')
    parser.add_argument('--mode', type=str, default='tm', help='tm(ターミナル出力)/ kf(Kafka出力)')
    args = parser.parse_args()

    start = time.time()

    if (args.mode == 'kf'): 
        kf_main(args.count, args.proc)
    else :
        tm_main(args.count, args.proc)

    making_time = time.time() - start

    print("")
    print(f"データ作成件数:{args.count}")
    print("データ作成時間:{0}".format(making_time) + " [sec]")
    print("")

为了发送数据,打开另一个终端,并连接到Producer,然后转到程序所在的目录。

$ docker exec -it iotsampledata_iot_1 /bin/bash
root@4e4f79c219e1:/app#
root@4e4f79c219e1:/app# cd opt
root@4e4f79c219e1:/app/opt#

运行IoTSampleData-v2.py,并发送生成的数据(默认为10条)。

root@4e4f79c219e1:/app/opt# python IoTSampleData-v2.py --mode kf
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7f7b5ca50490>

データ作成件数:10
データ作成時間:0.09887456893920898 [sec]

在消费者的提示下,显示了由生产者发送的数据(请原谅字符编码转换)。

root@cli:/# kafka-console-consumer --bootstrap-server broker:29092 --topic topic-01 --group G1 --from-beginning
{"id": 0, "time": "2021-02-04T16:26:15.900303", "proc": "111", "section": "S", "iot_num": "115-4984", "iot_state": "\u5bcc\u5c71\u770c", "vol_1": 145.2739911204906, "vol_2": 56.15103042985286}
{"id": 1, "time": "2021-02-04T16:26:15.900343", "proc": "111", "section": "O", "iot_num": "467-0063", "iot_state": "\u9577\u5d0e\u770c", "vol_1": 196.68828974426407, "vol_2": 83.83189487506144}
{"id": 2, "time": "2021-02-04T16:26:15.900365", "proc": "111", "section": "H", "iot_num": "475-9700", "iot_state": "\u611b\u5a9b\u770c", "vol_1": 194.72768503158943, "vol_2": 68.37182675896713}
{"id": 3, "time": "2021-02-04T16:26:15.900386", "proc": "111", "section": "W", "iot_num": "322-4370", "iot_state": "\u4e09\u91cd\u770c", "vol_1": 148.90496235643883, "vol_2": 67.71055729821154}
{"id": 4, "time": "2021-02-04T16:26:15.900405", "proc": "111", "section": "G", "iot_num": "461-8598", "iot_state": "\u9ad8\u77e5\u770c", "vol_1": 125.97506447191827, "vol_2": 70.61188682350212}
{"id": 5, "time": "2021-02-04T16:26:15.900416", "proc": "111", "section": "T", "iot_num": "105-7057", "iot_state": "\u798f\u4e95\u770c", "vol_1": 172.64015749557626, "vol_2": 68.7370674440578}
{"id": 6, "time": "2021-02-04T16:26:15.900429", "proc": "111", "section": "K", "iot_num": "691-1826", "iot_state": "\u9577\u5d0e\u770c", "vol_1": 117.36354421367344, "vol_2": 81.55938819205682}
{"id": 7, "time": "2021-02-04T16:26:15.900502", "proc": "111", "section": "W", "iot_num": "355-5626", "iot_state": "\u4e09\u91cd\u770c", "vol_1": 123.18728419707203, "vol_2": 88.59540935089659}
{"id": 8, "time": "2021-02-04T16:26:15.900534", "proc": "111", "section": "O", "iot_num": "102-4557", "iot_state": "\u6ecb\u8cc0\u770c", "vol_1": 117.00423119320587, "vol_2": 83.8472348239608}
{"id": 9, "time": "2021-02-04T16:26:15.900544", "proc": "111", "section": "D", "iot_num": "675-3926", "iot_state": "\u5cf6\u6839\u770c", "vol_1": 176.01299983418306, "vol_2": 51.670329829172005}

通过Broker,我们可以确认在Producer上生成的Python程序数据能够被Consumer接收到。

关于下一次

下次(第四次)我们将使用KSQL从Producer发送过来的数据中提取。

第1回:在本地Docker环境中运行Kafka基本组件
第2回:确认通过Kafka的Producer发送的消息经过Broker传递后能够在Consumer端接收
第3回:确认通过Producer端的Python程序生成的数据经过Broker传递后能够在Consumer端接收
第4回:确认通过Producer端生成的数据经过topic-01传递到KSQL(topic01_stream1 → topic01_stream2)进行流式抽取处理
第5回:确认通过Producer端生成的数据经过topic-01 → Ksql → topic-11传递后能够在Consumer端的Python程序中接收
第6回:确认通过Producer端生成的数据经过topic-01 → Ksql → topic-11传递后能够在Consumer端的Python程序中写入S3
第7回:确认在两个Producer容器上生成的各自数据经过topic-01 → Ksql → topic-11传递后能够在Consumer端的Python程序中接收

请提供相关信息

我参考了以下信息。非常感谢。
Kafka 在 Docker 中的教程
通过 Docker 简便地建立 Kafka 到 KSQL 的环境

广告
将在 10 秒后关闭
bannerAds