Running Kafka Quickly in a Local Docker Environment (Part 4)
Summary
Drawn to Kafka, a software engineer who knows little beyond infrastructure decided to try running it on his own machine first. Referring to articles that experienced engineers have posted on Qiita and elsewhere, I am writing up, over several installments, the steps I verified repeatedly on a newly purchased MacBook Pro. For an overview of Kafka itself, please see this article.
Environment
macOS Big Sur 11.1
Docker version 20.10.2, build 2291f61
Python 3.8.3
Creating the KSQL containers
Add container definitions for ksql-server (the KSQL server) and ksql-cli (the KSQL client) to the docker-compose.yml from Part 1. Note that every service joins the external network iot_network, so that network must already exist on the host.
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:5.5.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
  cli:
    image: confluentinc/cp-kafka:5.5.1
    hostname: cli
    container_name: cli
    depends_on:
      - broker
    entrypoint: /bin/sh
    tty: true
  ksql-server:
    image: confluentinc/cp-ksql-server:5.4.3
    hostname: ksql-server
    container_name: ksql-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksql-server
      KSQL_APPLICATION_ID: "IoT-demo-1"
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_AUTO_OFFSET_RESET: "earliest"
  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.4.3
    container_name: ksql-cli
    volumes:
      - $PWD/ksql.commands:/tmp/ksql.commands
    depends_on:
      - broker
      - ksql-server
    entrypoint: /bin/sh
    tty: true

networks:
  default:
    external:
      name: iot_network
Creating and verifying the containers
Build and start the containers defined above.
$ docker-compose up -d
(earlier output omitted)
zookeeper is up-to-date
broker is up-to-date
cli is up-to-date
Creating ksql-server ... done
Creating ksql-cli ... done
Check the running containers.
$ docker-compose ps
   Name                 Command               State                              Ports
---------------------------------------------------------------------------------------------------------------
broker        /etc/confluent/docker/run   Up             0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
cli           /bin/sh                     Up             9092/tcp
ksql-cli      /bin/sh                     Up
ksql-server   /etc/confluent/docker/run   Up (healthy)   0.0.0.0:8088->8088/tcp
zookeeper     /etc/confluent/docker/run   Up             2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp
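Since ksql-server exposes its REST interface on port 8088 (mapped to the host in the compose file), you can also confirm from the host that it is really up. Here is a minimal sketch, assuming Python's requests package is installed on the host; the KSQL server's /info endpoint returns version and service metadata.

# ksql_health_check.py: confirm the KSQL server REST API is reachable.
# Assumes the `requests` package is installed (pip install requests).
import requests

# The compose file maps ksql-server's port 8088 to localhost:8088.
resp = requests.get("http://localhost:8088/info")
resp.raise_for_status()

# /info returns server metadata such as the KSQL version.
print(resp.json())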
Creating a topic for the streamed data
Connect to the broker.
$ docker exec -it broker /bin/bash
root@broker:/#
Create a topic named topic-11 as the destination for the streamed data.
root@broker:/# kafka-topics --bootstrap-server broker:9092 --create --topic topic-11 --partitions 3 --replication-factor 1
Created topic topic-11.
Verify the created topic.
root@broker:/# kafka-topics --bootstrap-server broker:9092 --describe --topic topic-11
Topic: topic-11 PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: topic-11 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: topic-11 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: topic-11 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
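If you prefer to script this step instead of exec-ing into the broker, the same topic can be created with kafka-python's admin client. This is only a sketch, assuming kafka-python is installed on the host and using the host-side listener localhost:9092:

# create_topic.py: create topic-11 programmatically instead of with kafka-topics.
# Assumes kafka-python is installed (pip install kafka-python).
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

# Same settings as the CLI command above: 3 partitions, replication factor 1.
admin.create_topics([NewTopic(name="topic-11", num_partitions=3, replication_factor=1)])

# The topic list should now include topic-11.
print(admin.list_topics())
admin.close()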
Creating the stream processing in KSQL
Connect to the ksql-cli container.
$ docker exec -it ksql-cli /bin/bash
root@35620515e9f1:/#
From ksql-cli, connect to ksql-server.
root@35620515e9f1:/# ksql http://ksql-server:8088
===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Apache Kafka® =
===========================================
Copyright 2017-2019 Confluent Inc.
CLI v5.4.3, Server v5.4.3 located at http://ksql-server:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
Create a stream named topic01_stream1 to receive the data that the producer sends to topic-01.
ksql> CREATE STREAM topic01_stream1 (id INT, time VARCHAR, proc VARCHAR, section VARCHAR, iot_num VARCHAR, iot_state VARCHAR, vol_1 DOUBLE, vol_2 DOUBLE) WITH (KAFKA_TOPIC = 'topic-01', VALUE_FORMAT='JSON', KEY='section');
Message
----------------
Stream created
----------------
Check the details of the created topic01_stream1.
ksql> describe extended topic01_stream1;
Name : TOPIC01_STREAM1
Type : STREAM
Key field : SECTION
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : topic-01 (partitions: 3, replication: 1)
Field | Type
---------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ID | INTEGER
TIME | VARCHAR(STRING)
PROC | VARCHAR(STRING)
SECTION | VARCHAR(STRING)
IOT_NUM | VARCHAR(STRING)
IOT_STATE | VARCHAR(STRING)
VOL_1 | DOUBLE
VOL_2 | DOUBLE
---------------------------------------
Local runtime statistics
------------------------
(Statistics of the local KSQL server interaction with the Kafka topic topic-01)
Next, extract the streaming data from topic01_stream1 with the condition below and create a stream named topic01_stream2 that sends the result to topic-11.
Extraction condition: section='E' OR section='C' OR section='W'.
Because this is a CREATE STREAM ... AS SELECT (CSAS) statement, KSQL registers it as a persistent query that keeps writing matching records to topic-11.
ksql> CREATE STREAM topic01_stream2 WITH (KAFKA_TOPIC = 'topic-11', VALUE_FORMAT='JSON') AS SELECT t01s1.section as section, t01s1.time as time, t01s1.proc as proc, t01s1.iot_num as iot_num, t01s1.iot_state as iot_state, t01s1.vol_1 as vol_1, t01s1.vol_2 as vol_2 FROM topic01_stream1 t01s1 WHERE section='E' OR section='C' OR section='W';
Message
----------------------------------------------------------------------------------------------------
Stream TOPIC01_STREAM2 created and running. Created by query with query ID: CSAS_TOPIC01_STREAM2_1
----------------------------------------------------------------------------------------------------
Check the details of topic01_stream2.
ksql> describe extended topic01_stream2;
Name : TOPIC01_STREAM2
Type : STREAM
Key field : SECTION
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : topic-11 (partitions: 3, replication: 1)
Field | Type
---------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
SECTION | VARCHAR(STRING)
TIME | VARCHAR(STRING)
PROC | VARCHAR(STRING)
IOT_NUM | VARCHAR(STRING)
IOT_STATE | VARCHAR(STRING)
VOL_1 | DOUBLE
VOL_2 | DOUBLE
---------------------------------------
Queries that write from this STREAM
-----------------------------------
CSAS_TOPIC01_STREAM2_1 : CREATE STREAM TOPIC01_STREAM2 WITH (KAFKA_TOPIC='topic-11', PARTITIONS=3, REPLICAS=1, VALUE_FORMAT='JSON') AS SELECT
T01S1.SECTION "SECTION",
T01S1.TIME "TIME",
T01S1.PROC "PROC",
T01S1.IOT_NUM "IOT_NUM",
T01S1.IOT_STATE "IOT_STATE",
T01S1.VOL_1 "VOL_1",
T01S1.VOL_2 "VOL_2"
FROM TOPIC01_STREAM1 T01S1
WHERE (((T01S1.SECTION = 'E') OR (T01S1.SECTION = 'C')) OR (T01S1.SECTION = 'W'))
EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
messages-per-sec: 0 total-messages: 2 last-message: 2021-02-04T09:12:35.209Z
(Statistics of the local KSQL server interaction with the Kafka topic topic-11)
Confirm how the created streams map to Kafka topics.
ksql> show streams;
Stream Name | Kafka Topic | Format
----------------------------------------
TOPIC01_STREAM1 | topic-01 | JSON
TOPIC01_STREAM2 | topic-11 | JSON
----------------------------------------
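The same check works without the CLI, because the KSQL server also accepts statements over its REST API. A minimal sketch, again assuming the requests package on the host; the /ksql endpoint takes a JSON body containing the statement to run:

# list_streams.py: run SHOW STREAMS; through the KSQL REST API.
# Assumes the `requests` package is installed (pip install requests).
import requests

resp = requests.post(
    "http://localhost:8088/ksql",
    json={"ksql": "SHOW STREAMS;", "streamsProperties": {}},
)
resp.raise_for_status()

# Each entry lists a stream with its backing Kafka topic and value format.
for stream in resp.json()[0]["streams"]:
    print(stream)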
Verifying the stream processing in KSQL
Set up a query to watch the data streaming through topic01_stream2.
ksql> select * from topic01_stream2 emit changes;
Press CTRL-C to interrupt
The query is now waiting to receive the extracted data from the producer.
To send data, open a new terminal, connect to the producer container, and move to the directory containing the program.
$ docker exec -it iotsampledata_iot_1 /bin/bash
root@4e4f79c219e1:/app#
root@4e4f79c219e1:/app# cd opt
root@4e4f79c219e1:/app/opt#
Run IoTSampleData-v2.py to generate and send the data (record count: 30).
root@4e4f79c219e1:/app/opt# python IoTSampleData-v2.py --mode kf --count 30
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7f43c5e36850>
データ作成件数:30
データ作成時間:0.12617969512939453 [sec]
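For reference, the heart of such a producer is just serializing each generated record as JSON and sending it to topic-01, which is also why a FutureRecordMetadata object appears in the output above. The following is not the actual IoTSampleData-v2.py, only a minimal sketch of the same idea with kafka-python; the field names follow the topic01_stream1 definition, and broker:29092 assumes a container on iot_network (the same in-network listener that KSQL_BOOTSTRAP_SERVERS points at).

# producer_sketch.py: illustrative only, not the actual IoTSampleData-v2.py.
import json
from datetime import datetime
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="broker:29092",
    value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
)

# One record with the fields that topic01_stream1 declares.
record = {
    "id": 1,
    "time": datetime.now().isoformat(),
    "proc": "111",
    "section": "E",
    "iot_num": "734-3238",
    "iot_state": "大分県",
    "vol_1": 139.02,
    "vol_2": 51.18,
}

future = producer.send("topic-01", record)  # returns a FutureRecordMetadata
producer.flush()
print(future)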
The extracted data from the producer is displayed at the ksql prompt.
ksql> select * from topic01_stream2 emit changes;
+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|ROWTIME |ROWKEY |SECTION |TIME |PROC |IOT_NUM |IOT_STATE |VOL_1 |VOL_2 |
+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|161243152299|2021/02/04 |E |2021-02-04T1|111 |734-3238 |大分県 |139.02429108|51.183520722|
|2 | | |8:38:42.8751| | | |703143 |012844 |
| | | |73 | | | | | |
|161243152299|2021/02/04 |W |2021-02-04T1|111 |409-8822 |高知県 |140.98584152|58.169693933|
|2 | | |8:38:42.8753| | | |262225 |88201 |
| | | |73 | | | | | |
|161243152299|2021/02/04 |C |2021-02-04T1|111 |169-2154 |長崎県 |119.11701506|69.181332688|
|3 | | |8:38:42.8756| | | |472588 |56029 |
| | | |75 | | | | | |
Press CTRL-C to interrupt
This confirms that the data generated by the Python program on the producer passed through topic-01 and went through KSQL's stream extraction processing (topic01_stream1 → topic01_stream2): only records whose section is E, C, or W appear, exactly as the filter specifies.
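As a cross-check ahead of the next installment, you can also read topic-11 directly and confirm that only section E/C/W records were written there. A minimal consumer sketch with kafka-python, assuming it runs on the host against the localhost:9092 listener:

# verify_topic11.py: read back the filtered records KSQL wrote to topic-11.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "topic-11",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",   # read the topic from the beginning
    consumer_timeout_ms=10000,      # stop after 10 s with no new records
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

# Every record should have SECTION equal to E, C, or W.
for msg in consumer:
    print(msg.value)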
Next time
Next time (Part 5), I will confirm that the data extracted by the KSQL stream processing can be received by the consumer.
Part 1: Run the basic Kafka components in a local Docker environment
Part 2: Confirm that messages sent from a Kafka producer reach the consumer via the broker
Part 3: Confirm that data generated by a Python program on the producer is delivered to the consumer via the broker
Part 4: Confirm that data generated on the producer goes through topic-01 and is extracted by KSQL stream processing (topic01_stream1 → topic01_stream2)
Part 5: Confirm that data generated on the producer is received by a Python program on the consumer via topic-01 → KSQL → topic-11
Part 6: Confirm that data generated on the producer travels via topic-01 → KSQL → topic-11 to a Python program on the consumer, which writes it to S3
Part 7: Confirm that data generated on two producer containers is received by a Python program on the consumer via topic-01 → KSQL → topic-11
References
I referred to the following information, for which I am grateful.
Docker tutorial for Kafka
Easily building a Docker environment for Kafka and KSQL