Apache Flink SQL的实践编程

这篇文章是Apache Flink基础教程系列的一部分,重点介绍了使用5个示例来实践Flink SQL编程。

此博客是从英语版翻译而来。您可以在此处查看原版。我们部分使用了机器翻译。如有错误,请您指正,不胜感激。

比起傑克·吳

本文介绍了Ververica的开源sql-training项目和基于Flink 1.7.2的教程练习。通过Flink的SQL编程实践,总共使用了五个示例,主要涵盖了以下内容。

    • SQL CLIクライアントの使い方。

 

    • ストリーム上でSQLクエリを実行する方法。

 

    • ウィンドウアグリゲートと非ウィンドウアグリゲートを実行して、それらの違いを理解する。

 

    • SQLを使用してKafkaデータを消費する方法。

 

    • SQLを使用してKafkaとElasticSearchに結果を書き込む方法。

 

    この記事では、すでに基本的なSQLの知識があることを前提としています。

营造一个环境。

由於這個教程是基於Docker的,所以不需要安裝其他軟件或附加程序。此練習不依賴Java環境、Scala或IDE。

注意:默认情况下,Docker配置的资源可能不足,可能导致Flink作业在运行中崩溃。因此,建议将Docker内的资源设置为3~4GB,CPU设置为3~4核。

image.png

在本次教程中,我们使用Docker Compose来安装环境,以容纳各种服务的容器。

    • Flink SQL Client:クエリを送信し、結果を可視化します。

 

    • Flink JobManagerとTaskManager:Flink SQLタスクを実行します。

 

    Apache Kafka:入力ストリームの生成と結果ストリームの書き込み。
image.png
    • Apache ZooKeeper: Kafkaの依存関係。

 

    ElasticSearch: 結果を書き込む。

有Docker Compose的配置文件可供使用。您可以直接下载docker-compose.yml文件。

接下来,打开命令行窗口,输入保存docker-compose.yml文件的目录,并执行以下命令。

    Linux & MacOSの場合
docker-compose up -d
    Windows
set COMPOSE_CONVERT_WINDOWS_PATHS=1
docker-compose up -d

docker-compose命令会为每个必要的容器启动。在首次启动Docker时,它会自动从Docker Hub下载镜像(大约2.3GB),这可能需要一些时间。之后启动将非常迅速。操作成功后,命令行将显示以下输出,您也可以访问Flink Web UI的http://localhost:8081。

image.png

执行Flink SQL CLI客户端

运行以下命令,进入Flink SQL CLI。

docker-compose exec sql-client ./sql-client.sh

执行此命令后,Flink SQL CLI客户端将在容器内启动。然后,将显示以下形式的“welcome”界面。

スクリーンショット 2020-08-24 11.51.06.png

介绍数据

一些表和数据已经预先注册在Docker Compose中,可以通过执行”show tables”来查看。通过使用文章中的Rides表数据,您可以查看包括时间和地点在内的出租车行驶记录数据流。您可以执行”DESCRIBE Rides”命令来查看表的结构。

Flink SQL> DESCRIBE Rides;
root
 |-- rideId: Long           // Ride ID (including two records, an input and an output)
 |-- taxiId: Long           // Taxi ID 
 |-- isStart: Boolean       // Start or end
 |-- lon: Float             // Longitude
 |-- lat: Float             // Latitude
 |-- rideTime: TimeIndicatorTypeInfo(rowtime)     // Time
 |-- psgCnt: Integer        // Number of passengers

有关Rides表的详细信息,请参阅training-config.yaml。

一例:过滤器

假设现在我们想要查阅纽约的驾驶记录。 de

注意:在Docker环境中,预先定义了一些内置函数,例如isInNYC(lon, lat)用于检查纽约是否具有经纬度,以及toAreaId(lon, lat)用于将特定地区进行转换。

因此,可以使用isInNYC快速过滤纽约的驾驶记录。在SQL CLI中执行以下查询。

SELECT * FROM Rides WHERE isInNYC(lon, lat)

SQL CLI 将提交 SQL 任务到 Docker 集群;它会持续从数据源(例如Kafka中的Rides流)中拉取数据,并通过isInNYC进行必要数据的过滤。此外,SQL CLI 进入可视化模式,始终刷新和实时显示过滤结果。

スクリーンショット 2020-08-24 11.53.33.png

另外,您还可以访问 http://localhost:8081,来确认 Flink 作业的执行状态。

例2:团队汇总。

另一个要求是计算搭载不同数量乘客的驾驶事件的数量。例如,搭载1名乘客的驾驶事件的数量,搭载2名乘客的驾驶事件的数量等等。

在中国以本地语言将以下内容改写为一种选择:

使用psgCnt对乘客数进行分组,并使用COUNT(*)计算每个组的事件数。注意在分组之前,需要过滤在纽约发生的驾驶记录数据(isInNYC)。在SQL CLI中执行以下查询。

SELECT psgCnt, COUNT(*) AS cnt 
FROM Rides 
WHERE isInNYC(lon, lat)
GROUP BY psgCnt;

SQL CLI中的结果可视化如下;结果每秒钟都会变化,但乘客的最大数不可以超过6人。

スクリーンショット 2020-08-24 11.54.14.png

第三个例子:对 Window 进行汇总。

为了对纽约的交通流动进行持续监控,需要每5分钟计算每个区域进入的车辆数量。我们只关注至少有5辆车进入的区域,尤其是主要关注的区域。

由于需要对窗口进行聚合处理(每5分钟进行一次),因此需要使用滚动窗口语法。

