Databricks(Delta lake)的数据输入输出实现模式-速查表
这篇文章的内容基于在Data+AI World Tour 2021举行的技术研讨会“打造简单易懂的数据流水线”实施的内容。
这篇文章涉及的代码示例笔记本
读取CSV文件
CSV文件 => /databricks-datasets/lending-club-loan-stats/*.csv
# データフレームにCSVを読み込む (スキーマは推定する)
df = (
spark.read.format('csv')
.option('Header', True)
.option('inferSchema', True)
.load('/databricks-datasets/lending-club-loan-stats/*.csv')
)
# 既存のDeltaに追記
df.write.format('delta').mode('append').save('/tmp/daiwt2021/loan_stats.delta')
# 書き込んだDeltaテーブルの確認
df_delta = spark.read.format('delta').load('/tmp/daiwt2021/loan_stats.delta')
display(df_delta)
ETL数据整理
# 簡単な処理を行い、結果を保存
from pyspark.sql.functions import col, expr
df_raw = spark.read.format('delta').load('/tmp/daiwt2021/loan_stats.delta')
(
df_raw
.select('loan_amnt', # 必要なカラムの抽出
'term',
'int_rate',
'grade',
'addr_state',
'emp_title',
'home_ownership',
'annual_inc',
'loan_status')
.withColumn('int_rate', expr('cast(replace(int_rate,"%","") as float)')) # データ型の変換
.withColumnRenamed('addr_state', 'state') # カラム名変更
.write
.format('delta') # Deltaで保存(silverテーブル)
.mode('overwrite')
.save('/tmp/daiwt2021/loan_stat_silver.delta')
)
display( spark.read.format('delta').load('/tmp/daiwt2021/loan_stat_silver.delta') )
我也能使用SQL,并且还可以直接进行可视化操作。
%sql
DROP DATABASE IF EXISTS daiwt2021_kitamura CASCADE;
CREATE DATABASE daiwt2021_kitamura;
USE daiwt2021_kitamura;
CREATE TABLE loan_stat_silver
USING delta
LOCATION '/tmp/daiwt2021/loan_stat_silver.delta';
SELECT state, loan_status, count(*) as counts
FROM loan_stat_silver
GROUP BY state, loan_status
ORDER BY counts DESC
使用Python接收SQL的结果,并创建一个摘要表。
df_gold = spark.sql('''
SELECT state, loan_status, count(*) as counts
FROM loan_stat_silver
GROUP BY state, loan_status
ORDER BY counts DESC
''')
# Deltaで保存(goldテーブル)
df_gold.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/loan_stat_gold.delta')
# goldテーブルをHiveへ登録
spark.sql('''
CREATE TABLE IF NOT EXISTS loan_stat_gold
USING delta
LOCATION '/tmp/daiwt2021/loan_stat_gold.delta'
''')
从BI中引用Delta表
使用BI工具连接到Delta表的JDBC/ODBC端点。
将Delta表的内容写入DWH/RDBMS。
mysql_url = f'jdbc:mysql://{hostname}:{port}/{database}?user={username}&password={password}'
df_gold = spark.read.format('delta').load('/tmp/daiwt2021/loan_stat_gold.delta')
(
df_gold.write.format("jdbc")
.option("url", mysql_url)
.option("dbtable", "test")
.mode("overwrite")
.save()
)
将Delta表的内容写入文件中
df_gold = spark.read.format('delta').load('/tmp/daiwt2021/loan_stat_gold.delta')
df_gold.write.format('csv' ).save('/tmp/daiwt2021/loan_stat_gold.csv')
df_gold.write.format('json' ).save('/tmp/daiwt2021/loan_stat_gold.json')
df_gold.write.format('parquet' ).save('/tmp/daiwt2021/loan_stat_gold.parquet')
df_gold.write.format('avro' ).save('/tmp/daiwt2021/loan_stat_gold.avro')
整合(汇总)之前的ETL代码
以下代码可以简单地进行ETL处理。
### 1. Rawテーブル (CSV => bornze)
df_raw = (
spark.read.format('csv')
.option('Header', True)
.option('inferSchema', True)
.load('/databricks-datasets/lending-club-loan-stats/LoanStats_2018Q2.csv')
)
df_raw.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/loan_stats.delta')
sql("CREATE TABLE loan_stat_raw USING delta LOCATION '/tmp/daiwt2021/loan_stat.delta';")
### 2. データ整形 (bronze => silver)
from pyspark.sql.functions import col, expr
df_silver = (
spark.read.format('delta').load('/tmp/daiwt2021/loan_stats.delta')
.select('loan_amnt', 'term', 'int_rate', 'grade', 'addr_state', 'emp_title', 'home_ownership', 'annual_inc', 'loan_status')
.withColumn('int_rate', expr('cast(replace(int_rate,"%","") as float)'))
.withColumnRenamed('addr_state', 'state')
)
df_silver.write.format('delta').mode('overwrite').save( '/tmp/daiwt2021/loan_stat_silver.delta')
sql("CREATE TABLE loan_stat_silver USING delta LOCATION '/tmp/daiwt2021/loan_stat_silver.delta';"")
### 3. サマリテーブル (silver => gold)
df_gold = spark.sql('''
SELECT state, loan_status, count(*) as counts
FROM loan_stat_silver
GROUP BY state, loan_status
ORDER BY counts DESC
''')
df_gold.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/loan_stat_gold.delta')
sql("CREATE TABLE IF NOT EXISTS loan_stat_gold USING delta LOCATION '/tmp/daiwt2021/loan_stat_gold.delta'")
加载JSON
JSON文件 => /mnt/training/ecommerce/events/events-500k.json
df_json = (
spark.read.format('json')
.option('inferSchema', True)
.load('/mnt/training/ecommerce/events/events-500k.json')
)
# deltaへの書き込み
df_json.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/events.delta')
# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/events.delta') )
读取LOG数据
可以适应任何文本格式。
访问日志(Web服务器访问日志)=> s3://databricks-ktmr-s3/var/log/access.log.*.gz
13.66.139.0 - - [19/Dec/2020:13:57:26 +0100] "GET /index.php?option=com_phocagallery&view=category&id=1:almhuette-raith&Itemid=53 HTTP/1.1" 200 32653 "-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)" "-"
157.48.153.185 - - [19/Dec/2020:14:08:06 +0100] "GET /apache-log/access.log HTTP/1.1" 200 233 "-" "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" "-"
157.48.153.185 - - [19/Dec/2020:14:08:08 +0100] "GET /favicon.ico HTTP/1.1" 404 217 "http://www.almhuette-raith.at/apache-log/access.log" "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" "-"
...
from pyspark.sql.functions import split, regexp_extract, col, to_timestamp
raw_df = spark.read.text('s3://databricks-ktmr-s3/var/log/access.log.*.gz')
# Regexでログデータをパース
split_df = (
raw_df.select(
regexp_extract('value', r'^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1).alias('src_ip'),
regexp_extract('value', r'\[(.+?)\]', 1).alias('time_string'),
regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ "(.+) (.+) (HTTP.+?)"', 1).alias('method'),
regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ "(.+) (.+) (HTTP.+?)"', 2).alias('path'),
regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ "(.+) (.+) (HTTP.+?)"', 3).alias('version'),
regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+)', 1).cast('int').alias('status_code'),
regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+)', 2).cast('int').alias('content_size'),
regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+) "(.+?)" "(.+?)" "(.+?)"', 3).alias('host2'),
regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+) "(.+?)" "(.+?)" "(.+?)"', 4).alias('user_agent')
)
.withColumn( 'timestamp', to_timestamp( col('time_string'), 'dd/MMM/yyyy:HH:mm:ss Z') )
.drop('time_string')
.filter( col('timestamp').isNotNull() )
)
# Deltaに書き出す
split_df.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/access_log.delta')
# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/access_log.delta') )
读取图像文件(二进制文件)
图像文件 => /databricks-datasets/cctvVideos/train_images/label=0/*.jpg
image_df = (
spark.read.format('binaryFile')
.option('mimeType', 'image/*')
.load('/databricks-datasets/cctvVideos/train_images/label=0/*.jpg')
)
spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")
image_df.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/train_image.delta')
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/train_image.delta') )
从关系数据库管理系统/数据仓库中读取
jdbcHostname = "example.databricks.training"
jdbcPort = 5432
jdbcDatabase = "training"
jdbcUrl = f"jdbc:postgresql://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}"
query = '''
(
SELECT * FROM training.people_1m
WHERE salary > 100000
) emp_alias
'''
df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProps)
df.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/salary.delta')
# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/salary.delta') )
加载流媒体
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col
import re
# JSONデータのスキーマ定義
schema = StructType([
StructField("channel", StringType(), True),
StructField("comment", StringType(), True),
StructField("delta", IntegerType(), True),
StructField("flag", StringType(), True),
StructField("geocoding", StructType([ StructField("city", StringType(), True), StructField("country", StringType(), True), StructField("countryCode2", StringType(), True), StructField("countryCode3", StringType(), True), StructField("stateProvince", StringType(), True), StructField("latitude", DoubleType(), True), StructField("longitude", DoubleType(), True), ]), True),
StructField("isAnonymous", BooleanType(), True),
StructField("isNewPage", BooleanType(), True),
StructField("isRobot", BooleanType(), True),
StructField("isUnpatrolled", BooleanType(), True),
StructField("namespace", StringType(), True),
StructField("page", StringType(), True),
StructField("pageURL", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("url", StringType(), True),
StructField("user", StringType(), True),
StructField("userURL", StringType(), True),
StructField("wikipediaURL", StringType(), True),
StructField("wikipedia", StringType(), True),
])
# 読み込み
stream_df = (
spark.readStream.format('kafka') # Kafkaをソースと指定
.option('kafka.bootstrap.servers', 'server123.databricks.training:9092')
.option('subscribe', 'en')
.load()
)
# ELTをして、Deltaに書き込む
(
stream_df
.withColumn('json', from_json(col('value').cast('string'), schema)) # Kafkaのバイナリデータを文字列に変換し、from_json()でJSONをパース
.select(col("json.*")) # JSONの子要素だけを取り出す
.writeStream # writeStream()でストリームを書き出す
.format('delta') # Deltaとして保存
.option('checkpointLocation', '/tmp/daiwt2021/stream.checkpoint') # チェックポイント保存先を指定
.outputMode('append') # マイクロバッチの結果をAppendで追加
.start('/tmp/daiwt2021/stream.delta') # start()でストリーム処理を開始 (アクション)
)
# 確認
df = spark.readStream.format('delta').load('/tmp/daiwt2021/stream.delta')
display( df )
将随时更新追加的文件作为流进行处理。
ファイルのディレクトリ => s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*
=> s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/的文件目录
-
- オブジェクトストレージ上にファイルが随時アップロードされるパターンもよくあります。
-
- Databricksでは、ストレージに新たな追加されたファイルを認識して、そのファイルのみ読み込むことができます。
- この場合、ストレージをストリーミングのソースとして利用することになります。
# (スキーマを楽して取得する)
df=spark.read.format('json').load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/2014-01-01')
tweet_schema = df.schema
选择1)持续随时加载
df_autoloader = (
spark.readStream.format('cloudFiles')
.option('cloudFiles.format', 'json')
.option('cloudFiles.maxBytesPerTrigger', '50KB') # 一度に読むサイズ上限
.schema(tweet_schema)
.load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*')
)
(
df_autoloader.writeStream.format('delta')
.option('checkpointLocation', '/tmp/daiwt2021/tweet.checkpoint')
.option('maxFilesPerTrigger', 25) # 一度に読むファイル数上限
.outputMode('append')
.trigger(processingTime='2 seconds') # 2秒に一度処理
#.trigger(once=True) # 一度だけ処理
.start('/tmp/daiwt2021/tweet.delta')
#.awaitTermination() # async => sync
)
只在执行一次时,如果有新文件,则进行加载。
df_autoloader = (
spark.readStream.format('cloudFiles')
.option('cloudFiles.format', 'json')
#.option('cloudFiles.maxBytesPerTrigger', '50KB') # 一度に読むサイズ上限 <== Trigger Onceの時には無視される!
.schema(tweet_schema)
.load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*')
)
(
df_autoloader.writeStream.format('delta')
.option('checkpointLocation', '/tmp/daiwt2021/tweet.checkpoint')
#.option('maxFilesPerTrigger', 25) # 一度に読むファイル数上限 <== Trigger Onceの時には無視される!
.outputMode('append')
.trigger(once=True) # 一度だけ処理
.start('/tmp/daiwt2021/tweet.delta')
.awaitTermination() # async => sync <== Trigger Onceの実行が終わるまでブロックさせる
)
# 確認
spark.read.format('delta').load('/tmp/daiwt2021/tweet.delta').count()
工作・自動化・流水线化
我们可以使用Databricks来管理和定期执行数据流水线的功能(Jobs)。
让我们将以下代码转化为一个Job。
### 1. ストレージから更新ファイルだけを認識して、Deltaテーブルに追記する
df_autoloader = (
spark.readStream.format('cloudFiles')
.option('cloudFiles.format', 'json')
.schema(tweet_schema)
.load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*')
)
(
df_autoloader.writeStream.format('delta')
.option('checkpointLocation', '/tmp/daiwt2021/job/tweet.checkpoint')
.outputMode('append')
.trigger(once=True) # 一度だけ処理
.start('/tmp/daiwt2021/job/tweet.delta')
.awaitTermination() # async => sync
)
### 2. 上記のDeltaテーブルからサマリのDeltaテーブルを作る
df=spark.read.format('delta').load('/tmp/daiwt2021/job/tweet.delta')
(
df.groupBy('lang').count()
.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/job/tweet_summary.delta')
)
sql("CREATE TABLE IF NOT EXISTS tweet_summary USING delta LOCATION '/tmp/daiwt2021/job/tweet_summary.delta'")
# 確認
display(
spark.read.format('delta').load('/tmp/daiwt2021/job/tweet_summary.delta')
)