Delta Live Tables的概念
以下是Delta Live Tables概念的翻译 | AWS上的Databricks [截至2022/8/1]
在这本书中,我们将介绍使用Delta Live Tables的基本概念,以便能够有效地使用它。
管道
在Delta Live Tables中,处理执行的主要单元是管道。管道是连接数据源和目标数据集的有向非循环图(DAG)。可以使用SQL查询、Spark SQL或Koalas数据框架的Python函数来定义Delta Live Tables数据集的内容。可以在管道中定义执行所需的设置。作为选项,还可以在定义数据集时指定数据质量约束。
使用Databricks笔记本实现Delta Live Tables的管道。您可以在一个笔记本或多个笔记本中实现管道。在一个笔记本中,所有查询都必须使用Python或SQL实现,但可以使用组合了Python笔记本和SQL笔记本的多个笔记本来设置管道。每个笔记本共享输出数据的存储位置,可以从管道的其他笔记本引用数据集。
可以使用Databricks Repos来存储和管理Delta Live Tables的笔记本。为了能够通过Databricks Repos来管理流水线的创建,需要进行以下设置。
-
- SQLノートブックの先頭にコメント– Databricks notebook sourceを追加します。
- Pythonノートブックの先頭にコメント# Databricks notebook sourceを追加します。
请参阅Delta Live Tables中有关创建、执行和管理管道的信息。有关设置多个笔记本的管道示例,请参阅设置多个笔记本的管道部分。
查询
通过定义数据源和目标数据集来定义数据转换,Delta Live Tables 的查询可以使用 Python 或 SQL 实现。
期望
为了指定对数据集内容的数据质量控制,使用期望值。与传统的数据库的CHECK约束不同,它可以防止添加违反约束的记录,期望值为处理违反数据质量要求的数据提供了灵活性。这种灵活性使得可以处理并存储不完美的数据,以及需要符合严格质量要求的数据。
您可以定义期望值,以保留验证失败的记录、删除记录或停止流水线。
管道设置
管道设置是以JSON格式定义的,可以包含以下执行管道所需的参数。
-
- Delta Lakeでターゲットデータセットを作成するためのテーブルとビューを定義するクエリーを含む(ノートブック形式の)ライブラリ
-
- 処理に必要となるテーブルとビューが格納されるクラウドストレージ上の格納場所。この格納場所には、DBFSあるいは指定する場所を設定できます。
- データの処理を行うSparkクラスターの追加設定
请查看Delta Live Tables的详细设置。
数据集
在Delta Live Tables中,存在两种类型的数据集,分别是视图和表。
-
- ビューはSQLにおける一時ビューと類似のものであり、何かしらの計算処理のエイリアスとなります。ビューを用いることで、複雑なクエリーをより小さく理解しやすいクエリーに分割することができます。また、1つ以上のテーブルに対するソースとして特定の変換処理を再利用することができます。パイプライン内でのみビューを利用することができ、インタラクティブにクエリーを実行することはできません。
- テーブルは従来のマテリアライズドビューと似たようなものです。Delta Live Tablesランタイムは自動でテーブルをDelta形式で作成し、テーブルを作成するクエリーの最新の結果でこれらのテーブルが更新されることを保証します。
你可以将视图或表定义为实时或流式实时视图和表。
实时表和实时视图会始终反映其所定义的查询结果。这包括在定义表或视图的查询被更新以及输入数据源被更新时。与传统的物化视图不同,所有实时表和实时视图都会在可以优化计算资源和时间时进行计算。
流媒体实时表和流媒体实时视图只处理在最后一个管道更新之后添加的数据。流媒体表和流媒体视图是有状态的。如果定义表的查询发生变化,新的查询将处理新数据,但不会重新计算现有数据。
流媒体直播平台在以下情境中具有价值。
-
- データの保持: ストリーミングライブテーブルは、例えば、Apache KafkaやAmazon Kinesisのような入力データソースでのデータ保持期間が短かったとしても、無限にデータを保持することができます。
- データソースの進化: KafkaからKinesisにデータソースが移行したとしても、データを保持することができます。
可以将表格公开,以便在后续的消费者搜索和查询中使用。
管线更新
创建了管道后可以执行。启动更新。更新将执行以下操作。
-
- 適切な設定でクラスターを起動します。
-
- 定義されたすべてのテーブルとビューを特定し、不正なカラム名、依存関係の欠如、構文エラーなどの解析エラーをチェックします。
- 最新のデータを用いてすべてのテーブルとビューを作成、更新します。
更新的表、视图以及它们的更新方式取决于更新类型。
Refresh all(全てをリフレッシュ): それぞれの入力データソースの現在状態を反映するために、すべてのライブテーブルが更新されます。すべてのストリーミングライブテーブルでは、新規の行がテーブルに追加されます。
Full refresh all(全てをフルリフレッシュ): それぞれの入力データソースの現在状態を反映するために、すべてのライブテーブルが更新されます。すねてのストリーミングライブテーブルでは、Delta Live Tablesはすべてのテーブルの全データをクリアーし、ストリーミングソースからすべてのデータをロードしようとします。
Refresh selection(選択してリフレッシュ): refresh selectionの挙動はrefresh allと同じですが、選択したテーブルのみにリフレッシュを実行することができます。選択されたライブテーブルは、それぞれの入力データソースの現在状態を反映するために更新されます。選択されたストリーミングライブテーブルでは、テーブルに新規行が追加されます。
Full refresh selection(選択してフルリフレッシュ): full refresh selectionの挙動はfull refresh allと同じですが、選択したテーブルのみにフルリフレッシュを実行することができます。選択されたライブテーブルの入力データソースの現在の状態を反映するために更新されます。選択されたストリーミングライブテーブルでは、それぞれのデータオブの全データをクリアし、ストリーミングソースからすべてのデータをロードしようとします。
在现有的实时表中,更新操作与物化视图中的SQL刷新具有相同的行为。而在新建的实时表中,这个行为与SQL的创建操作相同。
如果管道处于触发模式,则系统将在所有管道表更新之后停止处理。
当触发模式的更新成功后,每个表都将被保证根据更新开始时的数据进行更新。
在需要低延迟的场景中,可以将管道设置为连续模式进行更新。有关您的管道执行模式的详细信息,请参阅连续和触发管道。
连续触发管线 ,
Delta Live Tables支持两种执行模式。
-
- トリガー(triggered)パイプラインは、現時点で利用可能なデータを用いてそれぞれのテーブルをアップデートし、パイプラインを実行しているクラスターを停止します。Delta Live Tablesは自動でテーブル間の依存関係を解析し、外部ソースからの読み込みを行う計算処理を起動します。パイプライン内のテーブルは、依存しているデータソースが更新された後に更新されます。
- 連続(continuous)パイプラインは入力データの変更に合わせて継続的にテーブルをアップデートします。アップデートが起動すると、手動で停止されるまで処理を継続します。連続パイプラインには常時稼働のクラスターが必要となりますが、後段のコンシューマーが最新のデータを利用できることを保証します。
在触发管道中,集群仅在执行管道所需的时间段内运行,从而可以减少资源消耗和成本。然而,在管道被触发之前,新数据不会被处理。连续管道则需要持续运行的集群,这会产生额外成本,但可以减少处理延迟。
通过管道设置中的continuous标志来控制执行模式。默认情况下,管道以触发执行模式运行。如果需要对管道表进行低延迟更新,则将continuous设置为true。
{
...
"continuous": true,
...
}
执行模式和计算表的类型是独立的。无论哪种执行模式,都可以更新完整表和增量表。
如果您的管道中有一些表没有严格的延迟要求,您可以使用配置中的pipelines.trigger.interval参数,独立设置更新频率。
spark_conf={"pipelines.trigger.interval", "1 hour"}
这个选项可以在管道更新之间不停止群集,但可以释放用于更新您的管道的其他表的资源。
在连续的管道中的表格和视图。
可以在连续执行的流水线中包含实时表、实时视图、流式实时表和流式实时视图。为了避免不必要的处理,流水线会自动监测依赖的Delta表,并且只在依赖表的内容发生变化时执行更新操作。
Delta Live Tables运行时无法检测非Delta数据源的变更。表格会定期更新,但通过将默认触发器周期设置为较大的值,可以避免在集群中出现过度重新计算的增量处理,从而减慢处理速度。
开发,生产模式
通过切换开发模式和生产模式,可以优化管道的执行。在开发模式下执行管道时,Delta Live Tables系统将执行以下操作。
-
- 再起動のオーバーヘッドを回避するためにクラスターを再利用します。
- エラーを即座に検知し修正できるように、パイプラインのリトライを無効化します。
在生产模式下,Delta Live Tables系统将执行以下操作。
-
- メモリーリークや古い認証情報のような回復可能な特定のエラーに対してクラスターを再起動します。
- クラスターの起動失敗など特定のエラーイベントに対しては、処理の実行をリトライします。
切換開發模式和生產模式只會控制叢集和管道執行的行為。儲存位置是設定在管道設定中的一部分,切換這個模式不會受到影響。
Databricks 强化的自动扩展
此功能目前处于公共预览阶段。
Databricks的增强自动缩放功能旨在优化集群的利用率,通过根据工作负载规模自动配置集群资源,在最小化处理延迟的同时,最大程度地降低对处理流程的影响。
強化自動縮放功能將在現有的集群自動縮放功能上增加以下功能。
-
- 強化オートスケーリングは、ストリーミングワークロードの最適化機能を実装しており、バッチワークロードのパフォーマンスを改善するための機能強化も行われています。これらの最適化によって、より効率的にクラスターを使用し、リソースの使用量を削減することでコストを節約します。
- 強化オートスケーリングは、シャットダウンの過程でタスクを失敗させないことを保証しつつ、積極的に使用率の低いノードをシャットダウンします。既存のクラスターオートスケーリング機能は、アイドル状態のノードのみをスケールダウンします。
必要的东西。
要使用强化自动扩展功能,
-
- 可以通过配置的configuration对象设置pipelines.advancedAutoscaling.enabled为”true”。
- 在默认的管道集群中添加autoscale设置。下面的例子中,我们设置了一个强化自动伸缩集群,拥有最少5台、最多10台的工作节点。max_workers必须大于min_workers。
请注意:
强化自动扩展功能仅可在默认集群中使用。即使在维护集群设置中添加了自动扩展设置,也将使用传统集群自动扩展功能。
如果在没有设置pipelines.advancedAutoscaling.enabled的情况下添加了自动扩展设置,Delta Live Tables将使用传统集群自动扩展功能。
{
"configuration": {
"pipelines.advancedAutoscaling.enabled": "true"
},
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 5,
"max_workers": 10
}
}
]
}
如果Pipeline处于连续模式,更改了自动缩放的设置后,Pipeline将自动重新启动。重新启动后的一段时间内可能会出现一些延迟增加的情况。在延迟增加的期间过后,根据指定的自动缩放设置,集群大小将会更新,Pipeline的延迟将恢复到先前的状态。
强化自动扩展已启用的流程监控
您可以利用Delta Live Tables的事件日志来监控强化自动扩展的度量标准。您可以在用户界面中参考这些度量标准。强化自动扩展事件的事件类型是autoscale。以下是事件的示例。
Autoscale cluster to <X> executors while keeping alive <Y> executors and retiring <Z> executors
クラスターマネージャがリサイズリクエストを許可Submitted request to resize cluster <cluster-id> to size <X>.
リサイズが完了Achieved desired cluster size <X> for cluster <cluster-id>.
此外,您还可以通过直接查询事件日志来参考强化自动扩容的事件。
-
- Sparkタスクのスロット使用率など、クラスターパフォーマンスのメトリクスをイベントログでクエリーするには、Cluster performance metricsをご覧ください。
- 強化オートスケーリングのオペレーションにおけるクラスターリサイズのリクエストとレスポンスをモニタリングするには、Databricks Enhanced Autoscaling eventsをご覧ください。
产品版本
您可以选择适用于管道要求的功能来运行管道,我们提供了以下可用的Delta Live Tables产品版本。
coreはストリーミングのデータ取り込みのワークロードで使用します。パイプラインでチェンジデータキャプチャ(CDC)やDelta Live Tablesのエクスペクテーションのような高度な機能が不要であれば、coreエディションを選択してください。
proはストリーミングのデータ取り込みとCDCワークロードで使用します。pro製品エディションはcoreのすべての機能をサポートしており、加えて、ソースデータの変更に基づいてテーブルを更新する必要があるワークロードもサポートしています。
advancedはストリーミングのデータ取り込み、CDC、エクスペクテーションを必要とするワークロードで使用します。advanced製品エディションは、coreとproの機能をサポートしており、Delta Live Tablesのエクスペクテーションによるデータ品質制約の強化もサポートしています。
您可以在创建和编辑管道时选择产品版。您可以在每个管道中选择不同的版本。
如果管道尝试使用不受支持的功能,在所选择的产品版本中不支持如期望一样,您将收到一个显示错误原因的消息。在这种情况下,您可以编辑管道以选择合适的版本。
Databricks 免费试用
Databricks 免费试用