尝试使用Delta Live Tables的变更数据捕获(CDC)
我会浏览这里发布的笔记本。生成发生变化的数据,并使用Delta Live Tables(DLT)的更改数据捕获(CDC)将仅变更反映到后续表格中。
以下是翻译版本。我们只翻译了Python版本的管道笔记本。
数据准备
使用Faker生成CDC的基础数据。
Faker的安装
%pip install Faker
生成虚拟数据
from pyspark.sql import functions as F
from faker import Faker
from collections import OrderedDict
import uuid
# データの格納パス: 適宜変更してください
folder = "/tmp/takaaki.yayoi@databricks.com/demo/cdc_raw"
#dbutils.fs.rm(folder, True)
try:
dbutils.fs.ls(folder)
except:
print("フォルダーが存在しません、データを生成中...")
fake = Faker()
fake_firstname = F.udf(fake.first_name)
fake_lastname = F.udf(fake.last_name)
fake_email = F.udf(fake.ascii_company_email)
fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
fake_address = F.udf(fake.address)
operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
fake_id = F.udf(lambda: str(uuid.uuid4()))
df = spark.range(0, 100000)
df = df.withColumn("id", fake_id())
df = df.withColumn("firstname", fake_firstname())
df = df.withColumn("lastname", fake_lastname())
df = df.withColumn("email", fake_email())
df = df.withColumn("address", fake_address())
df = df.withColumn("operation", fake_operation())
df = df.withColumn("operation_date", fake_date())
df.repartition(100).write.format("json").mode("overwrite").save(folder+"/customers")
df = spark.range(0, 10000)
df = df.withColumn("id", fake_id())
df = df.withColumn("transaction_date", fake_date())
df = df.withColumn("amount", F.round(F.rand()*1000))
df = df.withColumn("item_count", F.round(F.rand()*10))
df = df.withColumn("operation", fake_operation())
df = df.withColumn("operation_date", fake_date())
# 同じIDが生成された顧客とJoin
df = df.withColumn("t_id", F.monotonically_increasing_id()).join(spark.read.json(folder+"/customers").select("id").withColumnRenamed("id", "customer_id").withColumn("t_id", F.monotonically_increasing_id()), "t_id").drop("t_id")
df.repartition(10).write.format("json").mode("overwrite").save(folder+"/transactions")
确认数据
spark.read.json(folder+"/customers").display()
管道的初始配置设置
在这里,我们将初始化用于Delta Live Tables管道的数据库和文件路径。
-- データベースの作成: パイプラインのターゲットに指定してください
DROP DATABASE IF EXISTS cdc_data_taka CASCADE;
CREATE DATABASE cdc_data_taka;
# DLTパイプラインのストレージの初期化: パイプラインのストレージに指定してください
dbutils.fs.rm("/tmp/takaaki.yayoi@databricks.com/demo/dlt_cdc", True)
CDC管线的实施和执行。
整个流程如下所示。
CDC的重要性
Change Data Capture(CDC)是捕获数据库或数据仓库中记录更改的过程。这些更改通常被视为删除、添加和更新等操作。
从数据库中导出数据库转储,然后将其导入到Lakehouse/数据仓库/数据湖中是一种简单的数据复制方法,但这并不是可扩展的方法。
只捕捉数据库中的更改并将这些更改应用于目标数据库才是变更数据捕捉(CDC)。CDC可以减少开销并支持实时分析。它可以实现增量加载而无需进行批量加载。
疾控中心的方法
1 – 开发内部制造CDC流程。
复杂的任务:CDC数据复制并不是一次性简单的解决方案。由于数据库提供商之间的差异,记录格式不同,且访问日志记录不便,因此CDC变得困难。
定期维护:编写CDC过程脚本是第一步。必须对上述变化进行定期映射的定制解决方案需要进行维护,这需要大量时间和资源。
过度负担:企业的开发人员已经面临着对官方查询的过分依赖。构建自定义CDC解决方案需要额外的工作量,这可能会对正在产生利润的项目产生影响。
2 – CDC工具的运用:Debezium、Hevo Data、IBM Infosphere、Qlik Replicate、Talend、Oracle GoldenGate、StreamSets等。
在这个演示代码库中,将利用CDC工具传送过来的CDC数据。CDC工具能够读取数据库日志,因此在更新特定列时不需要开发者的帮助。
像Debezium这样的CDC工具可以捕获所有已更改的行。在Kafka日志中,应用程序会记录自开始使用以来的数据更改历史。
如何将您的SQL数据库与Lakehouse同步?
使用CDC工具、自动加载程序、DLT管道来实施CDC流程。
-
- CDCツールがデータベースログを読み込み、変更を含むJSONメッセージを生成し、Kafkaに対して変更説明を伴うレコードをストリーミング
-
- KafkaがINSERT, UPDATE, DELETEオペレーションを含むメッセージをストリーミングし、クラウドオブジェクトストレージ(S3、ADLSなど)に格納
-
- Auto Loaderを用いてクラウドオブジェクトストレージからメッセージをインクリメンタルにロードし、生のメッセージとして保存するためにブロンズテーブルに格納
- 次に、クレンジングされたブロンズレイヤーテーブルに APPLY CHANGES INTO を実行し、後段のシルバーテーブルに最新の更新データを伝搬
以下是处理来自外部数据库的CDC数据的实现。请注意输入可以是任何格式,包括像Kafka这样的消息队列。
像Debezium这样的CDC工具的输出是怎样的?
表达变更数据的JSON消息具有与以下列表相似的有趣字段:
-
- operation: オペレーションのコード(DELETE, APPEND, UPDATE, CREATE)
- operation_date: それぞれのオペレーションのアクションがあった日付、タイムスタンプ
Debezium的输出包含以下字段(此演示中不包含):
-
- before: 変更前の行
- after: 変更後の行
请查看这个参考资料以了解可能的领域。
使用Auto Loader(云文件)来实现增量数据加载。
通过模式的更新,与外部系统的协调可能变得困难。外部数据库会进行模式的更新和列的添加、更新,我们的系统需要对这些变化具有强大的适应性。Databricks的自动加载器(cloudFiles)可以立即处理模式的估算和演化。
通过使用Auto Loader,可以从云存储中导入数百万个文件,并支持大规模的模式估计和演化。在这个笔记本中,我们将利用Auto Loader来处理流数据(和批量数据)。
我们可以使用Auto Loader来创建一个流水线,以便导入由外部提供商提供的原始JSON数据。
分布式分类账技术(DLT)是使用Python编写的代码结构。
为了使用相关方法,您需要导入dlt Python模块。在这里,我们也导入了pyspark.sql.functions。
DLT的表格、视图和关联设置是使用装饰器进行设置的。
如果你没有接触过Python的装饰器,那么你可以将其视为在Python脚本中与下一个被表示为函数互动的以@开始的函数或类。
@dlt.table装饰器是将Python函数转换为Delta Live表的基本方法。
以下我们将探索到达数据。
青铜桌-自动装填器和DLT
## ストレージパスから取得する生のJSONデータを含むブロンズテーブルの作成
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
source = spark.conf.get("source")
@dlt.table(name="customer_bronze",
comment = "クラウドオブジェクトストレージのランディングゾーンからインクリメンタルに取り込まれる新規顧客",
table_properties={
"quality": "bronze"
}
)
def customer_bronze():
return (
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.inferColumnTypes", "true") \
.load(f"{source}/customers")
)
银层 – 经过清洁的桌子(受到限制的应用)
@dlt.table(name="customer_bronze_clean",
comment="クレンジングされたブロンズ顧客ビュー(シルバーになるテーブルです)")
@dlt.expect_or_drop("valid_id", "id IS NOT NULL")
@dlt.expect("valid_address", "address IS NOT NULL")
@dlt.expect_or_drop("valid_operation", "operation IS NOT NULL")
def customer_bronze_clean():
return dlt.read_stream("customer_bronze") \
.select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
银色桌子的具象化
在名为customer_silver的银色表中,包含了最新的视图。它是原始表的副本。
在DLT管道设置中添加并启用applyChanges配置,以明确启用此功能,从而将Apply Changes Into操作传播到后续的银层。
删除不必要的客户记录 – 银色表 – 使用 DLT Python
dlt.create_target_table(name="customer_silver",
comment="クレンジング、マージされた顧客",
table_properties={
"quality": "silver"
}
)
dlt.apply_changes(
target = "customer_silver", # マテリアライズされる顧客テーブル
source = "customer_bronze_clean", # 入力のCDC
keys = ["id"], # upsert/deleteするために行をマッチする際の主キー
sequence_by = col("operation_date"), # 最新の値を取得するためにオペレーション日による重複排除
apply_as_deletes = expr("operation = 'DELETE'"), # DELETEの条件
except_column_list = ["operation", "operation_date", "_rescued_data"] # メタデータカラムの削除
)
创建管道
以下是本次设置的JSON示例。
{
"id": "7c2607ec-47a2-48d9-8572-9e498656b573",
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "/Users/takaaki.yayoi@databricks.com/20221212_dlt_cdc/2-Retail_DLT_CDC_Python"
}
}
],
"name": "dlt_cdc_retail",
"storage": "/tmp/takaaki.yayoi@databricks.com/demo/dlt_cdc",
"configuration": {
"source": "/tmp/takaaki.yayoi@databricks.com/demo/cdc_raw"
},
"target": "cdc_data_taka"
}
执行流水线
执行流水线后,将分析事件日志以监控事件日志和血统数据。
事件日志的分析
每个 DLT 管道都与其在管道中定义的存储位置拥有自己的事件表。通过这个表,可以确认正在发生的情况,并检查通过管道的数据质量。
安装
%sql
-- 適宜データベースを指定してください
CREATE TABLE IF NOT EXISTS cdc_data_taka.demo_cdc_dlt_system_event_log_raw using delta LOCATION '$storage_path/system/events';
select * from cdc_data_taka.demo_cdc_dlt_system_event_log_raw;
Delta Live Tables的期望分析
通过Delta Live Tables,可以通过期望跟踪数据质量。这些期望会与DLT的日志事件一起存储为技术表。为了分析这些信息,可以简单地创建一个视图。
1 – 事件日志的分析
在详情栏中包含了发送到事件日志的每个事件的元数据。根据事件类型,字段会有所不同。以下是行程疲劳的示例。
user_action
パイプラインの作成のようなアクションが行われた際に生じるイベントflow_definition
パイプラインのデプロイメントやアップデートが行われた際に生じるイベントであり、リネージュ、スキーマ、実行計画情報を持ちますoutput_dataset
と input_datasets
出力のテーブル/ビュー、前段のテーブル/ビューflow_type
コンプリートフローか追加のフローかexplain_text
Sparkの実行計画flow_progress
データフローがデータバッチの処理を開始あるいは完了した際に生じるイベントmetrics
現在はnum_output_rows
が含まれていますdata_quality
(dropped_records
), (expectations
: name
, dataset
, passed_records
, failed_records
)この特定のデータセットに対するデータ品質ルールの結果の配列が含まれます * expectations
事件日志 – 根据时间戳排序的原始事件
-- 適宜データベースを指定してください
SELECT
id,
timestamp,
sequence,
event_type,
message,
level,
details
FROM cdc_data_taka.demo_cdc_dlt_system_event_log_raw
ORDER BY timestamp ASC
2- DLT的衍生物
%sql
-- タイプと最新の変更ごとに出力データセットを一覧します
-- 適宜データベースを指定してください
create or replace temp view cdc_dlt_expectations as (
SELECT
id,
timestamp,
details:flow_progress.metrics.num_output_rows as output_records,
details:flow_progress.data_quality.dropped_records,
details:flow_progress.status as status_update,
explode(from_json(details:flow_progress.data_quality.expectations
,'array<struct<dataset: string, failed_records: bigint, name: string, passed_records: bigint>>')) expectations
FROM cdc_data_taka.demo_cdc_dlt_system_event_log_raw
where details:flow_progress.data_quality.expectations is not null
ORDER BY timestamp);
select * from cdc_dlt_expectations
%sql
----------------------------------------------------------------------------------------
-- リネージュ
----------------------------------------------------------------------------------------
SELECT max_timestamp,
details:flow_definition.output_dataset,
details:flow_definition.input_datasets,
details:flow_definition.flow_type,
details:flow_definition.schema,
details:flow_definition.explain_text,
details:flow_definition
FROM cdc_data_taka.demo_cdc_dlt_system_event_log_raw e
INNER JOIN (
SELECT details:flow_definition.output_dataset output_dataset,
MAX(timestamp) max_timestamp
FROM cdc_data_taka.demo_cdc_dlt_system_event_log_raw
WHERE details:flow_definition.output_dataset IS NOT NULL
GROUP BY details:flow_definition.output_dataset
) m
WHERE e.timestamp = m.max_timestamp
AND e.details:flow_definition.output_dataset = m.output_dataset
-- AND e.details:flow_definition IS NOT NULL
ORDER BY e.details:flow_definition.output_dataset
;
3 – 質量指標
%sql
select sum(expectations.failed_records) as failed_records,
sum(expectations.passed_records) as passed_records,
expectations.name
from cdc_dlt_expectations
group by expectations.name
4 – 检查商业汇总信息
%python
import plotly.express as px
expectations_metrics = spark.sql("""select sum(expectations.failed_records) as failed_records,
sum(expectations.passed_records) as passed_records,
expectations.name
from cdc_dlt_expectations
group by expectations.name""").toPandas()
px.bar(expectations_metrics, x="name", y=["passed_records", "failed_records"], title="DLT expectations metrics")
总结
通过使用Delta Live Tables,您可以轻松利用变更数据捕获(Change Data Capture)将只传播更改的数据传递到下一个表。通过利用事件日志数据,还可以监控管道处理的状态,从而构建能够同时保持数据管道质量并执行多种处理的管道。请务必尝试使用DLT进行CDC!
Databricks 免费试用
达百克免费试用