在Databricks中全面运用结构化流处理技术

在生产环境中使用的结构化流式处理 | Databricks 在 AWS 上 [截至 2022/3/21]

這本書並非原著,無法保證內容準確。如需詳細內容,請參考原文。

将笔记本连接到群集并进行交互式流查询非常方便。但是,在生产环境中执行时,需要更高的可靠性和可用性。本书将讨论使用Databricks作业构建更具容错性的流式应用的方法。

定义流数据处理的时机

为了定义流数据处理的时机,可以使用触发器。如果设置触发器的时间段太短(在几十秒内),系统可能会执行不必要的处理以检查新数据的到达。作为最佳实践,建议调整触发器以最小化成本。

从查询失败中恢复

在生产级别的流媒体应用程序中,需要具备健壮的错误处理功能。在结构化流媒体中,启用对流媒体查询的检查点可以在失败后重新启动错误,并保证故障容忍性和数据一致性,从发生故障的地方继续查询。为了增强查询的故障容忍性,您应该启用检查点,并配置Databricks作业以在错误后自动重新启动查询。

启用检查点

要启用检查点,请在开始查询之前设置checkpointLocation选项为DBFS或云存储路径。

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "dbfs://outputPath/")
  .option("checkpointLocation", "dbfs://checkpointPath")
  .start()

在这个检查点的位置上,将保存可以唯一识别查询的基本信息。因此,每个查询都需要不同的检查点位置,不能在多个查询中指定相同的位置。有关详细信息,请参考Structured Streaming编程指南。

请注意,在许多输出类型的Sink中,checkpointLocation选项是必需的。但是在一些像内存Sink这样的类型中,如果不指定checkpointLocation,将自动在DBFS上生成临时的检查点存储位置。临时的检查点存储位置不能保证容错性和数据一致性,也可能没有得到适当的清除。作为最佳实践,建议始终指定checkpointLocation选项。

设置在流查询失败时重新启动的作业。

创建一个运行您的流查询笔记本以及执行JAR的Databricks作业,并按照以下方式进行配置。

    • 常に新規クラスターを使用

 

    失敗時には常にリトライ

工作与结构化流API紧密集成,可以监视活动的流式查询的所有执行作业。通过这个设置,当某个查询发生错误时,作业会自动停止处理(包括其他查询),并在新的集群上启动处理以确保处理的连续性。在新的执行中,会重新运行笔记本或JAR代码,并重新启动所有的查询。这是确保恢复正常状态的最安全的方法。

注意!不支持在筆記型工作流中執行長時間處理的任務。因此,不建議在流式作業中使用筆記型工作流。
请注意,任何活动的流式查询错误都会导致活动的运行失败,并停止其他所有的流式查询。在笔记本的末尾,不需要使用streamingQuery.awaitTermination()或spark.streams.awaitAnyTermination()。当流式查询处于活动状态时,作业会自动避免处理的停止。

以下是推荐的工作设置详细信息。

Cluster: 常に新規クラスターを使用し、最新のSparkバージョン(あるいは少なくともバージョン2.1)を使用するように設定します。Spark 2.1以降のクエリーは、クエリー後の復旧が可能です。

Alerts: 処理失敗時にメールの通知を受け取りたい場合に設定します。

Schedule: スケジュールは設定しません。

Timeout: *タイムアウトは設定しません。*ストリーミングクエリーは無限に長い期間実行されます。

Maximum concurrent runs: 1に設定します。それぞれのクエリーに対して、同時に1つのインスタンスのみがアクティブであることを許可します。

Retries: Unlimitedに設定します。

流媒体查询中的变更后恢复

在流式查询中,关于从同一检查点重新启动时允许的更改存在限制。这些更改类型可能是不允许的,或者其影响未确定。以下是所有类型共同的一些方面。

    • 用語「許可される」は、特定の変更を行うことはできますが、効果のセマンティクス(意味)はクエリーと変更内容に基づいて明確に定義される(well-defined)ことを意味します。

 

    • 用語「許可されない」は、予期されないエラーによりクエリーの再起動が失敗する可能性が高いため、特定の変更を行うべきではないことを意味します。

sdfはsparkSession.readStreamによって作成されるストリーミングデータフレーム/データセットを意味します。

改变的类型 de

入力ソースの数や型の変更(異なるソースなど): これは許可されません。

