让我们在confluent的ksql中玩耍吧!

首先

在Confluent的4.x版本得到更新之后,Kafka终于也迎来了1.x版本!因此,我们对公司内部的访问日志数据流进行了全新的Kafka环境搭建。

由于结构并没有大的变化,所以我将省略这部分内容,但是ksql相比最初发布时已经有了很大的改进,所以我想介绍一下ksql。

KSQL是什么?

关于卡夫卡的内容在上面提到的文章中已经简要介绍了,因此不再赘述。
官方链接在这里。

Apache Kafka的流式SQL

这是一个使用Kafka和SQL查询实时流式数据的应用程序。

组成元素

为了生成KSQL,您需要理解两个组成要素。

以下是原文。

Here is the original text.

从DataSource来源生成Stream数据。
具体包括:
– Kafka的Topic
– 通过KSQL生成的Stream
– 通过KSQL生成的Table

随着时间的推移,类似于以下的流式数据源不断增加:
– “爱丽丝给鲍勃发送了100美元”
– “查理给鲍勃发送了50美元”

桌子

提供了所谓View的功能。可以选择、处理和聚合流数据,并构建类似关系数据库中的表。

以下也是DataSource,与Stream一样。
– Kafka的Topic
– 通过KSQL生成的Stream
– 通过KSQL生成的Table

以下是一个例子:
– 鲍勃的账户余额是150美元
– 爱丽丝的账户余额是150美元
– 查理的账户余额是50美元

以下是一个示例:
– 鲍勃的账户余额为150美元
– 爱丽丝的账户余额为150美元
– 查理的账户余额为50美元

以下是一个示例:
– 鲍勃的账户里还有150美元
– 爱丽丝的账户里还有150美元
– 查理的账户里还有50美元

但是与不断增长的流式数据不同,这个是根据流数据构建表的。

那么,有什么不同呢?

如果能够接受一定程度的语言不准确,可以将Stream视为连续到达的处理数据,而Table则是基于这些数据进行的聚合数据。

– 引き合いのテーブルでは、ボブの口座には150ドルあります。
– 引き合いのテーブルには、アリスの口座にも150ドルあります。
– 引き合いのテーブルでは、チャーリーの口座には50ドルしかありません。

通过流媒体,发生了以下事件:
– “爱丽丝向鲍勃发送了100美元”
– “查理向鲍勃发送了50美元”

当这些数据被发送到表中时,
-鲍勃的账户余额从150美元变为300美元
-艾丽斯的账户余额从150美元变为50美元
-查理的课程余额从50美元变为0美元
表会相应发生变化。

略有冗余

通过选择键值来创建只会积累流数据的表,不论怎样选择。

在上述例子中,结合两种角色来讲,
– Stream 是用于查询更新的数据
– Table 是根据查询进行更新和引用的视图

无论是Stream,还是可以用于对Streaming数据进行一些简单的处理,如何利用取决于想法。我之前提到的只是一个例子,所以请各自考虑如何使用方便。

来玩吧!

我建议那些想要简单快速开始的人直接执行QuickStart。这次我在公司的长期支持版本化的访问日志数据上进行了一些小调整,现在给大家介绍一下。

关于主题的内容

我将访问日志以LTVS格式导出到Nginx,并通过fluentd发送到Kafka。然后,对这些数据进行一些处理,将年月日时分秒分离为不同的列,并使用KafkaStreams去除不需要的数据,最后使用处理过的数据。

我们来创建一个Stream。

因为我认为这很难理解,所以让我们快速地创建一个流。
以下是要提交到ksql的查询。

ksql> CREATE STREAM parsed_json_stream
(
 VirtualHost STRING,
 Server STRING,
 ClientIP STRING,
 Status INTEGER,
 ResponseTime INTEGER,
 Year INTEGER,
 Month INTEGER,
 Day INTEGER,
 Hour INTEGER,
 Min INTEGER
) WITH (
 KAFKA_TOPIC='parsed_json',
 VALUE_FORMAT='JSON'
);

