Databricks(Delta lake)的数据输入输出实现模式-速查表

这篇文章的内容基于在Data+AI World Tour 2021举行的技术研讨会“打造简单易懂的数据流水线”实施的内容。

这篇文章涉及的代码示例笔记本

读取CSV文件

undefined

CSV文件 => /databricks-datasets/lending-club-loan-stats/*.csv

# データフレームにCSVを読み込む (スキーマは推定する)
df = (
  spark.read.format('csv')
  .option('Header', True)
  .option('inferSchema', True)
  .load('/databricks-datasets/lending-club-loan-stats/*.csv') 
)

# 既存のDeltaに追記
df.write.format('delta').mode('append').save('/tmp/daiwt2021/loan_stats.delta')

# 書き込んだDeltaテーブルの確認
df_delta = spark.read.format('delta').load('/tmp/daiwt2021/loan_stats.delta')
display(df_delta)

ETL数据整理

undefined

# 簡単な処理を行い、結果を保存
from pyspark.sql.functions import col, expr

df_raw = spark.read.format('delta').load('/tmp/daiwt2021/loan_stats.delta')

(
  df_raw
  .select('loan_amnt', # 必要なカラムの抽出
            'term',
            'int_rate',
            'grade',
            'addr_state',
            'emp_title',
            'home_ownership',
            'annual_inc',
            'loan_status')
  .withColumn('int_rate', expr('cast(replace(int_rate,"%","") as float)')) # データ型の変換
  .withColumnRenamed('addr_state', 'state') # カラム名変更
  .write
  .format('delta') # Deltaで保存(silverテーブル)
  .mode('overwrite')
  .save('/tmp/daiwt2021/loan_stat_silver.delta')
)

display( spark.read.format('delta').load('/tmp/daiwt2021/loan_stat_silver.delta') )

我也能使用SQL,并且还可以直接进行可视化操作。

%sql

DROP DATABASE IF EXISTS daiwt2021_kitamura CASCADE;
CREATE DATABASE daiwt2021_kitamura;
USE daiwt2021_kitamura;

CREATE TABLE loan_stat_silver
USING delta
LOCATION '/tmp/daiwt2021/loan_stat_silver.delta';

SELECT state, loan_status, count(*) as counts 
FROM loan_stat_silver
GROUP BY state, loan_status
ORDER BY counts DESC
undefined

使用Python接收SQL的结果,并创建一个摘要表。

undefined
df_gold = spark.sql('''

SELECT state, loan_status, count(*) as counts 
FROM loan_stat_silver
GROUP BY state, loan_status
ORDER BY counts DESC

''')

# Deltaで保存(goldテーブル)
df_gold.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/loan_stat_gold.delta')

# goldテーブルをHiveへ登録
spark.sql('''
  CREATE TABLE IF NOT EXISTS loan_stat_gold
  USING delta
  LOCATION '/tmp/daiwt2021/loan_stat_gold.delta'
''')

从BI中引用Delta表

undefined

使用BI工具连接到Delta表的JDBC/ODBC端点。

undefined

将Delta表的内容写入DWH/RDBMS。

undefined
mysql_url = f'jdbc:mysql://{hostname}:{port}/{database}?user={username}&password={password}'

df_gold = spark.read.format('delta').load('/tmp/daiwt2021/loan_stat_gold.delta')

(
  df_gold.write.format("jdbc")
  .option("url", mysql_url)
  .option("dbtable", "test")
  .mode("overwrite")
  .save()
)

将Delta表的内容写入文件中

undefined
df_gold = spark.read.format('delta').load('/tmp/daiwt2021/loan_stat_gold.delta')

df_gold.write.format('csv'     ).save('/tmp/daiwt2021/loan_stat_gold.csv')
df_gold.write.format('json'    ).save('/tmp/daiwt2021/loan_stat_gold.json')
df_gold.write.format('parquet' ).save('/tmp/daiwt2021/loan_stat_gold.parquet')
df_gold.write.format('avro'    ).save('/tmp/daiwt2021/loan_stat_gold.avro')

整合(汇总)之前的ETL代码

以下代码可以简单地进行ETL处理。

undefined
### 1. Rawテーブル (CSV => bornze)
df_raw = (
  spark.read.format('csv')
  .option('Header', True)
  .option('inferSchema', True)
  .load('/databricks-datasets/lending-club-loan-stats/LoanStats_2018Q2.csv') 
)
df_raw.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/loan_stats.delta')
sql("CREATE TABLE loan_stat_raw USING delta LOCATION '/tmp/daiwt2021/loan_stat.delta';")


### 2. データ整形 (bronze => silver)
from pyspark.sql.functions import col, expr

df_silver = (
  spark.read.format('delta').load('/tmp/daiwt2021/loan_stats.delta')
  .select('loan_amnt', 'term', 'int_rate', 'grade', 'addr_state', 'emp_title', 'home_ownership', 'annual_inc', 'loan_status')
  .withColumn('int_rate', expr('cast(replace(int_rate,"%","") as float)'))
  .withColumnRenamed('addr_state', 'state')
)
df_silver.write.format('delta').mode('overwrite').save( '/tmp/daiwt2021/loan_stat_silver.delta')
sql("CREATE TABLE loan_stat_silver USING delta LOCATION '/tmp/daiwt2021/loan_stat_silver.delta';"")


### 3. サマリテーブル (silver => gold)
df_gold = spark.sql('''
  SELECT state, loan_status, count(*) as counts 
  FROM loan_stat_silver
  GROUP BY state, loan_status
  ORDER BY counts DESC
''')

df_gold.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/loan_stat_gold.delta')
sql("CREATE TABLE IF NOT EXISTS loan_stat_gold USING delta LOCATION '/tmp/daiwt2021/loan_stat_gold.delta'")

加载JSON

JSON文件 => /mnt/training/ecommerce/events/events-500k.json

undefined
df_json = (
  spark.read.format('json')
  .option('inferSchema', True)
  .load('/mnt/training/ecommerce/events/events-500k.json')
)

# deltaへの書き込み
df_json.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/events.delta')

# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/events.delta') )

读取LOG数据

可以适应任何文本格式。

访问日志(Web服务器访问日志)=> s3://databricks-ktmr-s3/var/log/access.log.*.gz

13.66.139.0 - - [19/Dec/2020:13:57:26 +0100] "GET /index.php?option=com_phocagallery&view=category&id=1:almhuette-raith&Itemid=53 HTTP/1.1" 200 32653 "-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)" "-"
157.48.153.185 - - [19/Dec/2020:14:08:06 +0100] "GET /apache-log/access.log HTTP/1.1" 200 233 "-" "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" "-"
157.48.153.185 - - [19/Dec/2020:14:08:08 +0100] "GET /favicon.ico HTTP/1.1" 404 217 "http://www.almhuette-raith.at/apache-log/access.log" "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" "-"
...
undefined
from pyspark.sql.functions import split, regexp_extract, col, to_timestamp

raw_df = spark.read.text('s3://databricks-ktmr-s3/var/log/access.log.*.gz')

# Regexでログデータをパース
split_df = (
    raw_df.select(
      regexp_extract('value', r'^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1).alias('src_ip'),
      regexp_extract('value', r'\[(.+?)\]', 1).alias('time_string'),
      regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ "(.+) (.+) (HTTP.+?)"', 1).alias('method'),
      regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ "(.+) (.+) (HTTP.+?)"', 2).alias('path'),
      regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ "(.+) (.+) (HTTP.+?)"', 3).alias('version'),
      regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+)', 1).cast('int').alias('status_code'),
      regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+)', 2).cast('int').alias('content_size'),
      regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+) "(.+?)" "(.+?)" "(.+?)"', 3).alias('host2'),
      regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+) "(.+?)" "(.+?)" "(.+?)"', 4).alias('user_agent')
    )
  .withColumn( 'timestamp', to_timestamp(  col('time_string'), 'dd/MMM/yyyy:HH:mm:ss Z') )
  .drop('time_string')
  .filter( col('timestamp').isNotNull() ) 
)

# Deltaに書き出す
split_df.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/access_log.delta')

# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/access_log.delta') )

读取图像文件(二进制文件)

图像文件 => /databricks-datasets/cctvVideos/train_images/label=0/*.jpg

undefined
image_df = (
  spark.read.format('binaryFile')
  .option('mimeType', 'image/*')
  .load('/databricks-datasets/cctvVideos/train_images/label=0/*.jpg')
)

spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")
image_df.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/train_image.delta')
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/train_image.delta') ) 
undefined

从关系数据库管理系统/数据仓库中读取

undefined
jdbcHostname = "example.databricks.training"
jdbcPort = 5432
jdbcDatabase = "training"
jdbcUrl = f"jdbc:postgresql://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}"


query = '''
(
  SELECT * FROM training.people_1m  
  WHERE salary > 100000
) emp_alias
'''

df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProps)
df.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/salary.delta')

# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/salary.delta') )

加载流媒体

undefined
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col
import re

# JSONデータのスキーマ定義
schema = StructType([
  StructField("channel", StringType(), True),
  StructField("comment", StringType(), True),
  StructField("delta", IntegerType(), True),
  StructField("flag", StringType(), True),
  StructField("geocoding", StructType([ StructField("city", StringType(), True), StructField("country", StringType(), True), StructField("countryCode2", StringType(), True), StructField("countryCode3", StringType(), True), StructField("stateProvince", StringType(), True), StructField("latitude", DoubleType(), True), StructField("longitude", DoubleType(), True), ]), True),
  StructField("isAnonymous", BooleanType(), True),
  StructField("isNewPage", BooleanType(), True),
  StructField("isRobot", BooleanType(), True),
  StructField("isUnpatrolled", BooleanType(), True),
  StructField("namespace", StringType(), True),         
  StructField("page", StringType(), True),              
  StructField("pageURL", StringType(), True),           
  StructField("timestamp", StringType(), True),        
  StructField("url", StringType(), True),
  StructField("user", StringType(), True),              
  StructField("userURL", StringType(), True),
  StructField("wikipediaURL", StringType(), True),
  StructField("wikipedia", StringType(), True),
])

# 読み込み
stream_df = (
  spark.readStream.format('kafka') # Kafkaをソースと指定
  .option('kafka.bootstrap.servers', 'server123.databricks.training:9092')
  .option('subscribe', 'en')
  .load()
)

# ELTをして、Deltaに書き込む
(
  stream_df
  .withColumn('json', from_json(col('value').cast('string'), schema))   # Kafkaのバイナリデータを文字列に変換し、from_json()でJSONをパース
  .select(col("json.*"))                    # JSONの子要素だけを取り出す
  .writeStream                              # writeStream()でストリームを書き出す
  .format('delta')                          # Deltaとして保存
  .option('checkpointLocation', '/tmp/daiwt2021/stream.checkpoint') # チェックポイント保存先を指定
  .outputMode('append')                     # マイクロバッチの結果をAppendで追加
  .start('/tmp/daiwt2021/stream.delta')   # start()でストリーム処理を開始 (アクション)
)

# 確認
df = spark.readStream.format('delta').load('/tmp/daiwt2021/stream.delta')
display( df )

将随时更新追加的文件作为流进行处理。

ファイルのディレクトリ => s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*
=> s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/的文件目录

    • オブジェクトストレージ上にファイルが随時アップロードされるパターンもよくあります。

 

    • Databricksでは、ストレージに新たな追加されたファイルを認識して、そのファイルのみ読み込むことができます。

 

    この場合、ストレージをストリーミングのソースとして利用することになります。
undefined
# (スキーマを楽して取得する)
df=spark.read.format('json').load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/2014-01-01')
tweet_schema = df.schema

选择1)持续随时加载

df_autoloader = (
  spark.readStream.format('cloudFiles')
  .option('cloudFiles.format', 'json')
  .option('cloudFiles.maxBytesPerTrigger', '50KB') # 一度に読むサイズ上限
  .schema(tweet_schema)
  .load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*')
)

(
  df_autoloader.writeStream.format('delta')
  .option('checkpointLocation', '/tmp/daiwt2021/tweet.checkpoint')
  .option('maxFilesPerTrigger', 25) # 一度に読むファイル数上限
  .outputMode('append')
  .trigger(processingTime='2 seconds') # 2秒に一度処理
  #.trigger(once=True) # 一度だけ処理
  .start('/tmp/daiwt2021/tweet.delta')
  #.awaitTermination() # async => sync
)

只在执行一次时,如果有新文件,则进行加载。

df_autoloader = (
  spark.readStream.format('cloudFiles')
  .option('cloudFiles.format', 'json')
  #.option('cloudFiles.maxBytesPerTrigger', '50KB') # 一度に読むサイズ上限 <== Trigger Onceの時には無視される!
  .schema(tweet_schema)
  .load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*')
)

(
  df_autoloader.writeStream.format('delta')
  .option('checkpointLocation', '/tmp/daiwt2021/tweet.checkpoint')
  #.option('maxFilesPerTrigger', 25) # 一度に読むファイル数上限  <== Trigger Onceの時には無視される!
  .outputMode('append')
  .trigger(once=True) # 一度だけ処理
  .start('/tmp/daiwt2021/tweet.delta')
  .awaitTermination() # async => sync  <== Trigger Onceの実行が終わるまでブロックさせる
)
# 確認
spark.read.format('delta').load('/tmp/daiwt2021/tweet.delta').count()

工作・自動化・流水线化

我们可以使用Databricks来管理和定期执行数据流水线的功能(Jobs)。
让我们将以下代码转化为一个Job。

### 1. ストレージから更新ファイルだけを認識して、Deltaテーブルに追記する
df_autoloader = (
  spark.readStream.format('cloudFiles')
  .option('cloudFiles.format', 'json')
  .schema(tweet_schema)
  .load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*')
)

(
  df_autoloader.writeStream.format('delta')
  .option('checkpointLocation', '/tmp/daiwt2021/job/tweet.checkpoint')
  .outputMode('append')
  .trigger(once=True) # 一度だけ処理
  .start('/tmp/daiwt2021/job/tweet.delta')
  .awaitTermination() # async => sync
)
### 2. 上記のDeltaテーブルからサマリのDeltaテーブルを作る

df=spark.read.format('delta').load('/tmp/daiwt2021/job/tweet.delta')

(
  df.groupBy('lang').count()
  .write.format('delta').mode('overwrite').save('/tmp/daiwt2021/job/tweet_summary.delta')
)

sql("CREATE TABLE IF NOT EXISTS tweet_summary USING delta LOCATION '/tmp/daiwt2021/job/tweet_summary.delta'")

# 確認
display(
  spark.read.format('delta').load('/tmp/daiwt2021/job/tweet_summary.delta')
)
广告
将在 10 秒后关闭
bannerAds