在Databricks Delta Lake中简化Change Data Capture
使用Databricks Delta简化变更数据捕获 – 这是Databricks博客的翻译。
建议阅读Databricks Delta Lake关于高效UPSERT和DELETE的MERGE命令的使用方法,以实现高效UPSERT。
在Databricks中,常见的用例是在客户端执行从多个来源到一系列Databricks Delta表的变更数据捕获(CDC)。这些来源可以是在本地或云上存在的操作性数据库或数据仓库。以下是连接它们的粘合剂-通过以下方法生成的变更集。
-
- Oracle GoldenGateやInformatica PowerExchangeのようなETL
-
- ベンダーが提供する変更テーブル(例:Oracle Change Data Capture)
- insert/update/deleteトリガーを用いて変更をキャプチャするユーザー管理のデータベーステーブル
背景( )
数据捕获和变更数据捕获(CDC)是指捕捉一系列数据源中的变更,并将这些变更合并到目标表中,通常是数据仓库的过程。这些过程通常在夜间、小时间隔或更短的周期内(例如每15分钟)执行。在这里,我们将这个周期称为刷新周期。
在刷新周期内,特定表中的变更记录集被称为变更集。最后,具有相同主键的一系列记录在变更集中被称为记录集。直观地说,它们被视为针对最终表中同一记录的不同更改。
表1:表格T在2018年01月01日17:00:00时的变更集C
表1展示了特定时间点上表格T的变更集C。变更集C有4个列。
-
- FLAGは変更のタイプI/U/D(insert/update/delete)を示します。
-
- IDはレコードを一意に識別します。
-
- VALUEはレコードが更新された際の値を示します。
- CDC_TIMESTAMPはいつレコードが追加、更新、削除されたのかを示しています。ターゲットテーブルTはFLAG以外は同じスキーマとなっています。
在这个变更集中,ID1的记录进行了添加、更新和删除(从第一行到第三行)。因此,ID=1的记录集包含了三条记录。ID2的记录仅进行了更新,ID3的记录被删除。可以认为记录ID2和ID3是事先添加的。
以前的CDC是指Databricks Delta Lake之前的变更数据捕获。
- 我們已經在超負荷的Oracle實例上進行負載,並且對於何時以及如何執行ETL工作有限制。基於純粹的Parquet表(在Databricks Delta Lake之前),同時執行的能力有限,因此夜間的刷新率最適合。
使用Databricks Delta Lake进行CDC。
针对某位客户,我们在ETL管道上实施了CDC技术,确保其最大化和高频率的刷新。根据这位客户的情况,Informatica每15分钟将65个表中的变化作为更改集写入S3。这些更改集本身很小(<1000条记录),但目标表非常庞大。其中大约有6个表具有5000万至1亿行的记录,其余的表行数小于5000万。为与Informatica保持同步,我们在Oracle中每15分钟运行一次该管道。在Databricks Delta中,最初我们认为由于S3的延迟问题,处理时间可能会达到1小时左右,但令人高兴的是,通过调整群集的规模,我们成功实现了30分钟甚至15分钟的刷新。
使用Insert Overwrite命令
这种方法的基本思想是保持一个用于累积特定记录集的分拣表,以及保留用户可以查询的最新快照的最终表。
选项一: 图1:从Informatica的源到Databricks Delta云存储的Insert Overwrite流程
在每个刷新循环中,Spark的作业将执行两个INSERT操作。
Insert(Insert 1): 当該リフレッシュ周期で、S3やKafkaからチェンジセットを読み込み、ステージングテーブルに変更をINSERTする。
Insert Overwrite(Insert 2): ステージングテーブルから全てのレコードセットの現在のバージョンを取得し、ファイナルテーブル上で対応するレコードを上書きする。
选项1: 图2:从Kafka源码到Databricks Delta云存储的Insert Overwrite流程
CDC実践者熟悉的分类方案是一种处理不同类型更新的方法,被称为”slowly changing dimensions”(SCDs)。我们的暂存表与SCD Type2方案类似,而最终表则与SCD Type1方案类似。
实施
让我们来详细看一下这两个步骤。首先是第一个插入步骤。
%scala
val changeSets = Array(file1, file2, …)
spark.read.parquet(changeSets :_*).createOrReplaceTempView("incremental")
%sql
INSERT INTO T_STAGING
PARTITION(CREATE_DATE_YEAR)
SELECT ID, VALUE, CDC_TIMESTAMP
FROM INCREMENTAL
在这里,我们定义了一个用于向第二个命令的INSERT INTO提供变更集的临时视图。关于INSERT INTO,除了PARTITION之外,其他部分都相对容易理解,我们来详细看一下这个。
在云数据存储和HDFS中,记录保存在文件中,所以请记住更新的单位也是文件。对于Databricks Delta Lake,正如在这篇文章中所述,这些文件是Parquet文件。当需要更新记录时,Spark需要加载整个文件并进行重新写入。因此,将更新局部化以减少更新目标文件的数量非常重要。基于以上原因,我们通过在PARTITION子句(Azure|AWS)中指定分区列,将暂存表和最终表都分区到最小化通过CDC操作的行数的列上,以便Databricks Delta Lake能将记录添加到T_STAGING的适当分区。
接下来,让我们看一下第二个插入。
%sql
INSERT OVERWRITE TABLE T_FINAL
PARTITION(CREATE_DATE_YEAR)
SELECT ID, VALUE, CDC_TIMESTAMP
FROM (
SELECT A.*,
RANK() OVER (PARTITION BY ID ORDER BY CDC_TIMESTAMP DESC) AS RNK
FROM T_STAGING A.*
WHERE CREATE_DATE_YEAR IN (2018, 2016, 2015)
) B
WHERE B.RNK = 1 AND B.FLAG < > 'D'
让我们从T_STAGING读取的内部查询开始。请记住,staging表包含针对特定记录集的任意数量的insert、update和delete操作。由于这些更改是跨多个刷新周期添加到staging表中的,因此这些更改可能是由特定的change set(例如,针对表1中ID=1的3个更改)或多个change set引起的。通过内部RANK和外部筛选器B.RNK=1 and B.FLAG <> ‘D’,我们可以确保以下内容。
-
- 只选择特定记录集中的最新更改。
- 如果最新更改为’D’,则不将整个记录集添加到最终阶段,以实现删除记录。
接下来,我们将注意到WHERE CREATE_DATE_YEAR IN (…)子句。通过与外部查询的PARTITION(CREATE_DATE_YEAR)配合使用,Databricks Delta Lake将确保仅覆盖特定的分区,如2018年、2016年和2015年,并且不会触及其他分区。上述示例中将分区硬编码以便更易理解,但在实际实施中,这些分区将作为Scala列表提供,并将动态生成查询与变更集相关的内容,如下所示。
val partitionsToOverwrite = spark.sql("select year(to_date(create_date, "MM/dd/yyyy")) from incremental")
...
spark.sql(s"""
INSERT OVERWRITE T_FINAL
...
WHERE CREATE_DATE_YEAR IN ( ${partitionsToOverwrite.mkString(",") )
...
""")
性能可以被理解为一个产品或系统在特定条件下的表现。
如前所述,Databricks Delta Lake通过CDC管道以并行的方式执行,使用户能够在查询中对数据进行一致性的视图。在这里,我们将介绍用于优化读取器和写入器的两个功能。
パーティションプルーニング: 上の2つ目のinsert(ライター)においては、Databricks Delta Lakeは、更新するパーティションのみを読み取り、再書き込みするために、PARTITIONの指定とWHERE句のINリストを参照します。実際、これによって操作されるテーブルの割合は半分、通常はそれ以下となり、2つ目のinsertにおけるT_FINALの更新およびT_STAGINGのSELECTの局所化を改善します。
データスキッピング/ZORDERインデックス: T_FINALにクエリーを行うユーザーは、BIツールからアドホックのSQLクエリーと多岐に渡ります。ここでは、WHERE句にパーティションカラムCREATE_DATE_YEARが含まれる場合もあれば含まれない場合もあります。
%sql
SELECT …
FROM T_FINAL
WHERE COL1 = val and COL2 = val
在这种情况下,COL1和COL2都不包含在分区列中。但是,用户可以创建针对这两个列的Z-order索引。
OPTIMIZE T_FINAL ZORDER BY (COL1, COL2)
在内部,Databricks Delta Lake会根据Z值将Parquet文件进行聚类,以只访问包含COL1 = val和COL2 = val的文件,在上述查询中。
对于可以利用Z-order索引来扩展一系列查询的情况,我会提供两点说明。
-
- 上記のケースにおいては、COL1のみ(あるいはCOL2のみ)にフィルタリングを行うクエリーもインデックスの恩恵を受けることができます。これは、RDBMSの複合インデックスと異なり、Z-orderインデックスにおいては、インデックスされたカラムリストのプレフィクスに基づくフィルターに対するバイアスが無いためです。
- 上のケースと異なり、クエリーがパーティションカラムに対するフィルタリングを行う場合には、クエリー実行の際にパーティションプルーニングとZ-orderインデックスの両方が劇的に処理対象のファイルの数を削減します。
请参考这篇优秀的文章,了解数据跳跃和Z-order索引与分区修剪的配合工作方式。
同时有效
正如之前的文章所提到的,Databricks Delta Lake在云存储中引入了事务功能。您可以如下利用此功能:在覆盖分区的同时,Databricks Delta Lake会创建新的Parquet文件,并将旧的Parquet文件保留给正在查询数据的用户。在覆盖操作完成后启动的查询将引用新数据。Delta利用事务日志使查询引用一致的数据版本。
紧缩和清洁
随着时间的流逝,既老又不再使用的记录会在T_STAGING和T_FINAL中积累。例如,T_STAGING中RANK > 1的记录或被标记为旧的T_FINAL记录通过文件覆盖而变旧。这些记录不会影响查询的准确性,但随着时间的推移会使CDC和查询性能下降。幸运的是,在Databricks Delta Lake中,这种维护任务变得简单。删除T_FINAL中旧文件的操作如下所示。
%sql
VACUUM T_FINAL
如果未指定保留期参数(请参阅VACUUM文档:Azure|AWS),事务日志中不会保留任何超过7天的文件。这个时间足够长,足够认为不存在任何访问这些文件的读取器。
在T_STAGING中,对于排名大于1的所有记录,将其全部删除。一个简单的方法是将T_FINAL复制到T_STAGING中。
%sql
INSERT OVERWRITE T_STAGING SELECT * FROM T_FINAL
作为维护任务,您可以将上述两个命令以及上述讲解的OPTIMIZE命令整理到笔记本中,并将其安排为Databricks作业进行定时执行。
不久将发布(2018/10文章)的Databricks Runtime 5.0中,可以通过使用MERGE INTO来改善Databricks Delta Lake的性能,并支持记录的删除(D)操作。
管道的全面运营
作为一个平台,Databricks不仅可以构建ETL管道,还可以缩短将这些管道投入实际运营所需的时间。在这里,我们将介绍两个可以在实际运营CDC管道时利用的Apache Spark功能和有用技术。
设定驱动编程
在构建大规模应用程序时,通常采用的设计模式是通过配置来控制软件的行为(例如:YAML或JSON配置文件)。通过Spark的SQL、Scala、Python等通用编程语言的支持,可以将配置保存在表中,并实现基于配置的动态SQL生成,因此可以说这种设计模式非常适用于此。让我们来看看这在CDC上下文中是如何工作的。
首先,请记住我们的CDC管道中有65个表。每个65个表都是一行,列中包含了构建CDC SQL语句所需的信息的CONFIG表。
表2 – 用于执行一系列表的CDC管道的设置表格
为了获取特定表的配置信息,并为该表应用CDC逻辑,可以使用以下代码。
val hiveDb = “mydb”
val CONFIG_TABLE = “CONFIG”
// Table is a notebook input widget
val table=s"""${dbutils.widgets.get("wTable")}"""
val (partitionColumnExpression, partitionColumnAlias, rankExpression, isInsertOnly) = spark.sql(s"""
SELECT PARTITION_COLUMN_EXPRESSION, PARTITION_COLUMN_ALIAS, RANK_EXPRESSION, IS_INSERT_ONLY
FROM ${hiveDb}.${CONFIG_TABLE}
WHERE TABLE_NAME=LOWER('$table')
""").as[(String, String, String, Boolean)].head
...
/*
* Insert 1 above would look like following. Here, the table
* variable is set to T1 or T2 from the config table
*/
spark.sql(s"""
INSERT INTO ${table}_STAGING
PARTITION(${partitionColumnAlias)
SELECT ${projectListFromIncremental}
FROM INCREMENTAL
""")
...
// Insert 2 could look like
val partitionsToOverwrite = spark.sql(s"""SELECT DISTINCT ${partitionColumnExpression} FROM INCREMENTAL""").as[String].collect
spark.sql(s"""
INSERT OVERWRITE TABLE ${table}_FINAL
PARTITION(${partitionColumnAlias})
SELECT ${projectListFromIncremental}
FROM (SELECT A.*, RANK() OVER (${rankExpression}) AS RNK
FROM ${table}_STAGING A.*
WHERE ${partitionColumnAlias} IN (${partitionsToOverwrite.mkString(“,”) )
) B
WHERE B.RNK = 1 AND B.FLAG < > ‘D’
""")
笔记本工作流程和任务
假设上述处理已经被实现在一个名为ProcessIncremental的笔记本中。在这里,我们将使用笔记本工作流来为每个65个表执行处理,并检测出值得注意的变更集,并创建一个控制器笔记本来对它们执行ProcessIncremental。
val startDate = “20180101”
val tables = spark.sql(s"""
SELECT TABLE_NAME
FROM $hiveDb.$CONFIG_TABLE
""").as[String].collect.map(_.toLowerCase)
tables.foreach { tbl =>
val processTheseChangeSets = dbutils.notebook.run("GetNextChangeSets", 0, Map(
"wHiveDb" -> hiveDb,
"wTable" -> tbl,
"wStartDate" -> startDate
)
)
if(!processTheseChangeSets.isEmpty) {
val stats = dbutils.notebook.run("ProcessIncremental", 0, Map(
"wHiveDb" -> hiveDb,
"wIncrFiles" -> processTheseChangeSets,
"wTable" -> tbl)
)
)
}
只需要一个选项,以下是对原文的汉语本地化改写:
您可以轻松地将控制器笔记本作为Databricks作业进行调度,以便按照您喜欢的频率运行CDC管道。最后,尽管循环是串行的,但您可以使用par命令或Scala的功能将串行集合转换为并行集合,从而轻松地将其更改为并行循环。
总结
在这篇文章中,我们解释了将CDC工具(如Oracle GoldenGate、Informatica PowerExchange)和供应商(如Oracle Change Data Capture)管理的变更表和由用户的insert/update/delete触发器管理的变更表与Databricks Delta Lake合并时的参考架构。为了优化用户的读取体验,我们讨论了在将这些变更反映到Databricks Delta Lake时使用的Spark SQL的深挖、以及两个性能方面的考虑(分区和Z-order索引)、以及伴随的压缩和清理等辅助部分的考虑。此外,通过配置驱动的ETL流水线构建和笔记本工作流以及作业驱动的工作流的全面运用,我们看到了Databricks如何加速工作流的构建和运营。
Databricks 免费试用
Databricks 免费试用