在Databricks SQL中使用流式化表进行数据加载
在Databricks SQL中加载数据 | 在AWS上的Databricks [截至2023年6月27日]
该功能正在公开预览阶段。若要访问,请填写此表格进行注册。
建议使用Databricks SQL来导入数据时使用流式处理表。流式处理表是Unity Catalog的托管表, 可支持流式处理和增量数据处理。对于每个流式处理表自动创建了DLT表。您可以利用来自Kafka或云对象存储的增量数据加载来使用流式处理表。
在这本书中,我们将演示如何利用流媒体表从作为Unity目录外部位置设置的云对象存储中加载数据。
在开始之前
请在开始之前确认以下事项:
-
- Databricksアカウントでサーバレスが有効化されている。詳細に関しては、Use serverless SQL warehousesをご覧ください。
-
- ワークスペースでUnity Catalogが有効化されている。詳細については、Unity Catalogを使い始めるをご覧ください。
-
- SQLウェアハウスでCurrentチャンネルを使用している。
-
- Unity Catalogの外部ロケーションに対するREAD FILES権限がある、詳細は、Unity Catalogにおける外部ロケーションとストレージ資格情報の管理をご覧ください。
-
- ストリーミングテーブルを作成するカタログに対するUSE CATALOG権限。
-
- ストリーミングテーブルを作成するスキーマに対するUSE SCHEMA権限。
-
- ストリーミングテーブルを作成するスキーマにおけるCREATE TABLE権限。
- クラウドオブジェクトストレージのソースデータのパス。例えば、s3://myBucket/analysis。
寻找和预览原始数据
-
- 在工作区的侧边栏中,点击“查询”,然后点击“创建查询”。
-
- 在查询编辑器中,从下拉菜单中选择正在使用的SQL数据仓库,该数据仓库使用“current”频道。
- 将下面的内容粘贴到编辑器中,并用括号(<>)内的值替换为您特定的源数据信息,然后点击“运行”。
在执行read_files表值函数时,如果函数的默认设置无法解析您的数据,则可能会遇到模式推断错误。例如,如果您需要在多行模式下设置多行CSV或JSON文件,则可能需要调整解析器选项列表。有关解析器选项的详细信息,请参阅read_files表值函数。
/* Discover your data in an external location */
LIST "s3://<bucket>/<folder>"
/* Preview your data */
SELECT * FROM read_files("s3://<bucket>/<folder>") LIMIT 10
加载数据到流媒体表
要从云对象存储中创建流式数据表,请将以下内容粘贴到查询编辑器中,并点击运行:
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('s3://<bucket>/<folder>')
使用DLT管道进行流式处理表的刷新
在这个部分,我们将解释一种模式,即使用由查询定义的源可用的最新数据刷新流表。
在使用Databricks SQL进行初始创建和数据加载时,对于流媒体表的CREATE操作使用。对于流媒体表的REFRESH操作使用Delta Live Tables(DLT)。对于每个流媒体表,DLT管道将自动创建。当流媒体表进行刷新时,将启动DLT管道的更新以执行刷新处理。
执行REFRESH命令后,会返回DLT管道的链接。您可以利用DLT管道的链接来检查刷新的状态。
请查看Delta Live Tables是什么?
仅导入新数据
默认情况下,read_files函数在创建表时将读取源目录中的所有现有数据,并在每次刷新时处理新到达的记录。
为了避免在创建表时导入源目录中的现有数据,可以使用ignoreExistingFiles选项。这意味着只有在创建表的过程完成后到达目录的数据会被导入。例如:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files('s3://mybucket/analysis/*/*/*.json', ignoreExistingFiles => false)
完全刷新流媒体表
全面刷新会使用最新定义重新处理可在源中使用的所有数据。全面刷新会删除现有数据,因此对于不保留像Kafka这样的所有历史数据或保留期限较短的源,不建议调用全面刷新。如果源中的数据不可用,可能无法恢复旧数据。
例如:
REFRESH STREAMING TABLE my_bronze_table FULL
自动更新的流媒体时间表
为了将流表自动刷新设置为基于已定义的日程表,可以在查询编辑器中粘贴以下内容并执行”运行”:
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
请查看ALTER STREAMING TABLE以获取刷新时间表查询的示例。
刷新状态的追踪
可以通过引用Delta Live Tables UI中的流表管理管道或执行DESCRIBE EXTENDED命令来确认流表刷新状态,并查看返回的刷新信息。
DESCRIBE EXTENDED <table-name>
从Kafka中进行流式数据提取
有关将Kafka流式传输示例导入的信息,请参阅read_kafka。
允许访问流媒体表
请在查询编辑器中粘贴以下内容并单击运行,以授予其他用户对流表的SELECT权限:
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
请查看Unity Catalog中有关权限和可安全保护对象的授权详细信息。
其他资源
-
- Streaming table
-
- read_files table-valued function
-
- CREATE STREAMING TABLE
-
- ALTER STREAMING TABLE
- read_kafka table-valued function
Databricks快速入门指南
Databricks快速入门指南
Databricks免费试用
Databricks免费试用