如果您对访问日志很熟悉,只需一眼就可以理解。
原始主题中包含了其他各种访问日志数据,但我们限制了数据量,只集中在想要统计的数据集以及年月日时分这些目标项目上。

如果成功地获得了类似于Success! 的输出,我会进行检查。

ksql> show streams;

 Stream Name                   | Kafka Topic | Format
--------------------------------------------------
 PARSED_JSON_STREAM | parsed_json | JSON
--------------------------------------------------

在这一点上,Kafka并不会自动生成主题。
流只是消费流向主题的数据。

让我们做一张桌子

使用之前创建的Stream作为基础,创建一张Table。

CREATE TABLE RESP_STATUS_DAILY
WITH (VALUE_FORMAT='JSON')
AS SELECT 
 VirtualHost, Server, Status, Year, Month, Day, COUNT(*) AS Count
FROM PARSED_JSON_STREAM
GROUP BY VirtualHost, Server, Status, Year, Month, Day
HAVING COUNT(*) > 1;

如果您查看查询,就会明白,它以服务器名称、虚拟主机和状态码作为键进行每日聚合的形式。

这是一个用于聚合客户端IP的表格。
由于经常遇到类似DDoS攻击,所以这是一个用于实时聚合访问来源的表格。

CREATE TABLE CLIENT_COUNT
WITH (VALUE_FORMAT='JSON')
AS SELECT 
 VirtualHost, Server, ClientIP, Year, Month, Day, Hour, Min,  COUNT(*) AS Count
FROM PARSED_JSON_STREAM
GROUP BY VirtualHost, Server, Status, Year, Month, Day, Hour, Min
HAVING COUNT(*) > 1;    

现在,在确认一下Kafka的情况之后,我认为生成了几个Topic。一个是与表名相同的查询,而另一个是一个奇怪而又长的东西。
前者无论如何,后者是用于从流中创建表时进行聚合的Topic,我们可以通过仔细观察名称来了解这一点。
虽然我还没有检查内容,但数据量几乎与原来的Topic相同,所以我认为这可能是预处理阶段。
如果不注意,盲目地大量生成表可能会立即耗尽磁盘空间,所以要注意磁盘扩展。

也许需要更加仔细地设计一下,再制作一个Table。如果只是这个程度的话,与其通过ksql勉强实现,不如从另一个应用程序中消费数据,然后写入RDB,这样可能更加现实。

让我们向数据库发送查询。

我们来对已经建立好的表发出一个查询试试看。

ksql> select VirtualHost, Server, Status, Year, Month, Day, Count from DAILY_RESP_STATUS WHERE Status >= 400 LIMIT 10;
xxx-xxx.domain.net | server03 | 403 | 2018 | 5 | 18 | 4
xxx-xxx.domain.net | server01 | 403 | 2018 | 5 | 18 | 8
xxx-xxx.domain.net | server01 | 401 | 2018 | 5 | 18 | 3
xxx-xxx.domain.net | server02 | 403 | 2018 | 5 | 18 | 6

总的来说,结果并没有什么特别值得一提的。

目前,在对表进行查询时无法应用聚合操作。(截至2018年05月19日)

目前的KSQL还不支持对表进行聚合操作(虽然这个功能很快就会来临,但只限于非窗口的情况)。

最后。

作为实时数据处理的一部分,我介绍了ksql的说明和试验(只是尝试一下)。这个应用程序类似于更方便地使用KafkaStreams!尽管它刚发布不久,但它似乎有很多限制,但如果能够灵活使用,那么它对减少KafkaStreams的工作量可能非常有用,因为以前需要使用Scala/Java。

这里没有提到,但也有REST API可用,所以如果能成功创建表,我会希望从应用程序中频繁调用。

广告
将在 10 秒后关闭
bannerAds