Apache Spark™ 3.1版本中的结构化流式处理的新功能

Apache Spark 3.1版本中针对结构化流的改进——The Databricks Blog的翻译。

這本書是摘譯,並不能保證其內容的準確性。請參考原文以獲取正確的內容。

構造化ストリーミング是Apache Spark™中最重要的组件之一,它提供了使用Spark Core和SQL API进行流式处理的功能。本博客文章总结了Spark Streaming在最新的3.1版本中的一些显著改进,包括新的流式表API、流-流连接支持以及多个用户界面的改进。通过模式验证和Apache Kafka数据源的改进,提供了出色的用户友好性。最后,对于FileStream的读取/写入操作进行了多种增强。

全新的流媒体表 API

启动结构化流后,连续的数据流可以被视为没有边界的表。因此,表API提供了一种自然且方便的方法来处理流查询。在Spark 3.1中,我们添加了DataStreamReader和DataStreamWriter的支持。现在,用户可以直接将流数据框作为表进行读写。请参考以下示例。

# Create a streaming DataFrame
src = spark.readStream.format("rate").option("rowPerSecond", 10).load()

# Write the streaming DataFrame to a table
src.writeStream.option("checkpointLocation", checkpointLoc1).toTable("myTable")

# Check the table result
spark.read.table("myTable").show(truncate=30)
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2021-01-19 07:45:23.122|42   |
|2021-01-19 07:45:23.222|43   |
|2021-01-19 07:45:23.322|44   |
...

而且,借助这些新选项,用户可以将数据集转换为源并写入新表中。

# Write to a new table with transformation
spark.readStream.table("myTable").select("value") \
  .writeStream.option("checkpointLocation", checkpointLoc2) \
  .format("parquet").toTable("newTable")

# Check the table result
spark.read.table("newTable").show()
+-----+
|value|
+-----+
| 1214|
| 1215|
| 1216|
...

建议在流式处理API中使用Delta Lake格式,以实现以下功能。

    • 同時に実行される低レーテンシーのデータ取り込みによって生成される小規模ファイルのコンパクト化

 

    • 1つ以上のストリーム(あるいは同時実行のバッチジョブ)による「一度のみ(exactly-once)」の処理の維持

 

    ファイルをストリームのソースとして使用する際、新規ファイルを効率的に特定

对Stream-Stream join的支持已经更新

截至Spark 3.1版本,只支持内连接(inner),左外连接(left outer)和右外连接(right outer)的流-流连接。在最新发布的版本中,我们实现了全外连接(full outer)和左半连接(left semi)的流-流连接,因此可以在更多情况下利用结构化流处理。

    • Left semi stream-stream join (SPARK-32862)

 

    Full outer stream-stream join (SPARK-32863)

Kafka数据源的改善

由于在Spark 3.1中升级了Kafka的依赖库到2.6.0(SPARK-32568),因此用户可以迁移到针对Kafka偏移获取(AdminClient.listOffsets)的新API。这可以解决在使用旧版本时Kafka连接器会永远等待的问题(SPARK-28367)。

验证模式

在構造化流式查詢中,架構是一個重要的信息。在Spark 3.1中,我們為用戶輸入的架構和內部狀態存儲添加了架構驗證邏輯。

引用:引入查询重启期间状态模式验证(SPARK-27237)

请注意,由于机器人的语料库是基于英文的,我的翻译能力可能会受限。某些技术术语或专业术语可能无法准确转换。以下是我的尝试翻译:

在查询重启期间引入状态模式验证(SPARK-27237)。

通过这次更新,在流式开始时,键值模式将被存储在模式文件中。新的键值模式将在查询重新启动时与现有模式进行兼容性验证。如果字段数量相同且每个字段的数据类型相同,则状态模式将被视为“兼容”。由于Spark可以更改字段名称,请注意此处不检查字段名。

通过这样做,可以防止使用不兼容状态模式进行查询操作,并降低非确定性行为的可能性,还能够输出更有意义的错误信息。

在流式存储数据源中引入模式验证(SPARK-31894)。

在过去,结构化流处理在将检查点直接写入到StateStore中(以UnsafeRow的形式表示),而没有进行任何模式验证。升级到新版本的Spark后,检查点文件将被重用。如果没有模式验证,与聚合函数相关的更改或错误修复可能会导致随机异常,并且可能返回不同的结果(例如 SPARK-28067)。现在,Spark将验证检查点与模式对齐,如果在迁移过程中重用检查点,则会引发InvalidUnsafeRowException。这个行为有助于发现Spark 3.0.1中的阻碍性问题SPARK-31990:流式状态存储的不兼容性。

增强结构化流媒体用户界面

在Spark 3.0中,我们引入了全新的结构化流式处理用户界面。在Spark 3.1中,对于结构化流式处理的用户界面,我们增加了对历史服务器的支持(SPARK-31953),并为流式处理的执行状态提供了更多信息。

在構造化流媒體UI中的狀態信息(SPARK-33223)

在状态信息中增加了四个度量标准。

    1. 合計状态行的计数

 

    1. 更新后的状态行的计数

 

    1. 使用的状态行内存的字节数

 

    根据水印删除的状态行的计数
    構造化ストリーミングUIにおけるウォーターマークギャップ情報(SPARK-33224)
    SS UIにステートカスタムメトリクスを表示 (SPARK-33287)

文件流的增强提升源/接收器。

文件流的源/汇有几项改进。

将超过maxFilesPerTrigger的获取文件列表读取为缓存中的未读文件(SPARK-30866)。

过去,当设置了maxFilesPerTrigger时,FileStream源会获取可用于每个微批次的所有文件,并根据设置处理一定数量的文件,忽略剩下的文件。通过这次改进,可以缓存上一批次获取的文件,并在后续批次中使用它们。

整理(SPARK-30462)了对文件流源和接收器的元数据日志逻辑的修整。

在这个变更之前,当在FileStream源/接收器中需要元数据时,所有元数据日志的条目都会被反序列化到Spark驱动程序的内存中。通过这个变更,Spark将尽可能以整齐的方式读取和处理元数据日志。

提供对于保留输出文件时间的新选项(SPARK-27188)。

提供了一种在FileStream Sync中设置元数据日志文件保留期限的新选项,它有助于限制长时间运行的结构化流查询的元数据日志文件的大小扩展。

下一个来的是

在下一个主要版本发布之前,我们将会专注于改善结构化流式处理的新功能、性能和易用性。我们非常期待作为最终用户和Spark开发者收到各位的反馈!如果您有任何反馈,请在Spark用户和开发者的邮件列表中分享。感谢所有社区的贡献者和用户,他们帮助我们将这些重要的增强功能变成现实。

数据脑 無法体验

Databricks 免费试用

广告
将在 10 秒后关闭
bannerAds