结构化流媒体:回顾2021年
2021年为Databricks和Apache Spark开发的所有新结构化流处理功能概述。
2022年到来之际,我们在这里回顾一下Databricks与Apache Spark™在流式处理方面所取得的伟大成就!在2021年,工程团队和开源贡献者以三个目标为指导,取得了许多重大成就。
-
- 改善低延迟和具有状态的处理
-
- 改善Databricks和Spark结构化流工作负载的可观察性
- 改善资源配置和可扩展性
最终的目标是通过使团队能够在Databricks和Spark上执行流式工作负载,以在Databricks上运行关键任务级别的流式应用程序,并同时优化成本效益和资源利用量,从而实现这些目标背后的动机。
目标#1:提高低延迟和状态处理能力。
有两个关键功能旨在减少有状态操作的延迟并改善有状态API。第一个功能是对大规模有状态操作的异步检查点,这改善了之前同步设计导致较高延迟的问题。
非同步檢查點
对任意一种有状态操作员的改进
我們在以前的文章中介紹了使用[flat]MapGroupsWithState進行結構化串流的任意狀態處理。使用這些運算子,您可以獲得跨越聚合框架的複雜狀態操作的靈活性。以下是我們對這些運算子進行的改進。
-
- 初期状態の許可、これによってお使いのストリーミングデータ全ての再処理を回避することができます。
- 新たにTestGroupStateインタフェースを提供することで、ロジックのテストが容易になり、ユーザーはGroupStateのインスタンスを作成することで設定された内部の値にアクセスでき、状態を変化させる関数のユニットテストがシンプルになります。
初期阶段的许可
让我们从以下的flatMapGroupswithState操作符开始。
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
这个自定义状态函数将保持遇到的水果数量。
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
在这个例子中,通过设置特定水果的初始值来指定这个操作员的初始状态。
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
以一种简便的方式进行逻辑测试
我们也可以使用 TestGroupState API 来测试状态更新。
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}
您还可以在Databricks的文档中查看其他示例。
会话窗口的本地支持
在结构化流处理中,我们引入了用于在基于事件时间的固定长度滚动窗口和滑动窗口中进行聚合的功能。Spark 3.2引入了具有动态窗口长度的会话窗口概念。以前需要使用flatMapGroupsWithState来实现自定义状态操作符。
我們將舉一個使用動態差距的例子。
# Define the session window having dynamic gap duration based on eventType
session_window expr = session_window(events.timestamp, \
when(events.eventType == "type1", "5 seconds") \
.when(events.eventType == "type2", "20 seconds") \
.otherwise("5 minutes"))
# Group the data by session window and userId, and compute the count of each group
windowedCountsDF = events \
.withWatermark("timestamp", "10 minutes") \
.groupBy(events.userID, session_window_expr) \
.count()
目标2:提高对流式工作负载的可观察性
通过使用 StreamingQueryListener API,您可以异步监视 SparkSession 中的查询,并定义用于查询状态、进度和停止事件的自定义回调函数。然而,在微批处理中理解背压和确定瓶颈仍然是困难的。在 Databricks Runtime 8.1 中,StreamingQueryProgress 对象会报告 Kafka、Kinesis、Delta Lake 和 Auto Loader 数据源的背压指标。
以下是由 Kafka 提供的指标的示例。
{
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic]]",
"metrics" : {
"avgOffsetsBehindLatest" : "4.0",
"maxOffsetsBehindLatest" : "4",
"minOffsetsBehindLatest" : "4",
"estimatedTotalBytesBehindLatest" : "80.0"
},
} ]
Databricks运行时8.3将引入实时指标,以帮助理解RocksDB状态存储的性能和调试状态操作的性能。这有助于在异步检查点中确定目标工作负载。
我将给出一个新的状态存储指标示例。
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
目标3:资源分配和可扩展性的改善。
Delta Live Tables(DLT)的流式自动缩放
我们在去年的Data + AI Summit上发布了Delta Live Tables,这是一个能够声明性地构建和编排数据管道,并极大地抽象化了集群和节点类型设置的框架。我们进一步推动了这一框架,并改进了现有的Databricks优化自动缩放功能,引入了对流式管道的智能解决方案。以下是它的一些优点:
クラスター使用率の改善: ストリーミングワークロードで負荷の変動があるシナリオにおいて、新たなバックプレッシャーメトリクスを活用してクラスターのサイズを調整し、クラスター使用率を改善します。
プロアクティブなワーカーのグレースフルシャットダウン: 既存のオートスケーリングのソリューションはノードがアイドル状態の場合にのみリトライを行いますが、新たなDLTのオートスケーラーは使用率が低い場合、シャットダウンによってタスクが失敗しないことを保証しつつも、プロアクティブに選択されたノードをシャットダウンします。
请在与Databricks账户团队联系后获取更多详细信息,因为本书的撰写阶段,此功能仍处于私人预览阶段。
立即触发
在结构化流处理中,可以通过使用触发器来定义流式查询的数据处理时机。这些触发器类型包括微批次(默认)、固定的微批次间隔(Trigger.ProcessingTime(””))、仅一次的微批次(Trigger.Once)和连续(Trigger.Continuous)。
Databricks 10.1 及以上的版本引入了新型触发器。Trigger.AvailableNow 类似于 Trigger.Once,但提供了更优秀的可扩展性。与 Trigger.Once 类似,它会在查询停止之前处理可用的所有数据,但不是在一个批次中处理,而是在多个批次中处理。这受到 Delta Lake 和 Auto Loader 的流式数据源的支持。
请给出一个样例。
spark.readStream
.format("delta")
.option("maxFilesPerTrigger", "1")
.load(inputDir)
.writeStream
.trigger(Trigger.AvailableNow)
.option("checkpointLocation", checkpointDir)
.start()
摘要
在2022年,我们将继续加速在结构化流媒体领域的创新,进一步提升性能、减少延迟,并提供全新的精彩功能。期待您对今年的进展保持关注!
Databricks免费试用
Databricks 免费试用