Use foreachBatch to write to arbitrary data sinks with Structured Streaming
Structured Streaming APIs provide two ways to write the output of a streaming query to data sources that do not have an existing streaming sink: foreachBatch() and foreach().
Reuse existing batch data sources with foreachBatch()
streamingDF.writeStream.foreachBatch(...) allows you to specify a function that is executed on the output data of every micro-batch of the streaming query. It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch, and the unique ID of the micro-batch. With foreachBatch, you can:
Reuse existing batch data sources
For many storage systems, there may not be a streaming sink available yet, but there may already exist a data writer for batch queries. Using foreachBatch(), you can apply those batch data writers to the output of each micro-batch. Many batch data sources can be used this way.
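For example, here is a sketch that reuses Spark's batch JDBC writer inside foreachBatch(); the connection URL, table name, and credentials are placeholders for your own storage system:

streamingDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Reuse the batch JDBC data writer for the output of each micro-batch.
    // The URL, table, and credentials below are placeholders.
    batchDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql://dbserver:5432/mydb")
      .option("dbtable", "streaming_output")
      .option("user", "username")
      .option("password", "password")
      .mode("append")
      .save()
  }
  .start()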
Write to multiple locations
If you want to write the output of a streaming query to multiple locations, then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}
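For example, here is a concrete sketch of this pattern that writes each micro-batch to two Parquet locations (the paths are placeholders):

streamingDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()                                    // cache once so the data is not recomputed per write
    batchDF.write.mode("append").parquet("/data/sink1")  // location 1 (placeholder path)
    batchDF.write.mode("append").parquet("/data/sink2")  // location 2 (placeholder path)
    batchDF.unpersist()
  }
  .start()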
Note
If you are running multiple Spark jobs on the batchDF, the input data rate of the streaming query (reported through StreamingQueryProgress and visible in the notebook rate graph) may be reported as a multiple of the actual rate at which data is generated at the source. This is because the input data may be read multiple times in the multiple Spark jobs per batch.
Apply additional DataFrame operations
Many DataFrame and Dataset operations are not supported in streaming DataFrames because Spark does not support generating incremental plans in those cases. Using foreachBatch(), you can apply some of these operations on each micro-batch output. For example, you can use foreachBatch() and the SQL MERGE INTO operation to write the output of streaming aggregations into a Delta table in Update mode. See more details in MERGE INTO.
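Here is a sketch of that pattern. The target table aggregates, the temporary view updates, the key column, and streamingAggregatesDF (the streaming aggregation result) are placeholders; the MERGE syntax with UPDATE SET * and INSERT * assumes a Delta table:

streamingAggregatesDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Expose the micro-batch output so it can be referenced from SQL.
    batchDF.createOrReplaceTempView("updates")
    // Upsert the micro-batch into the target Delta table (names are placeholders).
    batchDF.sparkSession.sql("""
      MERGE INTO aggregates t
      USING updates s
      ON s.key = t.key
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    """)
  }
  .outputMode("update")
  .start()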
Important
foreachBatch() provides only at-least-once write guarantees. However, you can use the batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee. In either case, you will have to reason about the end-to-end semantics yourself.
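For example, here is a sketch of one way to use the batchId for deduplication: deriving the output location from the batchId makes the write idempotent, because a reprocessed micro-batch overwrites its own earlier attempt instead of appending duplicates. The output path is a placeholder.

streamingDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Overwriting a batchId-derived location makes retries of the same micro-batch idempotent.
    batchDF.write
      .mode("overwrite")
      .parquet(s"/data/output/batch_id=$batchId")   // placeholder path
  }
  .start()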
foreachBatch() does not work with the continuous processing mode because it fundamentally relies on the micro-batch execution of a streaming query. If you write data in continuous mode, use foreach() instead.
foreachBatch() can be invoked with an empty DataFrame, so user code must be resilient to empty micro-batches for proper operation. An example is shown here:
streamingDF.writeStream
  .foreachBatch(
    (outputDf: DataFrame, bid: Long) => {
      // Process non-empty data frames only
      if (!outputDf.isEmpty) {
        // business logic
      }
    }
  ).start()
Write to any location using foreach()
If foreachBatch() is not an option (for example, you are using a Databricks Runtime version lower than 4.2, or a corresponding batch data writer does not exist), then you can express your custom writer logic using foreach(). Specifically, you can express the data writing logic by dividing it into three methods: open(), process(), and close().
Using Scala or Java
In Scala or Java, you extend the class ForeachWriter:
datasetOfString.writeStream.foreach(
  new ForeachWriter[String] {

    def open(partitionId: Long, version: Long): Boolean = {
      // Open connection; return true so the rows in this partition are processed
      true
    }

    def process(record: String): Unit = {
      // Write string to connection
    }

    def close(errorOrNull: Throwable): Unit = {
      // Close the connection
    }
  }
).start()
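As a concrete illustration (not a production-grade sink), here is a sketch of a ForeachWriter that appends each record to a local file on the executor, named after the partition and version; the /tmp/sink directory is a placeholder and must already exist:

import java.io.{File, FileWriter, PrintWriter}
import org.apache.spark.sql.ForeachWriter

datasetOfString.writeStream.foreach(
  new ForeachWriter[String] {
    var writer: PrintWriter = _

    def open(partitionId: Long, version: Long): Boolean = {
      // One output file per (partition, version); the directory is a placeholder.
      writer = new PrintWriter(new FileWriter(new File(s"/tmp/sink/$partitionId-$version.txt"), true))
      true  // returning true means process() is called for each row
    }

    def process(record: String): Unit = {
      writer.println(record)
    }

    def close(errorOrNull: Throwable): Unit = {
      if (writer != null) writer.close()
    }
  }
).start()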
Using Python
In Python, you can invoke foreach in two ways: in a function or in an object. The function offers a simple way to express your processing logic but does not allow you to deduplicate generated data when failures cause reprocessing of some input data. For that situation you must specify the processing logic in an object.
The function takes a row as input.
def processRow(row):
    # Write row to storage
    pass

query = streamingDF.writeStream.foreach(processRow).start()
The object has a process method and optional open and close methods:

class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Open connection. This method is optional in Python.
        # Return True so that process() is called for the rows in this partition and epoch.
        return True

    def process(self, row):
        # Write row to connection. This method is not optional in Python.
        pass

    def close(self, error):
        # Close the connection. This method is optional in Python.
        pass

query = streamingDF.writeStream.foreach(ForeachWriter()).start()
Execution semantics
When the streaming query is started, Spark calls the function or the object’s methods in the following way:
A single copy of this object is responsible for all the data generated by a single task in a query. In other words, one instance is responsible for processing one partition of the data generated in a distributed manner.
This object must be serializable, because each task will get a fresh serialized-deserialized copy of the provided object. Hence, it is strongly recommended that any initialization for writing data (for example, opening a connection or starting a transaction) is done after the open() method is called, which signifies that the task is ready to generate data.

The lifecycle of the methods is as follows:

For each partition with partition_id:

    For each batch/epoch of streaming data with epoch_id:

        Method open(partitionId, epochId) is called.

        If open(...) returns true, method process(row) is called for each row in the partition and batch/epoch.

        Method close(error) is called with the error (if any) seen while processing rows.

The close() method (if it exists) is called if an open() method exists and returns successfully (irrespective of the return value), except if the JVM or Python process crashes in the middle.
Note
The partitionId and epochId in the open() method can be used to deduplicate generated data when failures cause reprocessing of some input data. This depends on the execution mode of the query. If the streaming query is being executed in micro-batch mode, then every partition represented by a unique tuple (partition_id, epoch_id) is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used to deduplicate and/or transactionally commit data and achieve exactly-once guarantees. However, if the streaming query is being executed in continuous mode, this guarantee does not hold, so (partition_id, epoch_id) should not be used for deduplication.
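A minimal sketch of this idea in micro-batch mode, assuming hypothetical helpers isCommitted and markCommitted that record (partitionId, epochId) pairs in some external transactional store (stubbed out here for illustration):

import org.apache.spark.sql.ForeachWriter

// Hypothetical helpers backed by an external transactional store (stubs for illustration).
def isCommitted(partitionId: Long, epochId: Long): Boolean = false
def markCommitted(partitionId: Long, epochId: Long): Unit = ()

datasetOfString.writeStream.foreach(
  new ForeachWriter[String] {
    var currentPartition: Long = _
    var currentEpoch: Long = _

    def open(partitionId: Long, epochId: Long): Boolean = {
      currentPartition = partitionId
      currentEpoch = epochId
      // Skip this (partition, epoch) if an earlier attempt already committed it.
      !isCommitted(partitionId, epochId)
    }

    def process(record: String): Unit = {
      // Write the record to the external store (not shown).
    }

    def close(errorOrNull: Throwable): Unit = {
      // Record the (partition, epoch) as committed only if no error occurred.
      if (errorOrNull == null) markCommitted(currentPartition, currentEpoch)
    }
  }
).start()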