入力ソースのパラメーターの変更: 変更のセマンティクスがwell-definedになるかどうかは、ソースとクエリーに依存します。こちらにいくつかの例を示します。

レートリミットの追加、削除、変更は許可されます。

Scala
spark.readStream.format(“kafka”).option(“subscribe”, “article”)

から以下への変更は許可されます。

Scala
spark.readStream.format(“kafka”).option(“subscribe”, “article”).option(“maxOffsetsPerTrigger”, …)

サブスクライブされたアーティクル、ファイルの変更は、結果が予期されないため、通常は許可されません。spark.readStream.format(“kafka”).option(“subscribe”, “article”)からspark.readStream.format(“kafka”).option(“subscribe”, “newarticle”)への変更は許可されません。

出力シンクの変更: いくつかの特定のシンクの組み合わせ間の変更は許可されます。ケースバイケースで検証する必要があります。以下にいくつか例を示します。

ファイルシンクからKafkaシンクへの変更は許可されます。Kafkaは新規データのみを参照します。
Kafkaシンクからファイルシンクへの変更は許可されません。
Kafkaシンクとforeachの相互変換は許可されます。

出力シンクのパラメーターの変更: 許可されるかどうか、変更のセマンティクスがwell-definedになるかどうかは、シンクとクエリーに依存します。いかに例を示します。

ファイルシンクの出力ディレクトリの変更は許可されません。sdf.writeStream.format(“parquet”).option(“path”, “/somePath”)からsdf.writeStream.format(“parquet”).option(“path”, “/anotherPath”)への変更は許可されません。
出力アーティクルの変更は許可されます。sdf.writeStream.format(“kafka”).option(“article”, “somearticle”)からsdf.writeStream.format(“kafka”).option(“path”, “anotherarticle”)への変更は許可されます。
ユーザー定義のforeachシンク(すなわちForeachWriterコード)への変更は許可されますが、変更のセマンティクスはコードに依存します。

projection / filter / mapライクなオペレーションにおける変更: いくつかのケースは許可されます。例えば、

フィルターの追加削除は許可されます: sdf.selectExpr(“a”)からsdf.where(…).selectExpr(“a”).filter(…)への変更は許可されます。
同じ出力スキーマによるprojectionの変更は許可されます: sdf.selectExpr(“stringColumn AS json”).writeStreamからsdf.select(to_json(…).as(“json”)).writeStreamへの変更は許可されます。
異なる出力スキーマによるprojectionの変更は条件付きで許可されます: 出力シンクが”a”から”b”へのスキーマ変更を許可しているのであれば、sdf.selectExpr(“a”).writeStreamからsdf.selectExpr(“b”).writeStreamへの変更は許可されます。

ステートフルなオペレーションにおける変更: 結果を継続的に更新するために、ストリーミングクエリーにおける幾つかのオペレーションはステートデータを維持する必要があります。構造化ストリーミングは自動で状態のチェックポイントをフォールトトレラントなストレージ(例えば、DBFS、AWS S3、Azure Blob storage)に作成し、再起動後にレストアします。しかし、ステートデータのスキーマは再起動の合間で変更がないことを前提としています。これは、再起動の合間にストリーミングクエリーのステートフルオペレーションに対するいかなる変更(追加、削除、スキーマ変更)も許可されないことを意味します。ステートのリカバリーを確実にするために、再起動の合間にスキーマを変更すべきではないステートフルオペレーションのリストを示します。

ストリーミングの集計: 例えば、sdf.groupBy(“a”).agg(…)。グルーピングのキー、集計の数やタイプの変更は許可されません。

ストリーミングの重複排除: 例えば、sdf.dropDuplicates(“a”)。グルーピングのキー、集計の数やタイプの変更は許可されません。

ストリームとストリームのjoin: 例えば、sdf1.join(sdf2, …) (ここでは両方の入力がsparkSession.readStreamで生成されます)。スキーマやjoinカラムの変更は許可されません。joinタイプ(inner/outer)の変更は許可されません。他のjoinの条件の変更の結果も未定になる可能性があります。

