将流式数据导入Delta Lake变得简单

简化将流数据导入Delta Lake——Databricks博客翻译。

这本书是摘录译本,无法保证内容的准确性。关于确切的内容,请参阅原文。

大多数的商业决策对时间非常敏感,需要将来自不同类型的实时数据进行关联。在适当的时机捕捉适当的数据是实现及时决策的关键。时间敏感的数据源包括物联网传感器、社交媒体、点击流、数据库中的变更数据捕获等多个方面。为了从这些数据中获得关键洞察,首先需要将其纳入数据湖中。这些数据的关键特性是以无限连续的形式不断到达,这种形式被称为数据流。本文将重点讨论如何将数据流引入数据湖。

高水平的数据导入流程 de

来自各种数据源的流式数据会在被导入数据湖之前被存储在消息总线或对象存储中进行预备阶段。从预备区域获取的数据将会由Apache Spark的结构化流式处理(SS)将数据写入数据湖。预备环境分为两个主要部分,下文将讨论云对象存储和消息总线。

クラウドオブジェクトストレージは、クラウドにおけるセキュア、高信頼かつスケーラブルな永続化レイヤーです。Amazon S3、Azure ADLS/Blobストレージ、Google Cloud Storage(GCS)がクラウドで広く利用されているオブジェクトストレージです。通常、イベントはバッチにまとめられてクラウドオブジェクトストレージのファイルとして格納され、到着するたびにニアリアルタイムでこれらのファイルを取り込む必要があります。クラウドストレージからのニアリアルタイムのデータ取り込みを必要とするユースケースのサンプルとしては、通話データの記録、IoTイベントログなどがあります。

メッセージバスシステムは、パブリッシャー/サブスクライバモデルで動作する疎結合のデータバッファーを提供します。Apache Kafka、Apache Pulsar、Azure EventHub、AWS Kinesis、GCP Pub/Subなどがオープンソース、クラウドにおけるメッセージバスシステムの例となります。メッセージバスシステムは低いパブリッシュのレーテンシーや複数のコンシューマーをサポートするための大規模なファンアウトを保証するので、リアルタイムイベントの捕捉には適しています。メッセージバスを使うアプリケーションのサンプルには、クリックストリーム、クレジットカード不正検知などが含まれます。これらのアプリケーションにおいては、後段の処理ですぐに洞察を提供できる様にリアルタイムでデータを取り込む必要があります。

根据图示,来自各种源系统的数据首先到达对象存储或消息总线的暂存区域。这些数据将通过消息总线的流式连接器或对象存储的自动加载程序导入到数据湖中。Delta Live Tables(DLT)使用简单的声明性方法创建可靠的批处理数据和流式数据管道,并完全管理大规模基础架构。它基于Spark的结构化流处理,但本文不涵盖这个主题。接下来的章节将详细介绍从这些源获取流式数据时面临的一些挑战。

从对象存储中导入数据:自动加载器

通常情况下,文件与批量数据导入相关。然而,在许多情况下,从各种来源连续地以文件格式导入数据到基于云的对象存储成为一种常见的模式。通常,这种模式在需要允许几分钟延迟的近实时处理的用例中被广泛应用。此外,还需要确保仅处理一次、重新处理失败的导入作业、时间旅行以及模式漂移等非功能性要求。

为了解释从云对象存储加载到数据湖时面临的挑战,并改善客户体验并检测欺诈支付,我们考虑设计一个能够实时处理信用卡支付的系统。通常,来自不同支付渠道的交易会被合并成对象存储中的文件。为了进一步进行分析,我们需要将这些文件导入数据湖。由于这些是支付交易,所以需要确保它们只被处理一次,并在处理失败时能够保证不重复处理。在AWS云上进行这种处理时,需要使用以下复杂的架构。

    • Amazon SQS(Simple Queue Service)を用いて、スケーラブルな方法でAmazon S3に到着する支払いトランザクションファイルを追跡

 

    • Amazon SQLからデータを取り出し、後段の処理をトリガーするAmazon Lambda Functions

 

    制御テーブルを用いた支払いトランザクションファイルのステータスの監査

主要挑战是跟踪大量到达对象存储的文件,确保对这些文件进行一次性处理,并管理来自不同支付渠道的不同模式。

Auto Loader是一种云对象存储方式,当新文件到达时,它会逐步处理,用户不需要开发定制应用程序,使流数据的导入变得简单。通过保持内部状态,它可以追踪已处理的文件。如果处理失败,它将使用这个状态信息从上次处理的文件开始。此外,如果需要重新执行、重新处理数据,它还提供了处理目录中现有文件的选项。Auto Loader的优点包括以下几点。

    • 数十億のファイルを処理できる能力

 

    • 計算リソースの最適利用を用いた非同期バックフィル

 

    • パフォーマンスを改善するための最適化ディレクトリ一覧

 

    • スキーマの推定とスキーマドリフトへの対応

 

    自動ファイル通知サービスを活用することによるコスト効率の高いファイル通知

Auto Loader的工作原理是怎样的?

Auto Loader有两种模式来检测新文件:文件通知和目录列表。

文件通知:Auto Loader可以订阅来自输入目录的事件通知,并自动设置队列服务。文件通知模式具有高性能,可以扩展到存储大量文件的输入目录,但需要额外的云访问权限。该选项适用于文件不按顺序到达的情况,无需设置队列或通知。要启用此模式,请将选项cloudFiles.useNotifications设置为true,并指定创建云资源所需的访问权限。有关文件通知的详细信息,请参阅此处。