需要根据进入每个区域的车辆数量,按AreaId进行分组。在分组之前,应使用isStart字段对输入的车辆行驶记录进行过滤,并使用COUNT(*)来计算车辆数量。

车辆数量超过5辆的区域。这是根据统计值使用SQL HAVING句设置的过滤条件。

最终查询如下。

SELECT 
  toAreaId(lon, lat) AS area, 
  TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS window_end, 
  COUNT(*) AS cnt 
FROM Rides 
WHERE isInNYC(lon, lat) and isStart
GROUP BY 
  toAreaId(lon, lat), 
  TUMBLE(rideTime, INTERVAL '5' MINUTE) 
HAVING COUNT(*) >= 5;

当使用SQL CLI执行查询时,可视化结果如下所示。按区域和窗口结束时间结果不会改变,但每5分钟会生成新的窗口批处理并输出结果。

由於在Docker環境中讀取源代碼而實現了10倍的速度,因此在演示中每30秒會生成一個新窗口的批處理程式(相對於實際速度的比較)。

スクリーンショット 2020-08-24 13.05.53.png

Window Aggregate和Group Aggregat的区别是什么?

从例子2和例子3的结果可以看出,Window Aggregate和Group Aggregate有明显的区别。主要区别在于,Window Aggregate的输出结果只在窗口结束时显示,并且输出结果是最终值且不会改变。输出流为Append流。

然而,Group Aggregate在处理数据的每一部分时都会输出最新结果。结果具有与数据库内数据相同的更新状态,其输出流是更新流。

另一个区别是窗口上有水印。使用该功能,可以确保稳定的状态大小,并精确地知道哪个窗口已过期以清除过期状态。

然而,在Group Aggregate的情况下,由于无法得知哪些数据已过期,导致状态大小变得无限大,因此在生产环境下并不稳定。因此,请在Group Aggregate的作业中设置状态的TTL。

スクリーンショット 2020-08-24 13.08.21.png

例如,若要每天计算各门店的实时PV,通常不会使用前一天的状态,所以需要将TTL设置为超过24小时。

SELECT  DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id, COUNT(*) as pv
FROM T
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id

如果TTL值太小,会清除有用的状态和数据,引发数据准确性的问题。这也是用户需要考虑的参数之一。

示例4:写入到Kafka的附加流

在前文中,我们解释了Window Aggregate和Group Aggregate的区别,以及Append Stream和Update Stream的区别。在Flink中,我们可以将更新流写入支持更新的外部存储系统,如MySQL、HBase和ElasticSearch。

将Append流写入到任意的存储系统或者类似Kafka的日志系统中是可行的。

假设我们想要将“每10分钟的乘客数量”这个数据流写入Kafka。

Kafka 的结果表 Sink_TenMinPsgCnts 已经事先定义 (请参考 training-config.yaml 获取完整表定义)。

在执行查询之前,请执行以下命令,监视写入TenMinPsgCnts主题的数据。

docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning

您可以在滚动窗口中记录每10分钟的乘客数量。您可以使用INSERT INTO Sink_TenMinPsgCnts命令直接将查询结果写入结果表中。

INSERT INTO Sink_TenMinPsgCnts 
SELECT 
  TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,  
  TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
  CAST(SUM(psgCnt) AS BIGINT) AS cnt 
FROM Rides 
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);

如前所述,将TenMinPsgCnts的数据主题以JSON格式写入Kafka。

スクリーンショット 2020-08-24 13.10.35.png

在ElasticSearch中写入更新流。

最后,将持续更新的更新流写入ElasticSearch (ES)。我希望将”各区域出发次数”的流写入ES。

在ElasticSearch中定义了结果表Sink_AreaCnts(有关表的完整定义,请参考training-config.yaml)。这个表只包含两个字段:areaId和cnt。

同样地,您也可以使用INSERT INTO直接将查询结果写入到Sink_AreaCnts表中。

INSERT INTO Sink_AreaCnts 
SELECT toAreaId(lon, lat) AS areaId, COUNT(*) AS cnt 
FROM Rides 
WHERE isStart
GROUP BY toAreaId(lon, lat);

当使用SQL CLI执行先行查询时,Elasticsearch会自动创建area-cnts索引。Elasticsearch提供了REST API,您可以访问以下URL。

    • area-cnts インデックスを表示するには: http://localhost:9200/area-cnts

area-cntsインデックスの統計情報を表示するには: http://localhost:9200/area-cnts/_stats

area-cntsインデックスの内容を返すには: http://localhost:9200/area-cnts/_search

エリア 49791 の車両数を表示するには: http://localhost:9200/area-cnts/_search?q=areaId:49791

执行查询之后,我们可以看到某些统计值(_all.primaries.docs.count和_all.primaries.docs.deleted)在不断增加:http://localhost:9200/area-cnts/_stats。

简而言之

在本文中,我们编写了一个使用Docker Compose快速开始Flink SQL编程的指南。还比较了Window Aggregate和Group Aggregate之间的区别,并介绍了将这两种类型的作业写入外部系统的方法。

如果您感兴趣的话,可以基于Docker环境进行更深入的实践,例如执行自定义的UDF、UDTF和UDAF,或查询其他内置的源表。

阿里巴巴云拥有两个数据中心在日本,并且拥有超过60个可用区的亚太地区领先的云基础设施提供商(2019年加特纳报告)。
详细了解阿里巴巴云,请点击这里。
阿里巴巴云日本官方页面。

广告
将在 10 秒后关闭
bannerAds