任意のステートフルオペレーション: 例えば、sdf.groupByKey(…).mapGroupsWithState(…)や sdf.groupByKey(…).flatMapGroupsWithState(…)。ユーザー定義のステートのスキーマやタイムアウトのタイプの変更は許可されません。ユーザー定義のステートマッピング関数におけるいかなる変更は許可されますが、変更のセマンティックな影響は、ユーザー定義ロジックに依存します。ステートのスキーマ変更をサポートしたいのであれば、明示的に複雑なステートデータの構造を、スキーママイグレーションをサポートするエンコーディング/デコーディングスキーマを用いて、バイトコードにエンコード/デコードすることができます。例えば、Avroエンコードされたバイトコードとしてステートを保存するのであれば、バイナリーのステートは常に問題なくレストアされるので、クエリー再起動の合間にAvroのステートスキーマを自由に変更することができます。

对于流查询的监控

您可以通过Streaming标签下的Spark UI来监控流式应用程序。通过在df.writeStream.queryName(<查询名称>)中为流式查询命名,您可以在Spark UI中查看哪些指标属于哪个流式查询。

通过使用Apache Spark的Streaming Query Listener接口,可以将流式指标推送到外部服务,用于警报和仪表板。Streaming Query Listener接口只能在Scala中使用。

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
   * Called when a query is started.
   * @note This is called synchronously with
   *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]],
   *       that is, `onQueryStart` will be called on all listeners before
   *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
   *        Do not block in this method as it will block your query.
   */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
   * Called when there is some status update (ingestion rate updated, etc.)
   *
   * @note This method is asynchronous. The status in [[StreamingQuery]] will always be
   *       latest no matter when this method is called. Therefore, the status of [[StreamingQuery]]
   *       may be changed before/when you process the event. For example, you may find [[StreamingQuery]]
   *       is terminated when you are processing `QueryProgressEvent`.
   */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
   * Called when a query is stopped, with or without error.
   */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

观测方法

可以观测的度量可以通过查询(数据框)来定义的带有命名的任意聚合函数。当数据框的执行达到完成点时,将触发一个带有命名事件的统计指标,该事件包含自上次完成点以来处理的数据的统计指标。

通过为 Spark 会话附加监听器可以观察这些度量值。监听器的行为取决于执行模式。

バッチモード: QueryExecutionListenerを使います。
クエリーが完了するとQueryExecutionListenerがコールされます。QueryExecution.observedMetrics mapを用いてメトリクスにアクセスします。

ストリーミング、あるいはマイクロバッチ: StreamingQueryListenerを使います。
ストリーミングクエリーがエポックを完了するとStreamingQueryListenerがコールされます。StreamingQueryProgress.observedMetrics mapを用いてメトリクスにアクセスします。Databricksでは連続実行ストリーミングをサポートしていません。

這裡提供一個例子。

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

为了提高效率,设置Apache Spark调度程序池。

默认情况下,所有在笔记本中启动的查询都将在同一公平调度池中执行。因此,所有在笔记本中逐个执行的流查询将按照先进先出(FIFO)的方式进行处理。通过不在查询之间有效共享集群资源,可能会导致查询不必要的延迟。

为了使所有的流查询能够同时执行作业,并能够有效地共享集群资源,可以将查询设置为在另一个调度程序池中运行。例如,可以按照以下方式进行设置。

// Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").format("parquet").start(path1)

// Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").format("orc").start(path2)
请注意,本地属性设置必须在启动流式查询的同一个笔记本单元中进行。

请参阅Apache公平调度器文档以获取更详细的信息。

优化基于状态的流式查询的性能

如果您在流式查询中执行有状态的操作(例如流式聚合、流式负责重复项、流与流的连接、mapGroupsWithState、flatMapGroupsWithState),并且希望保留数百万个键的状态,则可能会遇到一些问题,如由于大型JVM垃圾回收(GC)导致的微批处理时间变动等。这是因为默认情况下,状态数据被保存在执行器的JVM内存中,大量的状态对象会导致JVM的内存消耗增加,进而导致大规模的GC暂停。

在这种情况下,您可以选择基于RocksDB的更优化的状态管理解决方案。这个解决方案可以在Databricks运行时中使用。与将状态保持在JVM内存中相比,该解决方案使用RocksDB在本地内存和本地SSD上高效管理状态。此外,对于此状态的任何更改都会自动保存到由结构化流指定的检查点位置,从而确保(与默认状态管理相同)完全的容错性。有关如何将RocksDB设置为状态存储的方法,请参阅配置RocksDB状态存储。