目录列表:另一种确定新文件的方法是列出由Auto Loader设置的输入目录。通过使用目录列表模式,可以在没有对数据访问权限之外的权限的情况下启动Auto Loader流。在Databricks Runtime 9.1及更高版本中,Auto Loader会自动检测到文件在云存储中是否按顺序到达,并大大减少检测新文件所需的API调用总数。在默认模式下,会在连续7次增量目录列表之后执行完整目录列表。但是,您可以使用设置cloudFiles.backfillInterval来调整完整目录列表的频率。使用设置cloudFiles.useIncrementalListing可以明确启用或禁用增量列表。一旦明确启用此设置,Auto Loader将不会触发完整列表。有关目录列表的详细信息,请参阅此处。

从消息总线中提取数据。

流媒体数据本身没有边界。这些数据被暂存为缓冲区,并作为消息总线进行分阶段,多个生产者可以写入,大量的消费者可以进行异步通信。消息总线广泛应用于低延迟的使用案例,如欺诈检测、金融资产交易、游戏等。知名的消息总线包括Apache Kafka、Apache Pulsar、Azure EventHubs、Amazon Kinesis、Google Cloud Pub/Sub等。然而,连续数据采集面临可伸缩性、可恢复性、容错性等问题。

在从消息总线到湖屋的提取中,通过使用明确的Spark结构化流(SS)流水线,可以实例化适用于消息总线的正确源连接器和湖屋的接收器连接器。在这种情况下,主要问题是吞吐量和容错性。

延迟:并不总是有利于实现低延迟。选择适当的延迟可以降低成本,但准确性和成本是一种权衡关系。Spark结构化流处理可以通过定义流数据处理的触发器来逐步控制。通过缩短触发器周期,可以实现Spark结构化流处理作业的低延迟。建议根据延迟要求和数据源到达速度的平衡来设置结构化流处理的触发器间隔。如果设置一个非常小的触发器周期,系统可能会过于频繁地检查新数据的到达情况。

Spark的结构化流有三种触发类型。

    • デフォルト: デフォルトでは、Sparkの構造化ストリーミングは前回のバッチが完了するとすぐに次のバッチを処理します。多くのユースケースには、デフォルトのトリガーがみなさまの要件に適しています。

 

    • 固定インターバル: 固定インターバルを用いることで、ユーザーが指定したインターバルでジョブを処理することができます。通常は、固定インターバルは特定の時間待ったのちに、大規模なマイクロバッチを実行します。

 

    一回: データが固定の周期で到着するケースにおいて、一日中クラスターを起動し続けておくことがリソースの無駄になる場合があります。選択肢の一つとしてジョブをバッチモードで実行するというものがあります。しかし、バッチよりもSpark構造化ストリーミングジョブをOnceモードあるいはAvailableNowモードで実行する方がメリットがあります。これらの実行設定によって、クラスターを稼働し続ける必要はなく、定期的にクラスターを起動し、データを処理したのちにクラスターをシャットダウンすることで、劇的にコストを削減することができます。これはバッチジョブと似ていますが、処理したデータの記録、テーブルレベルの原子性を維持することによる耐障害性、処理にまたがるステートフルなオペレーションなど追加のメリットを提供します。

为了实现在Spark结构化流处理中实现高吞吐量,有多个参数可进行调整。除了选择触发器类型外,关键参数是数据提取作业的并行度。为了实现高吞吐量,可以增加消息总线的分区数。通常情况下,Spark的消息总线分区和Apache Kafka之间有1:1的映射关系。然而,在AWS Kinesis中,消息被预取到内存中,因此Kinesis的分片和Spark任务之间不存在直接映射关系。

我们可以考虑一个现实世界的示例,通过调整批处理大小和分区数来实现高吞吐量。以银行用例为例,假设通过流式作业在一天中处理实时交易。然而,一些实时接收到的事件可能是不准确的。因此,为了解决问题,将在一天结束时执行修正批处理。修正批处理使用相同的流式代码进行处理,但在不同的作业实例中执行。与流动的连续流相比,为了实现更高的吞吐量,增加了分区数和批处理大小。

耐障害性:由于Spakr结构化流式处理在微批处理中执行作业,因此具备实现耐障害性的两个不同优点。

    • 他のエグゼキューター上でタスクを再スケジューリングすることで、タスクは障害から効率的に復旧することができます。

 

    決定論的タスクは、同じタスクを行う複数の処理実行が同じ出力を提供することで、一回のみ処理されることを保証します。

在Spark的结构化流处理中,可以通过查询的检查点存储位置来恢复失败的作业。通过检查点存储位置上的偏移量,可以精确地从故障点重新启动作业。要在查询中指定检查点的存储位置,可以使用`option(“checkpointLocation”, “dbfs://checkpointPath”)`。

利用可重放的数据源和幂等的接收器,Spark结构化流式处理可以实现在生产级应用中满足一次处理语义的要求。

总结

在Lake House中进行流数据的采集是实现时间敏感决策的第一步。本文将流数据源分类为连续文件流和消息总线服务。Auto Loader利用Spark结构化流来简化从文件源实时采集数据的过程,并提供自动检测文件到达、处理大规模数据的可扩展性、模式推测和高效数据采集等功能。对于从消息总线服务中采集数据的情况,Spark结构化流提供了与各云服务提供商所使用的大部分消息总线服务集成的可靠数据采集框架。大多数生产级应用程序需要在最小化成本和实现高精度之间权衡延迟和吞吐量。

Databricks 免费试用

Databricks 免费试用

广告
将在 10 秒后关闭
bannerAds