在Databricks SQL中使用流式化表进行数据加载

在Databricks SQL中加载数据 | 在AWS上的Databricks [截至2023年6月27日]

這本書是一個摘要翻譯,並不能保證內容的準確性。有關準確的內容,請參考原文。
预览
该功能正在公开预览阶段。若要访问,请填写此表格进行注册。

建议使用Databricks SQL来导入数据时使用流式处理表。流式处理表是Unity Catalog的托管表, 可支持流式处理和增量数据处理。对于每个流式处理表自动创建了DLT表。您可以利用来自Kafka或云对象存储的增量数据加载来使用流式处理表。

在这本书中,我们将演示如何利用流媒体表从作为Unity目录外部位置设置的云对象存储中加载数据。

在开始之前

请在开始之前确认以下事项:

    • Databricksアカウントでサーバレスが有効化されている。詳細に関しては、Use serverless SQL warehousesをご覧ください。

 

    • ワークスペースでUnity Catalogが有効化されている。詳細については、Unity Catalogを使い始めるをご覧ください。

 

    • SQLウェアハウスでCurrentチャンネルを使用している。

 

    • Unity Catalogの外部ロケーションに対するREAD FILES権限がある、詳細は、Unity Catalogにおける外部ロケーションとストレージ資格情報の管理をご覧ください。

 

    • ストリーミングテーブルを作成するカタログに対するUSE CATALOG権限。

 

    • ストリーミングテーブルを作成するスキーマに対するUSE SCHEMA権限。

 

    • ストリーミングテーブルを作成するスキーマにおけるCREATE TABLE権限。

 

    クラウドオブジェクトストレージのソースデータのパス。例えば、s3://myBucket/analysis。
请注意,本教程假设您想加载到支持Unity目录之外的云对象存储位置的数据。

寻找和预览原始数据

    1. 在工作区的侧边栏中,点击“查询”,然后点击“创建查询”。

 

    1. 在查询编辑器中,从下拉菜单中选择正在使用的SQL数据仓库,该数据仓库使用“current”频道。

 

    将下面的内容粘贴到编辑器中,并用括号(<>)内的值替换为您特定的源数据信息,然后点击“运行”。
注意:
在执行read_files表值函数时,如果函数的默认设置无法解析您的数据,则可能会遇到模式推断错误。例如,如果您需要在多行模式下设置多行CSV或JSON文件,则可能需要调整解析器选项列表。有关解析器选项的详细信息,请参阅read_files表值函数。
/* Discover your data in an external location */
LIST "s3://<bucket>/<folder>"

/* Preview your data */
SELECT * FROM read_files("s3://<bucket>/<folder>") LIMIT 10

加载数据到流媒体表

要从云对象存储中创建流式数据表,请将以下内容粘贴到查询编辑器中,并点击运行:

CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('s3://<bucket>/<folder>')

使用DLT管道进行流式处理表的刷新

在这个部分,我们将解释一种模式,即使用由查询定义的源可用的最新数据刷新流表。

在使用Databricks SQL进行初始创建和数据加载时,对于流媒体表的CREATE操作使用。对于流媒体表的REFRESH操作使用Delta Live Tables(DLT)。对于每个流媒体表,DLT管道将自动创建。当流媒体表进行刷新时,将启动DLT管道的更新以执行刷新处理。

执行REFRESH命令后,会返回DLT管道的链接。您可以利用DLT管道的链接来检查刷新的状态。

请注意,只有表的所有者才能刷新流式表以获取最新数据。创建表的用户将成为所有者,无法更改所有者身份。

请查看Delta Live Tables是什么?

仅导入新数据

默认情况下,read_files函数在创建表时将读取源目录中的所有现有数据,并在每次刷新时处理新到达的记录。

为了避免在创建表时导入源目录中的现有数据,可以使用ignoreExistingFiles选项。这意味着只有在创建表的过程完成后到达目录的数据会被导入。例如:

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files('s3://mybucket/analysis/*/*/*.json', ignoreExistingFiles => false)

完全刷新流媒体表

全面刷新会使用最新定义重新处理可在源中使用的所有数据。全面刷新会删除现有数据,因此对于不保留像Kafka这样的所有历史数据或保留期限较短的源,不建议调用全面刷新。如果源中的数据不可用,可能无法恢复旧数据。

例如:

REFRESH STREAMING TABLE my_bronze_table FULL

自动更新的流媒体时间表

为了将流表自动刷新设置为基于已定义的日程表,可以在查询编辑器中粘贴以下内容并执行”运行”:

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

请查看ALTER STREAMING TABLE以获取刷新时间表查询的示例。

刷新状态的追踪

可以通过引用Delta Live Tables UI中的流表管理管道或执行DESCRIBE EXTENDED命令来确认流表刷新状态,并查看返回的刷新信息。

DESCRIBE EXTENDED <table-name>

从Kafka中进行流式数据提取

有关将Kafka流式传输示例导入的信息,请参阅read_kafka。

允许访问流媒体表

请在查询编辑器中粘贴以下内容并单击运行,以授予其他用户对流表的SELECT权限:

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

请查看Unity Catalog中有关权限和可安全保护对象的授权详细信息。

其他资源

    • Streaming table

 

    • read_files table-valued function

 

    • CREATE STREAMING TABLE

 

    • ALTER STREAMING TABLE

 

    read_kafka table-valued function

Databricks快速入门指南

Databricks快速入门指南

Databricks免费试用

Databricks免费试用

广告
将在 10 秒后关闭
bannerAds