以下是为了获得最佳性能而推荐的设置。

    • ワーカーとしてcompute-optimizedインスタンスを使用します。例えば、AWSのc3.4xlargeインスタンスなどです。

 

    クラスターにおけるコア数の1-2倍のシャッフルパーティション数を指定します。

在RocksDB基础的状态管理中,关于性能的优点是可以维持超过默认值100倍以上的状态键。例如,在使用AWS c3.4xlarge实例的Spark集群中作为工作者,使用默认的状态管理,每个执行器可以保持最多1-2百万个状态键,但之后JVM的垃圾回收会开始并对性能产生影响。然而,基于RocksDB的状态管理可以轻松地每个执行器保持一亿个状态键,而不会面临垃圾回收的问题。

请注意,在重新启动查询期间不能更改状态管理方案。换句话说,如果您使用默认的状态管理启动查询,则除非使用新的检查点存储位置重新启动查询,否则无法更改。

设置RocksDB状态存储

在开始流查询之前,您可以通过在SparkSession中进行以下设置来启用基于RocksDB的状态管理。

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

RocksDB状态存储的指标

每个状态操作符将监视状态存储并收集与在RocksDB实例上运行的状态管理操作相关的度量标准,以帮助调试慢速作业。这些度量标准被聚合(总和)到每个作业的状态操作符中,在执行状态操作符的所有任务中。这些度量标准将成为StreamingQueryProgress中stateOperators中customMetrics映射的一部分。以下是通过调用StreamingQueryProgress.json()获取的JSON格式的StreamingQueryProgress示例。

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

请参考原文以获取有关度量指标的详细说明。

创建非同步状态检查点

注意:只能在Databricks运行时10.3及以上版本中使用该功能。

通过启用异步状态检查点,在涉及大规模状态更新的有状态流式查询中,可能会减少端到端微批处理的延迟。

非同步型的状态检查点会尝试异步地创建检查点,以免微批处理执行需要等待检查点创建完成。换句话说,下一个微批处理会在上一个微批处理完成后立即启动。然而,在内部,偏移量元数据(保存在检查点的位置)会跟踪每个微批处理的状态检查点是否已完成。如果重新启动查询,则可能需要重新执行一个或多个微批处理。这可能包括尚未完成计算的最新微批处理以及尚未完成状态检查点的微批处理。因此,您可以获得与同步检查点创建相同的故障容错保证(即具有幂等性的消费者并确保仅一次的保证)。

简言之,通过在存在状态更新瓶颈的状态保持型流式查询中启用异步检查点创建,可以在减少端到端延迟的代价上,保持所有容错性保证,而无需牺牲任何可靠性。

确定目标工作负载

以下是通过创建异步检查点而可能享受到优势的流式作业的特性。

ジョブに1つ以上のステートフルなオペレーション(例えば、集計、[flat]MapGroupsWithState、ストリームとストリームのjoin)が含まれている。

ステートのチェックポイントのレーテンシーがバッチ処理全体のレーテーンシーの大部分を占める。この情報はStreamingQueryProgressで確認することができます。これらのイベントはSparkドライバーのlog4jログでも確認することができます。以下にストリーミングクエリーの進捗状況と、全体的なバッチ実行のレーテンシーにおけるステートチェックポイントのインパクトをどのように特定するのかの例を示します。

JSON
{
“id” : “2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19”,
“runId” : “e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe”,
“…”,
“batchId” : 0,
“durationMs” : {
“…”,
“triggerExecution” : 547730,
“…”
},
“stateOperators” : [ {
“…”,
“commitTimeMs” : 3186626,
“numShufflePartitions” : 64,
“…”
}]
}

上記のクエリー進捗イベントのステートチェックポイントのレーテンシーの分析

バッチ期間(durationMs.triggerDuration)は約547秒。
ステートストアのコミットのレーテンシー(stateOperations[0].commitTimeMs)は約3,186秒。コミットのレーテンシーはステートストアを持つタスクで合計されます。この場合、そのようなタスクは64個(stateOperators[0].numShufflePartitions)です。
ステートオペレータを持つそれぞれのタスクは、チェックポイント作成に平均50秒(3,186/64)かかっています。これは追加のレーテンシーであり、バッチ期間に加算されます。64個すべてのタスクが同時に実行すると仮定すると、チェックポイントのステップはバッチ処理期間の約9%(50秒 / 547秒)を占めています。最大同時タスク実行数が64より少なくなると、このパーセンテージはさらに増加します。

