Trying Out KSQL: Streaming SQL on Apache Kafka
Confluent has announced KSQL, a tool for running SQL-based stream processing on Kafka. Let's try it out right away.
References
- Introducing KSQL: Open Source Streaming SQL for Apache Kafka
- KSQL from Confluent | Streaming SQL for Apache Kafka™ – YouTube
- ksql/docs/quickstart at 0.1.x · confluentinc/ksql
What is Apache Kafka?
Apache Kafka is a distributed messaging system originally developed at LinkedIn. It offers topic-based message management, consumer-group-based queuing, and more.
What is KSQL?
KSQL is an open-source streaming SQL engine for Kafka developed by Confluent: it lets you run stream processing over Kafka topics using SQL.
- Web: https://www.confluent.io/product/ksql/
- GitHub: https://github.com/confluentinc/ksql
- Documents: https://github.com/confluentinc/ksql/tree/0.1.x/docs/
- Blog Post: https://www.confluent.io/blog/ksql-open-source-streaming-sql-for-apache-kafka/
It looks useful for things like detecting error occurrences in log data.
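As a rough sketch of that idea (the logs topic, its JSON fields, and both stream names here are hypothetical; the syntax mirrors the quickstart queries shown later):

-- Hypothetical 'logs' topic with JSON messages carrying 'level' and 'message' fields
CREATE STREAM logs_original(
    level varchar,
    message varchar
) WITH(
    kafka_topic = 'logs',
    value_format = 'JSON'
);

-- Keep only ERROR-level entries; the result flows to its own topic for alerting
CREATE STREAM logs_errors AS
SELECT
    *
FROM
    logs_original
WHERE
    level = 'ERROR'
;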
Installation
Starting it with Docker is convenient. First, clone the repository.
$ git clone git@github.com:confluentinc/ksql.git
Cloning into 'ksql'...
remote: Counting objects: 17903, done.
remote: Total 17903 (delta 0), reused 0 (delta 0), pack-reused 17903
Receiving objects: 100% (17903/17903), 4.14 MiB | 560.00 KiB/s, done.
Resolving deltas: 100% (8522/8522), done.
Just run docker-compose up in ksql/docs/quickstart, and Kafka, ZooKeeper, and KSQL will all start.
$ cd ksql/docs/quickstart
$ ls
README.md docker-compose.yml ksql-quickstart-schemas.jpg quickstart-docker.md quickstart-non-docker.md
$ docker-compose up -d
Creating network "quickstart_default" with the default driver
Pulling zookeeper (confluentinc/cp-zookeeper:latest)...
latest: Pulling from confluentinc/cp-zookeeper
...
$ docker-compose ps
Name Command State Ports
-------------------------------------------------------------------------------------------------------------------------------------
quickstart_kafka_1 /etc/confluent/docker/run Up 0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
quickstart_ksql-cli_1 perl -e while(1){ sleep 99 ... Up
quickstart_ksql-datagen-pageviews_1 bash -c echo Waiting for K ... Up
quickstart_ksql-datagen-users_1 bash -c echo Waiting for K ... Up
quickstart_schema-registry_1 /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp
quickstart_zookeeper_1 /etc/confluent/docker/run Up 2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp
To enter the KSQL CLI inside the container, use docker-compose exec. Just run the following command.
$ docker-compose exec ksql-cli ksql-cli local --bootstrap-server kafka:29092
======================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Kafka =
Copyright 2017 Confluent Inc.
CLI v0.1, Server v0.1 located at http://localhost:9098
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
Creating Streams and Tables
The quickstart uses pageviews and users data with the schemas shown below as its examples.
To create a stream from pageviews and a table from users, run the following queries.
-- Create the stream
CREATE STREAM pageviews_original(
viewtime bigint,
userid varchar,
pageid varchar
) WITH(
kafka_topic = 'pageviews',
value_format = 'DELIMITED'
);
-- Create the table
CREATE TABLE users_original(
registertime bigint,
gender varchar,
regionid varchar,
userid varchar
) WITH(
kafka_topic = 'users',
value_format = 'JSON'
);
Running these in the KSQL CLI, both were created correctly. Two columns named ROWTIME and ROWKEY are added automatically: ROWTIME holds the Kafka message timestamp, and ROWKEY holds the message key.
ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');
Message
----------------
Stream created
ksql> DESCRIBE pageviews_original;
Field | Type
----------------------------
ROWTIME | BIGINT
ROWKEY | VARCHAR(STRING)
VIEWTIME | BIGINT
USERID | VARCHAR(STRING)
PAGEID | VARCHAR(STRING)
ksql> CREATE TABLE users_original (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='users', value_format='JSON');
Message
---------------
Table created
ksql> DESCRIBE users_original;
Field | Type
--------------------------------
ROWTIME | BIGINT
ROWKEY | VARCHAR(STRING)
REGISTERTIME | BIGINT
GENDER | VARCHAR(STRING)
REGIONID | VARCHAR(STRING)
USERID | VARCHAR(STRING)
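The implicit columns can be selected like any other; a quick sketch against the stream we just created:

-- ROWTIME is the message timestamp (epoch milliseconds), ROWKEY the message key
SELECT
    ROWTIME,
    ROWKEY,
    pageid
FROM
    pageviews_original
LIMIT 1
;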
To list the streams and tables, run the following queries.
ksql> SHOW STREAMS;
Stream Name | Kafka Topic | Format
-----------------------------------------------------------------
PAGEVIEWS_ORIGINAL | pageviews | DELIMITED
ksql> SHOW TABLES;
Table Name | Kafka Topic | Format | Windowed
--------------------------------------------------------------
USERS_ORIGINAL | users | JSON | false
How to Write Queries
Fetching Messages with SELECT
A SELECT statement fetches data from a stream. By default, it appears to return only messages produced after the query starts. Without a LIMIT, messages keep streaming in indefinitely; press Ctrl-C when you want to stop.
ksql> SELECT pageid FROM pageviews_original LIMIT 3;
Page_51
Page_37
Page_75
LIMIT reached for the partition.
Query terminated
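WHERE filters work on interactive SELECTs too. A small sketch (assuming User_1 shows up in the generated data):

-- Only pageviews from a single user; stops after three rows
SELECT
    viewtime,
    pageid
FROM
    pageviews_original
WHERE
    userid = 'User_1'
LIMIT 3
;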
Creating a Stream from Other Streams or Tables
You can combine existing streams and tables to create a new stream. Here we create a stream that captures pageviews by female users. The results flow through the Kafka topic PAGEVIEWS_FEMALE.
CREATE STREAM pageviews_female AS
SELECT
users_original.userid AS userid,
pageid,
regionid,
gender
FROM
pageviews_original
LEFT JOIN
users_original
ON pageviews_original.userid = users_original.userid
WHERE
gender = 'FEMALE'
;
ksql> CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';
Message
----------------------------
Stream created and running
ksql> DESCRIBE pageviews_female;
Field | Type
----------------------------
ROWTIME | BIGINT
ROWKEY | VARCHAR(STRING)
USERID | VARCHAR(STRING)
PAGEID | VARCHAR(STRING)
REGIONID | VARCHAR(STRING)
GENDER | VARCHAR(STRING)
Running a SELECT against it, only FEMALE rows come back, as expected!
ksql> SELECT * FROM pageviews_female LIMIT 3;
1503994511734 | User_7 | User_7 | Page_54 | Region_1 | FEMALE
1503994518152 | User_5 | User_5 | Page_94 | Region_3 | FEMALE
1503994519403 | User_5 | User_5 | Page_14 | Region_3 | FEMALE
LIMIT reached for the partition.
Query terminated
Filtering with LIKE
It seems you can narrow results further with a LIKE clause, and you can also specify the name of the output topic.
CREATE STREAM pageviews_female_like_89 WITH(
kafka_topic = 'pageviews_enriched_r8_r9',
value_format = 'DELIMITED'
) AS
SELECT
*
FROM
pageviews_female
WHERE
regionid LIKE '%_8'
OR regionid LIKE '%_9'
;
Windowed Processing
You can also do windowing, which is essential in stream processing. Here we aggregate pageview counts from pageviews_female by region over a tumbling window.
CREATE TABLE pageviews_regions AS
SELECT
gender,
regionid,
COUNT(*) AS numusers
FROM
pageviews_female WINDOW TUMBLING(
size 30 second
)
GROUP BY
gender,
regionid
HAVING COUNT(*) > 1
;
Let's query the pageviews_regions table we just created.
ksql> SELECT regionid, numusers FROM pageviews_regions LIMIT 5;
Region_8 | 3
Region_8 | 4
Region_6 | 2
Region_4 | 2
Region_6 | 4
LIMIT reached for the partition.
Query terminated
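The quickstart uses a tumbling window, but the KSQL docs also describe hopping and session windows. A hedged sketch of the hopping variant, assuming the same syntax style as above (the table name is mine):

-- Windows 60 seconds wide that advance every 30 seconds,
-- so each event lands in two overlapping windows
CREATE TABLE pageviews_regions_hopping AS
SELECT
    regionid,
    COUNT(*) AS numusers
FROM
    pageviews_female WINDOW HOPPING(
        size 60 second, advance by 30 second
    )
GROUP BY
    regionid
;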
Viewing Queries
You can list the running queries with SHOW QUERIES. It does not show interactive SELECTs or plain CREATE STREAM statements like the first ones we ran; only queries that write their results to a Kafka topic appear.
SHOW QUERIES;
Query ID | Kafka Topic | Query String
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1 | PAGEVIEWS_FEMALE | CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';
2 | pageviews_enriched_r8_r9 | CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
3 | PAGEVIEWS_REGIONS | CREATE TABLE pageviews_regions AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;
Cleaning Up
To stop a query, use TERMINATE with the query ID.
ksql> TERMINATE 2;
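The docs also list DROP STREAM and DROP TABLE for removing the definitions themselves (a hedged sketch; the underlying query should be terminated first):

-- Remove definitions once their queries have been terminated
DROP STREAM pageviews_female_like_89;
DROP TABLE pageviews_regions;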
To exit the KSQL CLI, run the exit command.
ksql> exit
Exiting KSQL.
Don't forget to stop the containers.
$ docker-compose down
Stopping quickstart_ksql-cli_1 ... done
Stopping quickstart_ksql-datagen-users_1 ... done
Stopping quickstart_ksql-datagen-pageviews_1 ... done
Stopping quickstart_schema-registry_1 ... done
Stopping quickstart_kafka_1 ... done
Stopping quickstart_zookeeper_1 ... done
Removing quickstart_ksql-cli_1 ... done
Removing quickstart_ksql-datagen-users_1 ... done
Removing quickstart_ksql-datagen-pageviews_1 ... done
Removing quickstart_schema-registry_1 ... done
Removing quickstart_kafka_1 ... done
Removing quickstart_zookeeper_1 ... done
Removing network quickstart_default
Impressions
- Aggregations spanning multiple topics are easy to write, which is convenient
- Aggregation results can be collected into a specific topic
- Seems usable for all sorts of things, like firing alerts from logs or feeding batch jobs
- The difference between streams and tables is hard to grasp
- The concepts doc has an explanation of streams and tables, but... hmm?