從結構化串流查詢的故障中恢復
处理结构化流查询故障 | 在AWS上的Databricks [截至2022年8月25日]
使用Databricks工作流,结构化流可以提供故障容忍性和数据一致性。可以通过设置结构化流重新启动以便在故障时重新启动。通过启用流查询的检查点创建,可以在故障后重新启动查询。重新启动的查询将从故障点继续处理。
启用结构化流查询的检查点创建。
以下是一个例子,在开始查询之前,建议始终指定checkpointLocation选项。
streamingDataFrame.writeStream
.format("parquet")
.option("path", "/path/to/table")
.option("checkpointLocation", "/path/to/table/_checkpoint")
.start()
这个检查点的位置将保存确定查询所需的所有重要信息。应为每个查询设置一个检查点位置。不应在多个查询中使用相同的位置。有关详细信息,请参阅Structured Streaming 编程指南。
将结构化流作业配置为重新启动失败的流查询。
可以使用带有流式查询的笔记本或JAR创建Databricks作业,并进行以下设置。
-
- 常に新規クラスターを使用する
- 障害時には常にリトライする
作业与结构化流API密切集成,可以监视所有活动查询的运行状态。通过这个设置,当一部分查询失败时,作业会自动停止处理(与其他所有查询一起),然后在新集群上重新启动处理。通过重新执行此笔记本或JAR代码,所有查询将再次启动运行,这是恢复到良好状态的最安全方法。
我将展示以下关于推荐工作设置的示例。
クラスター: 常に新規クラスターを設定し、最新のSparkバージョン(あるいは最低でもバージョン2.1)を使う様にします。Spark 2.1以降で起動されたクエリーは、クエリーの実行後やSparkのバージョンアップ後に復旧可能です。
通知: 失敗時にメールの通知が必要であれば設定します。
スケジュール: スケジュールを設定しません。
タイムアウト: タイムアウトを設定しません。 ストリーミングクエリーは永遠に実行されます。
最大同時実行数: 1に設定します。それぞれのクエリーのインスタンスは1つ存在する様にします。
リトライ: Unlimitedに設定します。
请查看Databricks中的作业管理以理解这些配置。
在对结构化流查询进行更改后恢复。
从同一检查点位置重新启动之前,对于允许进行流查询更改的内容有一些限制。以下是一些不被允许或对变更影响未定义的变更类型。
許可されるという用語は、その変更を行うことができますが、クエリーと変更内容に基づいてその影響のセマンティクスは決定されます。
許可されないという用語は、クエリーの再起動は予期しないエラーで失敗するため、その変更は行うべきではないことを意味します。
sdfはsparkSession.readStreamで生成されたストリーミングデータフレーム/データセットを表現します。
結構化流式查询的变更类型
入力ソースの数やタイプの変更(異なるソース): これは許可されません。
入力ソースのパラメーターの変更: 許可されるかどうか、変更のセマンティクスが決定的かどうかはソースとクエリーに依存します。以下に例を示します。
レートリミットの追加、削除、変更は許可されます。
Scala
spark.readStream.format(“kafka”).option(“subscribe”, “article”)
から
Scala
spark.readStream.format(“kafka”).option(“subscribe”, “article”).option(“maxOffsetsPerTrigger”, …)
サブスクライブしているアーティクルやファイルの変更は、結果が予測できないため通常は許可されません。
spark.readStream.format(“kafka”).option(“subscribe”, “article”)からspark.readStream.format(“kafka”).option(“subscribe”, “newarticle”)への変更
出力シンクの変更: いくつかの特定のシンクの組み合わせ間の変更は許可されます。ケースバイケースで検証する必要があります。以下にいくつか例を示します。
ファイルシンクからKafkaシンクへの変更は許可されます。Kafkaは新規データのみを参照します。
Kafkaシンクからファイルシンクへの変更は許可されません。
Kafkaシンクとforeachの相互変換は許可されます。
出力シンクのパラメーターの変更: 許可されるかどうか、変更のセマンティクスがwell-definedになるかどうかは、シンクとクエリーに依存します。いかに例を示します。
ファイルシンクの出力ディレクトリの変更は許可されません。sdf.writeStream.format(“parquet”).option(“path”, “/somePath”)からsdf.writeStream.format(“parquet”).option(“path”, “/anotherPath”)への変更は許可されません。
出力アーティクルの変更は許可されます。sdf.writeStream.format(“kafka”).option(“article”, “somearticle”)からsdf.writeStream.format(“kafka”).option(“path”, “anotherarticle”)への変更は許可されます。
ユーザー定義のforeachシンク(すなわちForeachWriterコード)への変更は許可されますが、変更のセマンティクスはコードに依存します。
projection / filter / mapライクなオペレーションにおける変更: いくつかのケースは許可されます。例えば、
フィルターの追加削除は許可されます: sdf.selectExpr(“a”)からsdf.where(…).selectExpr(“a”).filter(…)への変更は許可されます。
同じ出力スキーマによるprojectionの変更は許可されます: sdf.selectExpr(“stringColumn AS json”).writeStreamからsdf.select(to_json(…).as(“json”)).writeStreamへの変更は許可されます。
異なる出力スキーマによるprojectionの変更は条件付きで許可されます: 出力シンクが”a”から”b”へのスキーマ変更を許可しているのであれば、sdf.selectExpr(“a”).writeStreamからsdf.selectExpr(“b”).writeStreamへの変更は許可されます。
ステートフルなオペレーションにおける変更: 結果を継続的に更新するために、ストリーミングクエリーにおける幾つかのオペレーションはステートデータを維持する必要があります。構造化ストリーミングは自動で状態のチェックポイントをフォールトトレラントなストレージ(例えば、DBFS、AWS S3、Azure Blob storage)に作成し、再起動後にレストアします。しかし、ステートデータのスキーマは再起動の合間で変更がないことを前提としています。これは、再起動の合間にストリーミングクエリーのステートフルオペレーションに対するいかなる変更(追加、削除、スキーマ変更)も許可されないことを意味します。ステートのリカバリーを確実にするために、再起動の合間にスキーマを変更すべきではないステートフルオペレーションのリストを示します。
ストリーミングの集計: 例えば、sdf.groupBy(“a”).agg(…)。グルーピングのキー、集計の数やタイプの変更は許可されません。
ストリーミングの重複排除: 例えば、sdf.dropDuplicates(“a”)。グルーピングのキー、集計の数やタイプの変更は許可されません。
ストリームとストリームのjoin: 例えば、sdf1.join(sdf2, …) (ここでは両方の入力がsparkSession.readStreamで生成されます)。スキーマやjoinカラムの変更は許可されません。joinタイプ(inner/outer)の変更は許可されません。他のjoinの条件の変更の結果も未定になる可能性があります。
任意のステートフルオペレーション: 例えば、sdf.groupByKey(…).mapGroupsWithState(…)や sdf.groupByKey(…).flatMapGroupsWithState(…)。ユーザー定義のステートのスキーマやタイムアウトのタイプの変更は許可されません。ユーザー定義のステートマッピング関数におけるいかなる変更は許可されますが、変更のセマンティックな影響は、ユーザー定義ロジックに依存します。ステートのスキーマ変更をサポートしたいのであれば、明示的に複雑なステートデータの構造を、スキーママイグレーションをサポートするエンコーディング/デコーディングスキーマを用いて、バイトコードにエンコード/デコードすることができます。例えば、Avroエンコードされたバイトコードとしてステートを保存するのであれば、バイナリーのステートは常に問題なくレストアされるので、クエリー再起動の合間にAvroのステートスキーマを自由に変更することができます。
Databricks 免费试用
Databricks 免费试用