在比上午搭乘公交车所需时间更短的时间内搭建实时分析流水线

在比上午乘坐公交车时间更短的时间内构建一个实时分析管道。

2022年的10月31日

以更短的时间构建实时分析管道,胜过早上乘公交。

我們將詳細介紹使用Apache Kafka®和Apache Flink®構建的事件流架構,以及將數據實時傳輸到實時分析數據倉庫ClickHouse®的過程。

如果有一个有趣的数据集,有很多思路和问题,并且时间紧迫,您会怎么做?在几分钟内提供高质量的洞察,希望简化创新。我们将利用Digitransit的数据集,在Aiven平台的工具和开源服务的支持下,展示一个示例来说明它的工作原理。

此外,戴夫还举办了一场关于这个话题的详细解说网络研讨会。录像可在此处观看:

构建数据分析管道的方法

需要覆盖什么内容

デジトランスプラットフォーム:**我々のソリューションのデータアーキテクチャコンポーネントとサードパーティAPIからの入力データソースをチェックする。

地下を掘る: 私たちの旅を始めるにあたり、私たちのチューブマップを見てみましょう!

最初の足: MQTTフィードを購読するためのKafka Connect®フレームワークをご覧ください。

Apache Flink:でマッピング、フラット化、いくつかの変換を行う。

最終目的地:では、分析クエリを実行できるようにデータを安全に取得します。

まとめ: 余分なクレジットを使って自分で試してみてください。Aiven ConsoleでCatch the Bus Challengeに挑戦し、他のリソースもご覧ください。

从开源平台转向铁路平台:我们演示的基础

Digitransit是一個支援芬蘭旅客每天使用的網站和移動應用程式。這個平台的特別之處(也是Aiven喜愛的原因之一)就是完全開源。他們提供了一套很棒的API,用於地圖製作、地理編碼、服務警報、行程更新和幾乎即時的車輛位置資料供應。

获取此数据源,并在Aiven上快速构建管道,利用这些事件来学习有关芬兰公共交通状况的新信息吧!

image

正如您所看到的,数据管道架构可能变得复杂(如果您想更深入了解,请参考future.com的优质信息)。但是在这里,我们强调了我们解决方案中使用的数据管道架构组件。

    • 入力データソースはDigitransitが提供するサードパーティAPIで、MQTTストリームとしてデータを送信します。

 

    • Apache Kafka®」(https://aiven.io/kafka)によるイベント・ストリーミング・プラットフォームを使用する。

ストリーム処理はApache Flink®を使用します。
リアルタイム分析データベースはClickHouse®を使用します。

而且所有这些都在Aiven上运行!

挖掘地底

这是全面的建设计划地图。

image

除了之前提到的核心服务外,我们还会在途中停靠几个附加站点,希望您能理解。

    • Digitransit MQTTフィードを購読するためのKafka Connect®フレームワーク

時系列データベースとしてのM3DB

ダッシュボード用のGrafana®インスタンス

M3DB和Grafana被用于监控流水线的状态,以确认其健康。可以检查消息接收率和消费者滞后,并为它们设置警报阈值。

在试用账户上,当您登录Aiven控制台时,您将看到所有可用的开源托管服务。

Kafka Connect®线路的详细设置。

在Kafka Connect®服务内部执行Stream Reactor的MQTT源连接器。不需要自定义编码,只需指定正确的端点、传递主题的详细信息并进行最小限度的设置以告知连接器要写入的数据格式(为简单起见,使用JSON,但也可以选择Avro)。

当执行此操作时,将在Kafka主题中显示此类记录。

image

我们拥有营运者ID、车辆ID、速度、方向、位置作为纬度/经度以及其他各种字段。然而,这里存在一些问题。

首先,整个数据以JSON对象的形式封装为VP(车辆位置)对象。其次,这些ID并没有太大意义。我们想知道运输操作员的名字。

为了进行前期处理,我要去 Apache Flink。

image

现在让我们通过Apache Flink来进行流前处理。以下是我们正在做的一些事情:

    • 各レコードをPostgreSQLデータベースにある小さな参照データセットに結合する。

 

    • 入れ子を取り除き、フラット化された値のセットを別のトピックに書き込みます。

 

    簡単な変換もできます。例えば、速度の値はメートル毎秒で報告されるが、マイル毎時やキロ毎時の方がいいかもしれない。ここで、元のカラムと一緒に新しい計算カラムを書くことができます。

这些转换被定义为SQL查询。在Kafka主题和Postgres参考数据表上定义了作为叠加层的Flink表,并从中选择并按照您喜欢的方式进行连接。

提供するもの 。
最终,Aiven 提供 ClickHouse 服务。

最后,我想要在数据到达ClickHouse后立即执行分析查询。由于ClickHouse具有出色的Kafka集成,因此我们可以在这里定义一个具有Kafka引擎类型的表,并运行消费者组,以便始终读取我们希望的主题上到达的新数据。

查询这个表时,只显示最新消费的记录,这仅仅是显示并没有意义。除非重置消费者群组的细节,否则不能查询相同的数据两次。因此,在这个表上创建一个ClickHouse材料化视图。

image

ClickHouse在后台始终从Kafka读取数据。这些数据经过物化视图的传输,并被持久化,因此可以无限次执行查询。

总结

如果具备这些工具和技能,就可以在自己的数据系统中创造价值并推动创新,能够在地下航行。

建立流媒体异常检测系统

使用Aiven for Apache Flink®进行数据转换,使用Aiven for Apache Kafka®进行数据流处理,使用Aiven for PostgreSQL®进行数据存储和查询。

阅读教程

参考书目

    • AivenのTerraformプロバイダーを使って、GitHubでこの記事で見たものをビルドする

データをダウンサンプリングすることなく、膨大なデータに対して高速なレスポンスタイムで分析を実行する方法を学ぶ。Apache Kafka®とAiven for ClickHouse®を接続する

Aiven Terraformプロバイダーを使用したAiven for Apache Kafka / Aiven for Apache Flink統合のセットアップ

如有其他意见或问题,请通过Twitter或LinkedIn与我们联系。请关注博客的RSS订阅并查看文档。如希望获取有关Aiven和我们的服务的最新消息,以及有关开源的相关信息,请订阅我们的月刊新闻简报!

广告
将在 10 秒后关闭
bannerAds