启用非同步状态检查点创建

在流媒体作业中进行以下设置。要进行异步状态检查点创建,需要使用支持异步提交的状态存储实现。目前仅支持基于RocksDB的状态存储。

spark.conf.set(
  "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
  "true"
)

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)

限制

    • 非同期チェックポイントにおけるあらゆる失敗は、クエリー自体を失敗させます。同期チェックポイント作成モードでは、チェックポイントはタスクの一部として実行され、Sparkはクエリーが失敗する前にタスクを複数回リトライします。この機構は非同期チェックポイント作成モードでは存在しません。しかし、Databricksジョブのリトライを使うことで、そのような処理失敗に対して自動でリトライするようにすることができます。

 

    • 非同期ステートチェックポイント作成とオートスケーリングの組み合わせは動作しません。マイクロバッチの実行の合間にステートストアの場所がへこうされない場合には、非同期チェックポイント作成がもっともうまく動作します。オートスケーリングを有効化すると、オートスケーリングの一部でノードが追加、削除されるたびに、ステートストアのインスタンスが再分散される場合があります。

 

    非同期ステートチェックポイント作成はRocksDBステートストアプロバイダー実装でのみサポートされています。デフォルトのインメモリステートストア実装は非同期ステートチェックポイント作成をサポートしていません。

多种水印策略

在流式查询中,可以包含多个输入流进行union和join操作。每个输入流都可以使用带有Watermark(“eventTime”,delay)的状态操作来指定不同的延迟数据阈值。可以通过使用withWatermarks(“eventTime”,delay)在每个输入流上来指定这些阈值。例如,考虑一个流与流的join查询。

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

在执行查询时,结构化流会单独跟踪每个输入流的最大事件时间,并根据相应的延迟计算水印,然后选择用于状态操作的单个全局水印。默认情况下,为了防止出现意外的数据丢失,使得某个流由于延迟而比其他流滞后,最小值被用作全局水印(例如,某个流由于上游流的故障而停止接收数据)。换句话说,全局水印可以安全地与最慢的流同步,从而使查询输出也相应地延迟。

在某些情况下,即使从最慢的数据流中丢弃数据,我们可能仍然希望更快地获得结果。通过将SQL设置spark.sql.streaming.multipleWatermarkPolicy设置为max(默认为min),可以设置多个水印策略以选择全局水印的最大值。这样,全局水印将与最快的数据流同步。然而,作为副作用,来自最慢数据流的数据将被积极丢弃。因此,建议在考虑后使用此设置。

可视化结构化流数据框架

为了实时可视化结构化流式数据框,您可以使用display函数。trigger和checkpointLocation参数是可选的,但作为最佳实践,在生产环境中建议始终指定它们。

import org.apache.spark.sql.streaming.Trigger

val streaming_df = spark.readStream.format("rate").load()
display(streaming_df.groupBy().count(), trigger = Trigger.ProcessingTime("5 seconds"), checkpointLocation = "dbfs:/<checkpoint-path>")
streaming_df = spark.readStream.format("rate").load()
display(streaming_df.groupBy().count(), processingTime = "5 seconds", checkpointLocation = "dbfs:/<checkpoint-path>")

请参考有关Structured Streaming DataFrames的详细信息。

改善flatMapGroupsWithState方法的State Operator

初始状态的设定

您可以使用[flat]MapGroupsWithState操作符来指定用户定义的初始状态,用于结构化流处理中的状态处理。

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

以下是为flatMapGroupsWithState操作符指定初始状态的一个示例。

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

以下是一个示例,演示如何在mapGroupsWithState操作符中指定初始状态。

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

测试状态更新函数

通过使用TestGroupState API,可以测试Dataset.groupByKey(…).mapGroupsWithState(…)和Dataset.groupByKey(…).flatMapGroupsWithState(…)的状态更新函数。

状态更新函数接收先前状态为输入,该输入是 GroupState对象类型。有关示例,请参考Apache Spark的GroupState参考文档。

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}

Databricks 免费试用

Databricks 免费试用

广告
将在 10 秒后关闭
bannerAds