從結構化串流查詢的故障中恢復

处理结构化流查询故障 | 在AWS上的Databricks [截至2022年8月25日]

這本書是摘譯版本,並不能保證內容的準確性。請參考原文以獲取準確的內容。

使用Databricks工作流,结构化流可以提供故障容忍性和数据一致性。可以通过设置结构化流重新启动以便在故障时重新启动。通过启用流查询的检查点创建,可以在故障后重新启动查询。重新启动的查询将从故障点继续处理。

启用结构化流查询的检查点创建。

以下是一个例子,在开始查询之前,建议始终指定checkpointLocation选项。

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "/path/to/table")
  .option("checkpointLocation", "/path/to/table/_checkpoint")
  .start()

这个检查点的位置将保存确定查询所需的所有重要信息。应为每个查询设置一个检查点位置。不应在多个查询中使用相同的位置。有关详细信息,请参阅Structured Streaming 编程指南。

请注意,大多数类型的输出sink都需要指定checkpointLocation,但某些类似内存sink的sink会自动生成临时的检查点位置,而无需指定checkpointLocation。这些临时的检查点位置不能保证容错性和数据一致性,并且可能在适当的情况下不会被正确清理。请务必始终指定checkpointLocation以避免潜在的陷阱。

将结构化流作业配置为重新启动失败的流查询。

可以使用带有流式查询的笔记本或JAR创建Databricks作业,并进行以下设置。

    • 常に新規クラスターを使用する

 

    障害時には常にリトライする

作业与结构化流API密切集成,可以监视所有活动查询的运行状态。通过这个设置,当一部分查询失败时,作业会自动停止处理(与其他所有查询一起),然后在新集群上重新启动处理。通过重新执行此笔记本或JAR代码,所有查询将再次启动运行,这是恢复到良好状态的最安全方法。

警告!笔记本工作流不支持长时间运行的作业。因此,不建议在流式作业中使用笔记本工作流。
注意:任何活动的流查询故障都会导致活动处理程序(运行)失败,并停止所有其他流查询。无需在笔记本的末尾使用streamingQuery.awaitTermination()或spark.streams.awaitAnyTermination()。作业会自动在流查询处于活动状态时保持运行。

我将展示以下关于推荐工作设置的示例。

クラスター: 常に新規クラスターを設定し、最新のSparkバージョン(あるいは最低でもバージョン2.1)を使う様にします。Spark 2.1以降で起動されたクエリーは、クエリーの実行後やSparkのバージョンアップ後に復旧可能です。

通知: 失敗時にメールの通知が必要であれば設定します。

スケジュール: スケジュールを設定しません。

タイムアウト: タイムアウトを設定しません。 ストリーミングクエリーは永遠に実行されます。

最大同時実行数: 1に設定します。それぞれのクエリーのインスタンスは1つ存在する様にします。

リトライ: Unlimitedに設定します。

请查看Databricks中的作业管理以理解这些配置。

在对结构化流查询进行更改后恢复。

从同一检查点位置重新启动之前,对于允许进行流查询更改的内容有一些限制。以下是一些不被允许或对变更影响未定义的变更类型。

許可されるという用語は、その変更を行うことができますが、クエリーと変更内容に基づいてその影響のセマンティクスは決定されます。

許可されないという用語は、クエリーの再起動は予期しないエラーで失敗するため、その変更は行うべきではないことを意味します。

sdfはsparkSession.readStreamで生成されたストリーミングデータフレーム/データセットを表現します。

結構化流式查询的变更类型

入力ソースの数やタイプの変更(異なるソース): これは許可されません。

入力ソースのパラメーターの変更: 許可されるかどうか、変更のセマンティクスが決定的かどうかはソースとクエリーに依存します。以下に例を示します。

レートリミットの追加、削除、変更は許可されます。

Scala
spark.readStream.format(“kafka”).option(“subscribe”, “article”)

から

Scala
spark.readStream.format(“kafka”).option(“subscribe”, “article”).option(“maxOffsetsPerTrigger”, …)

サブスクライブしているアーティクルやファイルの変更は、結果が予測できないため通常は許可されません。

spark.readStream.format(“kafka”).option(“subscribe”, “article”)からspark.readStream.format(“kafka”).option(“subscribe”, “newarticle”)への変更

