Trying out KSQL, which runs SQL on Apache Kafka

Confluent has announced the release of KSQL, a tool for SQL-based stream processing on Kafka. Let's try it out right away.


References

    • Introducing KSQL: Open Source Streaming SQL for Apache Kafka
    • KSQL from Confluent | Streaming SQL for Apache Kafka™ – YouTube
    • ksql/docs/quickstart at 0.1.x · confluentinc/ksql

What is Apache Kafka?

Apache Kafka is a distributed messaging system developed at LinkedIn. It provides features such as topic-based message management and queuing by consumer group.


What is KSQL?

KSQL is an open-source product developed by Confluent that provides streaming SQL processing on top of Kafka.

    • Web: https://www.confluent.io/product/ksql/
    • GitHub: https://github.com/confluentinc/ksql
    • Documents: https://github.com/confluentinc/ksql/tree/0.1.x/docs/
    • Blog Post: https://www.confluent.io/blog/ksql-open-source-streaming-sql-for-apache-kafka/

It looks useful for things like detecting errors in log data.
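As a sketch of that use case: assuming a hypothetical app_logs topic whose JSON records carry a level field (the topic and column names here are illustrative, not part of the quickstart), a pair of KSQL statements along these lines could route error records to their own stream:

```sql
-- Hypothetical sketch: register the assumed app_logs topic as a stream
CREATE STREAM app_logs_original (
  ts bigint,
  level varchar,
  message varchar
) WITH (
  kafka_topic = 'app_logs',
  value_format = 'JSON'
);

-- Continuously filter out only the ERROR records
CREATE STREAM app_errors AS
SELECT ts, message
FROM app_logs_original
WHERE level = 'ERROR';
```

A downstream consumer could then subscribe to the APP_ERRORS topic to trigger alerts.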

Installation

Starting it with Docker is convenient. First, clone the repository.

$ git clone git@github.com:confluentinc/ksql.git
Cloning into 'ksql'...
remote: Counting objects: 17903, done.
remote: Total 17903 (delta 0), reused 0 (delta 0), pack-reused 17903
Receiving objects: 100% (17903/17903), 4.14 MiB | 560.00 KiB/s, done.
Resolving deltas: 100% (8522/8522), done.

Just run docker-compose up in ksql/docs/quickstart, and Kafka, ZooKeeper, and KSQL all start up.

$ cd ksql/docs/quickstart 
$ ls
README.md                   docker-compose.yml          ksql-quickstart-schemas.jpg quickstart-docker.md        quickstart-non-docker.md
$ docker-compose up -d
Creating network "quickstart_default" with the default driver
Pulling zookeeper (confluentinc/cp-zookeeper:latest)...
latest: Pulling from confluentinc/cp-zookeeper
...
$ docker-compose ps
               Name                              Command               State                           Ports                          
-------------------------------------------------------------------------------------------------------------------------------------
quickstart_kafka_1                    /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp       
quickstart_ksql-cli_1                 perl -e while(1){ sleep 99 ...   Up                                                             
quickstart_ksql-datagen-pageviews_1   bash -c echo Waiting for K ...   Up                                                             
quickstart_ksql-datagen-users_1       bash -c echo Waiting for K ...   Up                                                             
quickstart_schema-registry_1          /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp                                 
quickstart_zookeeper_1                /etc/confluent/docker/run        Up      2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp

To enter the KSQL CLI inside the container, use docker-compose exec. Just run the following command.

$ docker-compose exec ksql-cli ksql-cli local --bootstrap-server kafka:29092
                       ======================================
                       =      _  __ _____  ____  _          =
                       =     | |/ // ____|/ __ \| |         =
                       =     | ' /| (___ | |  | | |         =
                       =     |  <  \___ \| |  | | |         =
                       =     | . \ ____) | |__| | |____     =
                       =     |_|\_\_____/ \___\_\______|    =
                       =                                    =
                       =   Streaming SQL Engine for Kafka   =
Copyright 2017 Confluent Inc.                         

CLI v0.1, Server v0.1 located at http://localhost:9098

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> 

Creating streams and tables

The quickstart uses pageviews and users data, with the schemas shown below, as its examples.

To create a stream and a table from pageviews and users, run the following queries.

-- Create the stream
CREATE STREAM pageviews_original(
  viewtime bigint,
  userid varchar,
  pageid varchar
) WITH(
  kafka_topic = 'pageviews',
  value_format = 'DELIMITED'
);

-- Create the table
CREATE TABLE users_original(
  registertime bigint,
  gender varchar,
  regionid varchar,
  userid varchar
) WITH(
  kafka_topic = 'users',
  value_format = 'JSON'
);

Running them in the KSQL CLI, both are created successfully. Two columns named ROWTIME and ROWKEY are added automatically: ROWTIME holds the timestamp of the Kafka message, and ROWKEY holds the message key.

ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');

 Message        
----------------
 Stream created 
ksql> DESCRIBE pageviews_original;

 Field    | Type            
----------------------------
 ROWTIME  | BIGINT          
 ROWKEY   | VARCHAR(STRING) 
 VIEWTIME | BIGINT          
 USERID   | VARCHAR(STRING) 
 PAGEID   | VARCHAR(STRING) 

ksql> CREATE TABLE users_original (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='users', value_format='JSON');

 Message       
---------------
 Table created 
ksql> DESCRIBE users_original;

 Field        | Type            
--------------------------------
 ROWTIME      | BIGINT          
 ROWKEY       | VARCHAR(STRING) 
 REGISTERTIME | BIGINT          
 GENDER       | VARCHAR(STRING) 
 REGIONID     | VARCHAR(STRING) 
 USERID       | VARCHAR(STRING)

To list the streams and tables, run the following queries.

ksql> SHOW STREAMS;

 Stream Name              | Kafka Topic              | Format    
-----------------------------------------------------------------
 PAGEVIEWS_ORIGINAL       | pageviews                | DELIMITED 

ksql> SHOW TABLES;

 Table Name        | Kafka Topic       | Format    | Windowed 
--------------------------------------------------------------
 USERS_ORIGINAL    | users             | JSON      | false   

How to write queries

Retrieving messages with SELECT

A SELECT statement retrieves data from a stream. By default, it appears to return only messages that arrive after the query starts. Without a LIMIT, messages keep streaming indefinitely; press Ctrl+C when you want to stop.

ksql> SELECT pageid FROM pageviews_original LIMIT 3;
Page_51
Page_37
Page_75
LIMIT reached for the partition.
Query terminated

Creating a stream from other streams or tables

You can combine multiple streams or tables to create a new stream. Here, we create a stream that captures pageviews by users whose gender is FEMALE. The results flow through the Kafka topic PAGEVIEWS_FEMALE.

CREATE STREAM pageviews_female AS
SELECT
  users_original.userid AS userid,
  pageid,
  regionid,
  gender
FROM
  pageviews_original
  LEFT JOIN
    users_original
  ON  pageviews_original.userid = users_original.userid
WHERE
  gender = 'FEMALE'
;
ksql> CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';

 Message                    
----------------------------
 Stream created and running 
ksql> DESCRIBE pageviews_female;

 Field    | Type            
----------------------------
 ROWTIME  | BIGINT          
 ROWKEY   | VARCHAR(STRING) 
 USERID   | VARCHAR(STRING) 
 PAGEID   | VARCHAR(STRING) 
 REGIONID | VARCHAR(STRING) 
 GENDER   | VARCHAR(STRING) 

Running a SELECT on it, only FEMALE rows come back!

ksql> SELECT * FROM pageviews_female LIMIT 3;
1503994511734 | User_7 | User_7 | Page_54 | Region_1 | FEMALE
1503994518152 | User_5 | User_5 | Page_94 | Region_3 | FEMALE
1503994519403 | User_5 | User_5 | Page_14 | Region_3 | FEMALE
LIMIT reached for the partition.
Query terminated

Filtering with LIKE

It looks like you can filter further with a LIKE clause, and also specify the output topic name.

CREATE STREAM pageviews_female_like_89 WITH(
  kafka_topic = 'pageviews_enriched_r8_r9',
  value_format = 'DELIMITED'
) AS
SELECT
  *
FROM
  pageviews_female
WHERE
  regionid LIKE '%_8'
OR  regionid LIKE '%_9'
;

Windowing

Stream processing also supports windowing, which is essential. Here, we aggregate pageview counts from pageviews_female by region over a 30-second tumbling window.

CREATE TABLE pageviews_regions AS
SELECT
  gender,
  regionid,
  COUNT(*) AS numusers
FROM
  pageviews_female WINDOW TUMBLING(
    size 30 second
  )
GROUP BY
  gender,
  regionid
HAVING COUNT(*) > 1
;

Query the newly created pageviews_regions table.

ksql> SELECT regionid, numusers FROM pageviews_regions LIMIT 5;
Region_8 | 3
Region_8 | 4
Region_6 | 2
Region_4 | 2
Region_6 | 4
LIMIT reached for the partition.
Query terminated

Viewing queries

You can list the running queries with SHOW QUERIES. Transient SELECTs and statements like the initial CREATE STREAM are not shown; only queries that write their results to a Kafka topic appear.

SHOW QUERIES;

 Query ID | Kafka Topic       | Query String                                                                                                                                                                                                                      
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 1        | PAGEVIEWS_FEMALE  | CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE'; 
 2        | pageviews_enriched_r8_r9 | CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';     
 3        | PAGEVIEWS_REGIONS | CREATE TABLE pageviews_regions AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;     

Cleaning up

To stop a query, use TERMINATE with the query ID.

ksql> TERMINATE 2;

To exit the KSQL CLI, run the exit command.

ksql> exit
Exiting KSQL.

Don't forget to stop the containers.

$ docker-compose down
Stopping quickstart_ksql-cli_1 ... done
Stopping quickstart_ksql-datagen-users_1 ... done
Stopping quickstart_ksql-datagen-pageviews_1 ... done
Stopping quickstart_schema-registry_1 ... done
Stopping quickstart_kafka_1 ... done
Stopping quickstart_zookeeper_1 ... done
Removing quickstart_ksql-cli_1 ... done
Removing quickstart_ksql-datagen-users_1 ... done
Removing quickstart_ksql-datagen-pageviews_1 ... done
Removing quickstart_schema-registry_1 ... done
Removing quickstart_kafka_1 ... done
Removing quickstart_zookeeper_1 ... done
Removing network quickstart_default

Takeaways

    • It's convenient that aggregations spanning multiple topics are easy to write
    • Aggregation results can be collected into a specific topic
    • Seems usable for all sorts of things, such as raising alerts from logs or feeding batch processing

The difference between streams and tables is hard to grasp.

The concepts doc explains streams and tables, but... hmm?
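One way to think about it (my own summary, not from the quickstart): a stream is an append-only sequence of events, while a table holds only the latest value per key, updated as new messages arrive. As a sketch, the same users topic could be registered both ways:

```sql
-- Sketch: reading one topic as a stream vs. as a table
-- As a stream: every registration/update event is a separate row
CREATE STREAM users_stream (userid varchar, regionid varchar)
  WITH (kafka_topic = 'users', value_format = 'JSON');

-- As a table: one row per key, reflecting the current state
CREATE TABLE users_table (userid varchar, regionid varchar)
  WITH (kafka_topic = 'users', value_format = 'JSON');
```

If a user moved regions, the stream would show both the old and new region events, while the table would show only the latest region for that user.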
