将任意的数据写入Spark结构化流到任何数据目的地
在Databricks上向任意数据源写入数据的功能
对于没有现有流式写入目标的数据源,结构化流API提供了两种方式来将流式查询输出写入,即`foreachBatch()`和`foreach()`。
通过foreachBatch()重新使用现有的批处理数据源
使用streamingDF.writeStream.foreachBatch(…),可以指定一个函数来处理每个流式查询的每个微批次的输出数据。该函数接收两个参数:一个用于保存微批次输出数据的DataFrame或Dataset,以及微批次的唯一ID。使用foreachBatch可以实现以下功能:
重复使用现有的批量数据源
在许多存储系统中,可能无法使用流式传输目标,但如果已经存在对批处理查询的数据写入器,则可以使用foreachBatch()来使用批处理数据写入器对每个微批次的输出进行处理。以下是一些示例。
-
- Cassandra Scala example
- Azure Synapse Analytics Python example
还可以使用foreachBatch()来使用其他许多批处理数据源。
对多个地方的写作
如果要将流查询的输出写入多个位置,可以简单地多次写入数据框架/数据集的输出。然而,每次写入尝试都可能需要重新计算输出数据,这可能包括重新加载输入数据。为了避免重新计算,可以将输出的数据框架/数据集进行缓存,并将其写入多个位置,然后解除缓存。以下是概述。
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}
应用附加的数据框操作
由于在Spark中,很多情况下不支持增量计划的生成,所以在流数据框中,不支持很多数据框和数据集操作。可以通过使用foreachBatch(),对每个微批处理的输出应用这些操作中的一些。例如,可以使用foreachBatch()和MERGE INTO操作将流聚合处理的输出以更新模式写入Delta表中。详见MERGE INTO的详细信息。
重要!
foreachBatch()仅提供至少一次写入保证。而为了进行输出去重并确保一次性处理的保证,可以将batchId传递给函数。在任一情况下,您都需要自行对端到端语义进行推理。
由于foreachBatch()基本上依赖于流式查询的微批处理执行,因此无法在连续处理模式下工作。当您在连续模式下写入数据时,请改用foreach()。
使用foreach()向任意位置进行写入
如果无法使用foreachBatch()(例如使用Databricks Runtime 4.2或更低版本,或者没有支持的批量数据写入器等),可以使用foreach()来表示自定义写入逻辑。特别地,可以将数据写入逻辑分为三个方法open()、process()和close()来表示。
请参考以下例子:使用Scala和Python中的foreach()方法向Amazon DynamoDB写入数据。
使用Scala或Java
在Scala或Java中,扩展ForeachWriter类。
datasetOfString.writeStream.foreach(
new ForeachWriter[String] {
def open(partitionId: Long, version: Long): Boolean = {
// Open connection
}
def process(record: String) = {
// Write string to connection
}
def close(errorOrNull: Throwable): Unit = {
// Close the connection
}
}
).start()
使用Python
在Python中,可以通过两种方式调用foreach。一种是从函数内部调用,另一种是从对象内部调用。函数提供了一种简单的方法来表达处理逻辑,但是当需要重新处理输入数据以消除生成的数据的重复时,函数无法实现这一点。在这种情况下,必须在对象内部指定处理逻辑。
行として入力を受け取る関数
Python
def processRow(row):
// Write row to storage
query = streamingDF.writeStream.foreach(processRow).start()
processメソッドとオプションでopenとcloseメソッドを持つオブジェクト
Python
class ForeachWriter:
def open(self, partition_id, epoch_id):
// Open connection. This method is optional in Python.
def process(self, row):
// Write row to connection. This method is not optional in Python.
def close(self, error):
// Close the connection. This method is optional in Python.
query = streamingDF.writeStream.foreach(ForeachWriter()).start()
执行语义
当流查询启动时,Spark会以以下方式调用函数或对象的方法。
-
- このオブジェクトの単一のコピーが、クエリーの単一のタスクで生成されるすべてのデータに責任を持ちます。言い換えると、分散処理で生成されたデータの一つのパーティションの処理に一つのインスタンスが責任を持ちます。
-
- それぞれのタスクは、提供されたオブジェクトの最新のシリアライズされた、あるいは、デシリアライズされたコピーを取得するので、オブジェクトはシリアライズ可能でなくてはなりません。このため、データ書き込みにおけるいかなる初期化処理は、他タスクがデータ生成をできることを意味するopen()メソッドの呼び出しの後に行うことを強くお勧めします。
-
- メソッドのライフサイクルは以下のようになります。
partition_idを持つそれぞれのパーティションに対して:
epoch_idを持つそれぞれのストリーミングデータのバッチ/エポックに対して:
open(partitionId, epochId)がコールされます。
open(…)がtrueを返すと、パーティションとバッチ/エポックのそれぞれの行に対してprocess(row)がコールされます。
行を処理している間にエラーが発生するとclose(error)メソッドがコールされます
open()メソッドが存在し、(戻り値に関係なく)戻り値が返却されると、JVMやPythonプロセスが途中でクラッシュしない限り、(存在する場合には)close()メソッドがコールされます。
Databricks 免费试用
Databricks 免费试用