出力シンクの変更: いくつかの特定のシンクの組み合わせ間の変更は許可されます。ケースバイケースで検証する必要があります。以下にいくつか例を示します。

ファイルシンクからKafkaシンクへの変更は許可されます。Kafkaは新規データのみを参照します。
Kafkaシンクからファイルシンクへの変更は許可されません。
Kafkaシンクとforeachの相互変換は許可されます。

出力シンクのパラメーターの変更: 許可されるかどうか、変更のセマンティクスがwell-definedになるかどうかは、シンクとクエリーに依存します。いかに例を示します。

ファイルシンクの出力ディレクトリの変更は許可されません。sdf.writeStream.format(“parquet”).option(“path”, “/somePath”)からsdf.writeStream.format(“parquet”).option(“path”, “/anotherPath”)への変更は許可されません。
出力アーティクルの変更は許可されます。sdf.writeStream.format(“kafka”).option(“article”, “somearticle”)からsdf.writeStream.format(“kafka”).option(“path”, “anotherarticle”)への変更は許可されます。
ユーザー定義のforeachシンク(すなわちForeachWriterコード)への変更は許可されますが、変更のセマンティクスはコードに依存します。

projection / filter / mapライクなオペレーションにおける変更: いくつかのケースは許可されます。例えば、

フィルターの追加削除は許可されます: sdf.selectExpr(“a”)からsdf.where(…).selectExpr(“a”).filter(…)への変更は許可されます。
同じ出力スキーマによるprojectionの変更は許可されます: sdf.selectExpr(“stringColumn AS json”).writeStreamからsdf.select(to_json(…).as(“json”)).writeStreamへの変更は許可されます。
異なる出力スキーマによるprojectionの変更は条件付きで許可されます: 出力シンクが”a”から”b”へのスキーマ変更を許可しているのであれば、sdf.selectExpr(“a”).writeStreamからsdf.selectExpr(“b”).writeStreamへの変更は許可されます。

ステートフルなオペレーションにおける変更: 結果を継続的に更新するために、ストリーミングクエリーにおける幾つかのオペレーションはステートデータを維持する必要があります。構造化ストリーミングは自動で状態のチェックポイントをフォールトトレラントなストレージ(例えば、DBFS、AWS S3、Azure Blob storage)に作成し、再起動後にレストアします。しかし、ステートデータのスキーマは再起動の合間で変更がないことを前提としています。これは、再起動の合間にストリーミングクエリーのステートフルオペレーションに対するいかなる変更(追加、削除、スキーマ変更)も許可されないことを意味します。ステートのリカバリーを確実にするために、再起動の合間にスキーマを変更すべきではないステートフルオペレーションのリストを示します。

ストリーミングの集計: 例えば、sdf.groupBy(“a”).agg(…)。グルーピングのキー、集計の数やタイプの変更は許可されません。

ストリーミングの重複排除: 例えば、sdf.dropDuplicates(“a”)。グルーピングのキー、集計の数やタイプの変更は許可されません。

ストリームとストリームのjoin: 例えば、sdf1.join(sdf2, …) (ここでは両方の入力がsparkSession.readStreamで生成されます)。スキーマやjoinカラムの変更は許可されません。joinタイプ(inner/outer)の変更は許可されません。他のjoinの条件の変更の結果も未定になる可能性があります。

任意のステートフルオペレーション: 例えば、sdf.groupByKey(…).mapGroupsWithState(…)や sdf.groupByKey(…).flatMapGroupsWithState(…)。ユーザー定義のステートのスキーマやタイムアウトのタイプの変更は許可されません。ユーザー定義のステートマッピング関数におけるいかなる変更は許可されますが、変更のセマンティックな影響は、ユーザー定義ロジックに依存します。ステートのスキーマ変更をサポートしたいのであれば、明示的に複雑なステートデータの構造を、スキーママイグレーションをサポートするエンコーディング/デコーディングスキーマを用いて、バイトコードにエンコード/デコードすることができます。例えば、Avroエンコードされたバイトコードとしてステートを保存するのであれば、バイナリーのステートは常に問題なくレストアされるので、クエリー再起動の合間にAvroのステートスキーマを自由に変更することができます。

Databricks 免费试用

Databricks 免费试用

广告
将在 10 秒后关闭
bannerAds