将任意的数据写入Spark结构化流到任何数据目的地

在Databricks上向任意数据源写入数据的功能

這本書是抄譯的,並不能保證內容的準確性。請參考原文以獲得正確的內容。

对于没有现有流式写入目标的数据源,结构化流API提供了两种方式来将流式查询输出写入,即`foreachBatch()`和`foreach()`。

通过foreachBatch()重新使用现有的批处理数据源

使用streamingDF.writeStream.foreachBatch(…),可以指定一个函数来处理每个流式查询的每个微批次的输出数据。该函数接收两个参数:一个用于保存微批次输出数据的DataFrame或Dataset,以及微批次的唯一ID。使用foreachBatch可以实现以下功能:

重复使用现有的批量数据源

在许多存储系统中,可能无法使用流式传输目标,但如果已经存在对批处理查询的数据写入器,则可以使用foreachBatch()来使用批处理数据写入器对每个微批次的输出进行处理。以下是一些示例。

    • Cassandra Scala example

 

    Azure Synapse Analytics Python example

还可以使用foreachBatch()来使用其他许多批处理数据源。

对多个地方的写作

如果要将流查询的输出写入多个位置,可以简单地多次写入数据框架/数据集的输出。然而,每次写入尝试都可能需要重新计算输出数据,这可能包括重新加载输入数据。为了避免重新计算,可以将输出的数据框架/数据集进行缓存,并将其写入多个位置,然后解除缓存。以下是概述。

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}
注意:在执行多个Spark作业的batchDF时,输入数据速率可能以实际速率的倍数来报告,可以在StreamingQueryProgress或笔记本的速率图中查看。这是因为每个批次可能会多次加载输入数据到多个Spark作业中。

应用附加的数据框操作

由于在Spark中,很多情况下不支持增量计划的生成,所以在流数据框中,不支持很多数据框和数据集操作。可以通过使用foreachBatch(),对每个微批处理的输出应用这些操作中的一些。例如,可以使用foreachBatch()和MERGE INTO操作将流聚合处理的输出以更新模式写入Delta表中。详见MERGE INTO的详细信息。

重要!

foreachBatch()仅提供至少一次写入保证。而为了进行输出去重并确保一次性处理的保证,可以将batchId传递给函数。在任一情况下,您都需要自行对端到端语义进行推理。

由于foreachBatch()基本上依赖于流式查询的微批处理执行,因此无法在连续处理模式下工作。当您在连续模式下写入数据时,请改用foreach()。

使用foreach()向任意位置进行写入

如果无法使用foreachBatch()(例如使用Databricks Runtime 4.2或更低版本,或者没有支持的批量数据写入器等),可以使用foreach()来表示自定义写入逻辑。特别地,可以将数据写入逻辑分为三个方法open()、process()和close()来表示。

请参考以下例子:使用Scala和Python中的foreach()方法向Amazon DynamoDB写入数据。

使用Scala或Java

在Scala或Java中,扩展ForeachWriter类。

datasetOfString.writeStream.foreach(
  new ForeachWriter[String] {

    def open(partitionId: Long, version: Long): Boolean = {
      // Open connection
    }

    def process(record: String) = {
      // Write string to connection
    }

    def close(errorOrNull: Throwable): Unit = {
      // Close the connection
    }
  }
).start()

使用Python

在Python中,可以通过两种方式调用foreach。一种是从函数内部调用,另一种是从对象内部调用。函数提供了一种简单的方法来表达处理逻辑,但是当需要重新处理输入数据以消除生成的数据的重复时,函数无法实现这一点。在这种情况下,必须在对象内部指定处理逻辑。

行として入力を受け取る関数

Python
def processRow(row):
// Write row to storage

query = streamingDF.writeStream.foreach(processRow).start()

processメソッドとオプションでopenとcloseメソッドを持つオブジェクト

Python
class ForeachWriter:
def open(self, partition_id, epoch_id):
// Open connection. This method is optional in Python.

def process(self, row):
// Write row to connection. This method is not optional in Python.

def close(self, error):
// Close the connection. This method is optional in Python.

query = streamingDF.writeStream.foreach(ForeachWriter()).start()

执行语义

当流查询启动时,Spark会以以下方式调用函数或对象的方法。

    • このオブジェクトの単一のコピーが、クエリーの単一のタスクで生成されるすべてのデータに責任を持ちます。言い換えると、分散処理で生成されたデータの一つのパーティションの処理に一つのインスタンスが責任を持ちます。

 

    • それぞれのタスクは、提供されたオブジェクトの最新のシリアライズされた、あるいは、デシリアライズされたコピーを取得するので、オブジェクトはシリアライズ可能でなくてはなりません。このため、データ書き込みにおけるいかなる初期化処理は、他タスクがデータ生成をできることを意味するopen()メソッドの呼び出しの後に行うことを強くお勧めします。

 

    • メソッドのライフサイクルは以下のようになります。

partition_idを持つそれぞれのパーティションに対して:

epoch_idを持つそれぞれのストリーミングデータのバッチ/エポックに対して:

open(partitionId, epochId)がコールされます。

open(…)がtrueを返すと、パーティションとバッチ/エポックのそれぞれの行に対してprocess(row)がコールされます。
行を処理している間にエラーが発生するとclose(error)メソッドがコールされます

open()メソッドが存在し、(戻り値に関係なく)戻り値が返却されると、JVMやPythonプロセスが途中でクラッシュしない限り、(存在する場合には)close()メソッドがコールされます。

注意:在处理输入数据发生故障,并需要进行重复排除时,可以使用open()方法的partitionId和epochId。这取决于查询的执行模式。如果在微批处理模式下执行流查询,则保证每个具有唯一元组(partition_id,epoch_id)表示的分区具有相同的数据。因此,(partition_id,epoch_id)可以用于重复排除、事务性地提交数据或实现exactly-once保证。但是,如果流查询在连续模式下执行,则无法提供保证,因此不应该将其用于重复排除。

Databricks 免费试用

Databricks 免费试用

广告
将在 10 秒后关闭
bannerAds