Table deletes, updates, and merges
Delta Lake supports several statements to facilitate deleting data from and updating data in Delta tables.
For an overview and demonstration of deleting and updating data in Delta Lake, watch this YouTube video (54 minutes).
For additional information about capturing change data from Delta Lake, watch this YouTube video (53 minutes).
Delete from a table
You can remove data that matches a predicate from a Delta table. For instance, in a table named people10m or a path at /tmp/delta/people-10m, to delete all rows corresponding to people with a value in the birthDate column from before 1955, you can run the following:
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
Note
The Python API is available in Databricks Runtime 6.1 and above.
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1955-01-01')
Note
The Scala API is available in Databricks Runtime 6.0 and above.
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
Note
The Java API is available in Databricks Runtime 6.0 and above.
import io.delta.tables.*;
import org.apache.spark.sql.functions;
DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m");
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'");
// Declare the predicate by using Spark SQL functions.
deltaTable.delete(functions.col("birthDate").lt(functions.lit("1955-01-01")));
See the Delta Lake APIs for details.
Important
delete removes the data from the latest version of the Delta table but does not remove it from the physical storage until the old versions are explicitly vacuumed. See vacuum for details.
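The versioning behavior described above can be pictured with a toy, in-memory model. This is a minimal sketch under stated assumptions, not Delta Lake code: ToyVersionedTable and its methods are hypothetical, each version is stored as a full snapshot of rows, and vacuum here simply keeps the newest versions instead of deleting unreferenced data files after a retention period as the real command does.

```python
# Toy model (hypothetical, not Delta Lake): DELETE writes a new version, while
# older versions keep their rows until a vacuum-like cleanup removes them.
from dataclasses import dataclass, field


@dataclass
class ToyVersionedTable:
    # Each version is an immutable snapshot: a tuple of row dicts.
    versions: list = field(default_factory=list)

    def write(self, rows):
        self.versions.append(tuple(dict(r) for r in rows))

    def latest(self):
        return list(self.versions[-1]) if self.versions else []

    def delete(self, predicate):
        # New snapshot without the matching rows; earlier snapshots are untouched.
        self.write([r for r in self.latest() if not predicate(r)])

    def vacuum(self, keep_last=1):
        # Simplification: physically drop all but the newest `keep_last` versions.
        self.versions = self.versions[-keep_last:]


table = ToyVersionedTable()
table.write([
    {"id": 1, "birthDate": "1950-03-01"},
    {"id": 2, "birthDate": "1972-11-20"},
])
# ISO-8601 date strings compare correctly as plain strings.
table.delete(lambda r: r["birthDate"] < "1955-01-01")
print([r["id"] for r in table.latest()])     # [2]
print([r["id"] for r in table.versions[0]])  # [1, 2]: the old version still holds the row
table.vacuum()
print(len(table.versions))                   # 1
```

Until the cleanup step runs, the deleted row is still reachable through the older version, which is the point the note above makes about physical storage.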
Tip
When possible, provide predicates on the partition columns of a partitioned Delta table, because such predicates can significantly speed up the operation.
Update a table
You can update data that matches a predicate in a Delta table. For example, in a table named people10m or a path at /tmp/delta/people-10m, to change an abbreviation in the gender column from M or F to Male or Female, you can run the following:
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
Note
The Python API is available in Databricks Runtime 6.1 and above.
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Note
The Scala API is available in Databricks Runtime 6.0 and above.
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'"))
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
Note
The Java API is available in Databricks Runtime 6.0 and above.
import io.delta.tables.*;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.functions;
import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m");
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
new HashMap<String, String>() {{
put("gender", "'Female'");
}}
);
// Declare the predicate by using Spark SQL functions.
deltaTable.update(
functions.col("gender").equalTo("M"),
new HashMap<String, Column>() {{
put("gender", functions.lit("Male"));
}}
);
See the Delta Lake APIs for details.
Tip
Similar to delete, update operations can get a significant speedup with predicates on partitions.
Upsert into a table using merge
You can upsert data from a source table, view, or DataFrame into a target Delta table by using the MERGE SQL operation. Delta Lake supports inserts, updates, and deletes in MERGE, and it supports extended syntax beyond the SQL standards to facilitate advanced use cases.
Suppose you have a source table named people10mupdates or a source path at /tmp/delta/people-10m-updates that contains new data for a target table named people10m or a target path at /tmp/delta/people-10m. Some of these new records may already be present in the target data. To merge the new data, you want to update rows where the person’s id is already present and insert the new rows where no matching id is present. You can run the following:
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
For syntax details, see
Databricks Runtime 7.x and above: MERGE INTO
Databricks Runtime 5.5 LTS and 6.x: _
from delta.tables import *
deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "/tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
import io.delta.tables.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m");
Dataset<Row> dfUpdates = spark.read().format("delta").load("/tmp/delta/people-10m-updates");
deltaTable
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched()
.updateExpr(
new HashMap<String, String>() {{
put("id", "updates.id");
put("firstName", "updates.firstName");
put("middleName", "updates.middleName");
put("lastName", "updates.lastName");
put("gender", "updates.gender");
put("birthDate", "updates.birthDate");
put("ssn", "updates.ssn");
put("salary", "updates.salary");
}})
.whenNotMatched()
.insertExpr(
new HashMap<String, String>() {{
put("id", "updates.id");
put("firstName", "updates.firstName");
put("middleName", "updates.middleName");
put("lastName", "updates.lastName");
put("gender", "updates.gender");
put("birthDate", "updates.birthDate");
put("ssn", "updates.ssn");
put("salary", "updates.salary");
}})
.execute();
See the Delta Lake APIs for Scala, Java, and Python syntax details.
Delta Lake merge operations typically require two passes over the source data. If your source data contains nondeterministic expressions, multiple passes on the source data can produce different rows, causing incorrect results. Common examples of nondeterministic expressions include the current_date and current_timestamp functions. If you cannot avoid using nondeterministic functions, consider saving the source data to storage, for example as a temporary Delta table. Caching the source data may not address this issue, as cache invalidation can cause the source data to be recomputed partially or completely (for example, when a cluster loses some of its executors when scaling down).
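The following toy example, written in plain Python rather than Spark, illustrates why a recomputed nondeterministic source is risky for a two-pass operation. It assumes a hypothetical counter-based clock in place of current_timestamp, and it models "saving the source to storage" as evaluating the source once and reusing the same rows for every pass.

```python
# Toy illustration (plain Python, not Delta Lake): a two-pass operation over a
# source that is recomputed on every scan can see different rows each time.
import itertools

# Hypothetical stand-in for current_timestamp(): advances on every evaluation.
_clock = itertools.count()


def nondeterministic_source():
    now = next(_clock)  # a different "now" for each scan
    return [k for k in range(100) if k > now]


def materialize(source_fn):
    # Evaluate once and reuse the same rows, similar in spirit to saving the
    # source to a temporary Delta table before merging.
    rows = list(source_fn())
    return lambda: rows


def two_pass(source_fn):
    first = source_fn()   # e.g. a pass that finds matching keys
    second = source_fn()  # e.g. a pass that writes the changes
    return first, second


a, b = two_pass(nondeterministic_source)
print(a == b)  # False: the two passes disagree
c, d = two_pass(materialize(nondeterministic_source))
print(c == d)  # True: both passes see identical rows
```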
Operation semantics
Here is a detailed description of the merge programmatic operation.
- There can be any number of whenMatched and whenNotMatched clauses.
  Note
  In Databricks Runtime 7.2 and below, merge can have at most 2 whenMatched clauses and at most 1 whenNotMatched clause.
- whenMatched clauses are executed when a source row matches a target table row based on the match condition. These clauses have the following semantics.
  - whenMatched clauses can have at most one update and one delete action. The update action in merge only updates the specified columns (similar to the update operation) of the matched target row. The delete action deletes the matched row.
  - Each whenMatched clause can have an optional condition. If this clause condition exists, the update or delete action is executed for any matching source-target row pair only when the clause condition is true.
  - If there are multiple whenMatched clauses, then they are evaluated in the order they are specified. All whenMatched clauses, except the last one, must have conditions.
  - If none of the whenMatched conditions evaluate to true for a source and target row pair that matches the merge condition, then the target row is left unchanged.
  - To update all the columns of the target Delta table with the corresponding columns of the source dataset, use whenMatched(...).updateAll(). This is equivalent to:
    whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
    for all the columns of the target Delta table. Therefore, this action assumes that the source table has the same columns as those in the target table; otherwise, the query throws an analysis error.
Note
This behavior changes when automatic schema migration is enabled. See Automatic schema evolution for details.
- whenNotMatched clauses are executed when a source row does not match any target row based on the match condition. These clauses have the following semantics.
  - whenNotMatched clauses can have only the insert action. The new row is generated based on the specified columns and corresponding expressions. You do not need to specify all the columns in the target table. For unspecified target columns, NULL is inserted.
    Note
    In Databricks Runtime 6.5 and below, you must provide all the columns in the target table for the INSERT action.
  - Each whenNotMatched clause can have an optional condition. If the clause condition is present, a source row is inserted only if that condition is true for that row. Otherwise, the source row is ignored.
  - If there are multiple whenNotMatched clauses, then they are evaluated in the order they are specified. All whenNotMatched clauses, except the last one, must have conditions.
  - To insert all the columns of the target Delta table with the corresponding columns of the source dataset, use whenNotMatched(...).insertAll(). This is equivalent to:
    whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
    for all the columns of the target Delta table. Therefore, this action assumes that the source table has the same columns as those in the target table; otherwise, the query throws an analysis error.
Note
This behavior changes when automatic schema migration is enabled. See Automatic schema evolution for details.
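To make the clause rules concrete, here is a plain-Python model of how a single matched or unmatched source row would be handled. It is a sketch, not the Delta Lake implementation: rows are dicts, conditions and expressions are Python callables, and the helper names (check_clauses, when_matched, when_not_matched, update_all) are hypothetical.

```python
# Toy model (plain Python, hypothetical names) of the merge clause rules:
# clauses are checked in order, the first clause whose condition holds wins,
# and only the last clause of each kind may omit its condition.


def check_clauses(clauses, kind):
    for condition, _ in clauses[:-1]:
        if condition is None:
            raise ValueError(f"all {kind} clauses except the last must have conditions")


def when_matched(target, source, clauses):
    """Return the new target row, or None if the row is deleted.

    Each clause is (condition, action); action is ("delete",) or
    ("update", {column: fn(target, source)}).
    """
    check_clauses(clauses, "whenMatched")
    for condition, action in clauses:
        if condition is None or condition(target, source):
            if action[0] == "delete":
                return None
            updated = dict(target)
            for column, expr in action[1].items():
                updated[column] = expr(target, source)  # only listed columns change
            return updated
    return target  # no clause applied: the target row is left unchanged


def when_not_matched(source, clauses, target_columns):
    """Return the row to insert, or None if the source row is ignored.

    Each clause is (condition, {column: fn(source)}); target columns that
    are not listed get None, standing in for NULL.
    """
    check_clauses(clauses, "whenNotMatched")
    for condition, values in clauses:
        if condition is None or condition(source):
            return {c: (values[c](source) if c in values else None) for c in target_columns}
    return None


def update_all(target_columns):
    # Like updateAll(): every target column is set from the same-named source
    # column; a source missing a column raises KeyError in this toy.
    return ("update", {c: (lambda t, s, c=c: s[c]) for c in target_columns})


matched = [
    (lambda t, s: s["deleted"], ("delete",)),
    (None, ("update", {"salary": lambda t, s: s["salary"]})),
]
not_matched = [(lambda s: not s["deleted"], {"id": lambda s: s["id"]})]

row = {"id": 1, "name": "Ann", "salary": 100}
print(when_matched(row, {"id": 1, "salary": 120, "deleted": False}, matched))
# {'id': 1, 'name': 'Ann', 'salary': 120}
print(when_not_matched({"id": 7, "deleted": False}, not_matched, ["id", "name", "salary"]))
# {'id': 7, 'name': None, 'salary': None}
```

The ordered "first true clause wins" loop is what makes an unconditioned clause safe only in the last position: any clause after it could never be reached.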
Important
- A merge operation can fail if multiple rows of the source dataset match and the merge attempts to update the same rows of the target Delta table. According to the SQL semantics of merge, such an update operation is ambiguous, as it is unclear which source row should be used to update the matched target row. You can preprocess the source table to eliminate the possibility of multiple matches. See the change data capture example, which shows how to preprocess the change dataset (that is, the source dataset) to retain only the latest change for each key before applying that change into the target Delta table.
- A merge operation can produce incorrect results if the source dataset is nondeterministic. This is because merge may perform two scans of the source dataset, and if the data produced by the two scans is different, the final changes made to the table can be incorrect. Nondeterminism in the source can arise in many ways, including the following:
  - Reading from non-Delta tables. For example, reading from a CSV table where the underlying files can change between the multiple scans.
  - Using nondeterministic operations. For example, Dataset.filter() operations that use the current timestamp to filter data can produce different results between the multiple scans.
- You can apply a SQL MERGE operation on a SQL VIEW only if the view has been defined as CREATE VIEW viewName AS SELECT * FROM deltaTable.
Note
In Databricks Runtime 7.3 LTS and above, multiple matches are allowed when matches are unconditionally deleted (since unconditional delete is not ambiguous even if there are multiple matches).
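Preprocessing the source so that each key matches at most one row, as recommended above, can be sketched in plain Python. In Spark you would typically express this with a DataFrame aggregation or window function; this version only shows the logic, and the helper names and the column names id and ts (a change timestamp) are assumptions.

```python
# Plain-Python sketch (hypothetical helper names) of preprocessing a change
# set so that each key matches at most one source row before a merge.
from collections import Counter


def ambiguous_keys(rows, key="id"):
    """Keys that appear in more than one source row (these make a merge ambiguous)."""
    counts = Counter(r[key] for r in rows)
    return sorted(k for k, n in counts.items() if n > 1)


def latest_change_per_key(rows, key="id", order_by="ts"):
    """Keep only the row with the largest `order_by` value for each key."""
    latest = {}
    for r in rows:
        k = r[key]
        if k not in latest or r[order_by] > latest[k][order_by]:
            latest[k] = r
    return list(latest.values())


changes = [
    {"id": 1, "ts": 1, "salary": 100},
    {"id": 1, "ts": 3, "salary": 130},
    {"id": 2, "ts": 2, "salary": 90},
]
print(ambiguous_keys(changes))  # [1]
deduped = latest_change_per_key(changes)
print(deduped)  # [{'id': 1, 'ts': 3, 'salary': 130}, {'id': 2, 'ts': 2, 'salary': 90}]
print(ambiguous_keys(deduped))  # []
```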
Schema validation
merge automatically validates that the schema of the data generated by insert and update expressions is compatible with the schema of the table. It uses the following rules to determine whether the merge operation is compatible:
- For update and insert actions, the specified target columns must exist in the target Delta table.
- For updateAll and insertAll actions, the source dataset must have all the columns of the target Delta table. The source dataset can have extra columns, and they are ignored.
- For all actions, if the data type generated by the expressions producing the target columns is different from the corresponding columns in the target Delta table, merge tries to cast them to the types in the table.
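The three rules above can be expressed as a small checker. This is a hypothetical helper over dict-based schemas, not Delta Lake's validator; it reports the cast that merge would attempt only for updateAll and insertAll, because inferring the types of arbitrary expressions is out of scope here, and the type names are illustrative.

```python
# Toy checker (hypothetical, not Delta Lake's validator) for the rules above.
# Schemas are plain dicts mapping column name -> type name.


def validate_merge_schema(target, source, update_cols=(), insert_cols=(), use_all=False):
    """Return (errors, casts) for a merge into `target`.

    update_cols / insert_cols: target columns named by explicit update/insert actions.
    use_all: True if the merge uses updateAll and/or insertAll.
    casts: {column: (source_type, target_type)} for columns merge would try to cast.
    """
    errors, casts = [], {}
    for action, cols in (("update", update_cols), ("insert", insert_cols)):
        for col in cols:
            if col not in target:
                errors.append(f"{action}: column {col!r} does not exist in the target table")
    if use_all:
        for col, target_type in target.items():
            if col not in source:
                errors.append(f"updateAll/insertAll: source is missing target column {col!r}")
            elif source[col] != target_type:
                casts[col] = (source[col], target_type)
        # Extra source columns are simply ignored (without schema evolution).
    return errors, casts


target = {"key": "int", "value": "string"}
print(validate_merge_schema(target, {"key": "bigint", "value": "string", "extra": "int"}, use_all=True))
# ([], {'key': ('bigint', 'int')})
print(validate_merge_schema(target, {"key": "int"}, use_all=True))
# (["updateAll/insertAll: source is missing target column 'value'"], {})
```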
Automatic schema evolution
Note
Schema evolution in merge is available in Databricks Runtime 6.6 and above.
By default, updateAll and insertAll assign all the columns in the target Delta table with columns of the same name from the source dataset. Any columns in the source dataset that don’t match columns in the target table are ignored. However, in some use cases, it is desirable to automatically add source columns to the target Delta table. To automatically update the table schema during a merge operation with updateAll and insertAll (at least one of them), you can set the Spark session configuration spark.databricks.delta.schema.autoMerge.enabled to true before running the merge operation.
Note
Schema evolution occurs only when there is either an updateAll (UPDATE SET *) or an insertAll (INSERT *) action, or both. update and insert actions cannot explicitly refer to target columns that do not already exist in the target table (even if there are updateAll or insertAll as one of the clauses). See the examples below.
Note
In Databricks Runtime 7.4 and below, merge supports schema evolution of only top-level columns, and not of nested columns.
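The effect of the autoMerge setting on top-level columns can be modeled as follows. merged_columns is a hypothetical helper that only computes the target's column list; the commented spark.conf.set call shows how the configuration named above is typically set in a PySpark session, and everything else is plain Python.

```python
# Toy model (hypothetical helper, not Delta Lake code) of how the target's
# top-level columns change in a merge. In a real PySpark session the setting
# described above would typically be applied with:
#   spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")


def merged_columns(target_cols, source_cols, auto_merge, uses_all_action, explicit_cols=()):
    """Return the target's column list after the merge, or raise ValueError.

    uses_all_action: the merge has an updateAll and/or insertAll clause.
    explicit_cols: target columns named by explicit update/insert actions.
    """
    missing = [c for c in explicit_cols if c not in target_cols]
    if missing:
        # Explicit actions may not refer to columns the target lacks,
        # even when schema evolution is enabled.
        raise ValueError(f"columns {missing} do not exist in the target table")
    if auto_merge and uses_all_action:
        # New source columns are appended to the target schema.
        return list(target_cols) + [c for c in source_cols if c not in target_cols]
    return list(target_cols)  # extra source columns are ignored


print(merged_columns(["key", "value"], ["key", "value", "newValue"], True, True))
# ['key', 'value', 'newValue']
print(merged_columns(["key", "value"], ["key", "value", "newValue"], False, True))
# ['key', 'value']
```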
Here are a few examples of the effects of the merge operation with and without schema evolution.
Columns |
Query (in Scala) |
Behavior without schema evolution (default) |
Behavior with schema evolution |
---|---|---|---|
Target columns: Source columns: |
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
|
The table schema remains unchanged; only columns |
The table schema is changed to |
Target columns: Source columns: |
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
|
|
The table schema is changed to |
Target columns: Source columns: |
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().update(Map(
"newValue" -> col("s.newValue")))
.whenNotMatched().insertAll()
.execute()
|
|
|
Target columns: Source columns: |
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().updateAll()
.whenNotMatched().insert(Map(
"key" -> col("s.key"),
"newValue" -> col("s.newValue")))
.execute()
|
|
|
Special considerations for schemas that contain arrays of structs
Delta MERGE INTO supports resolving struct fields by name and evolving schemas for arrays of structs. With schema evolution enabled, target table schemas will evolve for arrays of structs, which also works with any nested structs inside of arrays.
Note
This feature is available in Databricks Runtime 9.1 and above. For Databricks Runtime 9.0 and below, implicit Spark casting is used for arrays of structs to resolve struct fields by position, and the effects of merge operations with and without schema evolution of structs in arrays are inconsistent with the behaviors of structs outside of arrays.
Here are a few examples of the effects of merge operations with and without schema evolution for arrays of structs.
Source schema |
Target schema |
Behavior without schema evolution (default) |
Behavior with schema evolution |
---|---|---|---|
array<struct<b: string, a: string>> |
array<struct<a: int, b: int>> |
The table schema remains unchanged. Columns will be resolved by name and updated or inserted. |
The table schema remains unchanged. Columns will be resolved by name and updated or inserted. |
array<struct<a: int, c: string, d: string>> |
array<struct<a: string, b: string>> |
|
The table schema is changed to array<struct<a: string, b: string, c: string, d: string>>. |
array<struct<a: string, b: struct<c: string, d: string>>> |
array<struct<a: string, b: struct<c: string>>> |
|
The target table schema is changed to array<struct<a: string, b: struct<c: string, d: string>>>. |
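The name-based merging shown in the table can be reproduced with a short recursive function over a hypothetical schema encoding: dicts for structs, ("array", element) tuples for arrays, and strings for leaf types. It models only the "with schema evolution" column, and keeping the target's leaf type reflects the casting rule described under Schema validation; it is a sketch, not Delta Lake's implementation.

```python
# Toy model (plain Python, hypothetical encoding) of name-based schema merging
# for structs and arrays of structs.
# Encoding: a struct is a dict {field: type}, an array is ("array", element),
# and anything else (e.g. "string", "int") is a leaf type name.


def merge_types(target, source):
    if isinstance(target, dict) and isinstance(source, dict):
        merged = dict(target)  # keep target fields, in target order
        for name, s_type in source.items():
            # Fields are matched by name; new fields are appended.
            merged[name] = merge_types(target[name], s_type) if name in target else s_type
        return merged
    if isinstance(target, tuple) and isinstance(source, tuple):
        return ("array", merge_types(target[1], source[1]))
    return target  # leaf: keep the target type; source values are cast to it


def to_ddl(t):
    # Render the encoding in the array<struct<...>> notation used by the table.
    if isinstance(t, dict):
        return "struct<" + ", ".join(f"{k}: {to_ddl(v)}" for k, v in t.items()) + ">"
    if isinstance(t, tuple):
        return f"array<{to_ddl(t[1])}>"
    return t


target = ("array", {"a": "string", "b": "string"})
source = ("array", {"a": "int", "c": "string", "d": "string"})
print(to_ddl(merge_types(target, source)))
# array<struct<a: string, b: string, c: string, d: string>>
```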
Performance tuning
You can reduce the time taken by merge using the following approaches:
- Reduce the search space for matches: By default, the merge operation searches the entire Delta table to find matches in the source table. One way to speed up merge is to reduce the search space by adding known constraints in the match condition. For example, suppose you have a table that is partitioned by country and date and you want to use merge to update information for the last day and a specific country. Adding the condition events.date = current_date() AND events.country = 'USA' will make the query faster, as it looks for matches only in the relevant partitions. Furthermore, it will also reduce the chances of conflicts with other concurrent operations. See Concurrency control for more details.
- Compact files: If the data is stored in many small files, reading the data to search for matches can become slow. You can compact small files into larger files to improve read throughput. See Compact files for details.
- Control the shuffle partitions for writes: The merge operation shuffles data multiple times to compute and write the updated data. The number of tasks used to shuffle is controlled by the Spark session configuration spark.sql.shuffle.partitions. Setting this parameter not only controls the parallelism but also determines the number of output files. Increasing the value increases parallelism but also generates a larger number of smaller data files.
- Enable optimized writes: For partitioned tables, merge can produce a much larger number of small files than the number of shuffle partitions. This is because every shuffle task can write multiple files in multiple partitions, which can become a performance bottleneck. You can reduce the number of files by enabling Optimized Write.
  Note
  In Databricks Runtime 7.4 and above, Optimized Write is automatically enabled in merge operations on partitioned tables.
- Tune file sizes in table: In Databricks Runtime 8.2 and above, Databricks can automatically detect if a Delta table has frequent merge operations that rewrite files and may choose to reduce the size of rewritten files in anticipation of further file rewrites in the future. See the section on tuning file sizes for details.
- Low Shuffle Merge: In Databricks Runtime 9.0 and above, Low Shuffle Merge provides an optimized implementation of MERGE that provides better performance for most common workloads. In addition, it preserves existing data layout optimizations such as Z-ordering on unmodified data.
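The small-files effect described under "Enable optimized writes" comes down to simple counting. The sketch below assumes that each shuffle task writes one file per table partition it holds rows for; the rows, the 8 tasks, and both task-assignment functions are made up for illustration, and grouping rows by partition value is only a stand-in for the idea of reducing files, not a description of how Optimized Write works internally.

```python
# Toy count (plain Python, illustrative numbers) of output files when every
# shuffle task writes one file per table partition it holds rows for.


def files_written(rows, num_tasks, assign_task):
    partitions_per_task = {}
    for row in rows:
        task = assign_task(row, num_tasks)
        partitions_per_task.setdefault(task, set()).add(row["date"])
    return sum(len(parts) for parts in partitions_per_task.values())


# 100 hypothetical rows spread over 10 date partitions.
rows = [{"id": i, "date": f"2021-01-{i % 10 + 1:02d}"} for i in range(100)]

# Rows shuffled by id: each of the 8 tasks ends up holding several dates.
by_id = files_written(rows, 8, lambda r, n: r["id"] % n)
# Rows grouped by partition value first: each date lands in a single task.
by_date = files_written(rows, 8, lambda r, n: int(r["date"][-2:]) % n)
print(by_id, by_date)  # 40 10
```

With shuffling by id, each task sees 5 of the 10 dates, so 8 tasks produce 40 files for only 10 partitions; raising the task count makes this worse, which mirrors the trade-off described for spark.sql.shuffle.partitions.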
Merge examples
Here are a few examples of how to use merge in different scenarios.
Data deduplication when writing into Delta tables
A common ETL use case is to collect logs into a Delta table by appending them to a table. However, the sources can often generate duplicate log records, and downstream deduplication steps are needed to take care of them. With merge, you can avoid inserting the duplicate records.
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Note
The dataset containing the new logs needs to be deduplicated within itself. By the SQL semantics of merge, it matches and deduplicates the new data with the existing data in the table, but if there is duplicate data within the new dataset, it is inserted. Hence, deduplicate the new data before merging into the table.
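The insert-only merge, and the need to deduplicate the new batch first, can be sketched in plain Python. insert_only_merge is a hypothetical helper, and keeping the first occurrence of each uniqueId within the batch is one possible choice, not something merge prescribes.

```python
# Plain-Python sketch (hypothetical helper) of an insert-only merge
# (WHEN NOT MATCHED THEN INSERT *) keyed on uniqueId.


def insert_only_merge(existing, new_logs, key="uniqueId", dedupe_batch=True):
    if dedupe_batch:
        # Deduplicate the new batch within itself first; the merge alone
        # only compares new rows against rows already in the table.
        first_seen = {}
        for rec in new_logs:
            first_seen.setdefault(rec[key], rec)
        new_logs = list(first_seen.values())
    existing_keys = {r[key] for r in existing}
    # Rows whose key is already in the table are skipped; the rest are appended.
    return existing + [r for r in new_logs if r[key] not in existing_keys]


logs = [{"uniqueId": 1, "msg": "start"}]
batch = [
    {"uniqueId": 1, "msg": "start"},
    {"uniqueId": 2, "msg": "stop"},
    {"uniqueId": 2, "msg": "stop"},
]
print([r["uniqueId"] for r in insert_only_merge(logs, batch)])                      # [1, 2]
print([r["uniqueId"] for r in insert_only_merge(logs, batch, dedupe_batch=False)])  # [1, 2, 2]
```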
If you know that you may get duplicate records only for a few days, you can optimize your query further by partitioning the table by date, and then specifying the date range of the target table to match on.
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
This is more efficient than the previous command as it looks for duplicates only in the last 7 days of logs, not the entire table. Furthermore, you can use this insert-only merge with Structured Streaming to perform continuous deduplication of the logs.
- In a streaming query, you can use a merge operation in foreachBatch to continuously write any streaming data to a Delta table with deduplication. See the following streaming example for more information on foreachBatch.
- In another streaming query, you can continuously read deduplicated data from this Delta table. This is possible because an insert-only merge only appends new data to the Delta table.
Note
Insert-only merge is optimized to only append data in Databricks Runtime 6.2 and above. In Databricks Runtime 6.1 and below, writes from insert-only merge operations cannot be read as a stream.
Slowly changing data (SCD) Type 2 operation into Delta tables
Another common operation is SCD Type 2, which maintains history of all changes made to each key in a dimensional table. Such operations require updating existing rows to mark previous values of keys as old, and then inserting the new rows as the latest values. Given a source table with updates and the target table with the dimensional data, SCD Type 2 can be expressed with merge.
Here is a concrete example of maintaining the history of addresses for a customer along with the active date range of each address. When a customer’s address needs to be updated, you have to mark the previous address as not the current one, update its active date range, and add the new address as the current one.
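The address-history logic can be sketched in plain Python to show which rows the operation has to update and which it has to insert. The AddressRow layout and the scd2_merge helper are hypothetical; a real pipeline would express the same steps as a single Delta merge, which this sketch does not attempt.

```python
# Plain-Python sketch of SCD Type 2 for customer addresses. The row layout
# (customer_id, address, current, effective_date, end_date) is an assumption
# for illustration, not a Delta Lake schema.
from dataclasses import dataclass
from typing import Optional


@dataclass
class AddressRow:
    customer_id: int
    address: str
    current: bool
    effective_date: str
    end_date: Optional[str] = None


def scd2_merge(table, updates):
    """Apply (customer_id, address, effective_date) updates to `table` in place."""
    current = {r.customer_id: r for r in table if r.current}
    for customer_id, address, effective_date in updates:
        old = current.get(customer_id)
        if old is not None and old.address == address:
            continue  # address unchanged: nothing to record
        if old is not None:
            old.current = False            # previous address is no longer current
            old.end_date = effective_date  # close its active date range
        new_row = AddressRow(customer_id, address, True, effective_date)
        table.append(new_row)              # new address becomes the current one
        current[customer_id] = new_row
    return table


history = [AddressRow(1, "old address", True, "2018-02-01")]
scd2_merge(history, [(1, "new address", "2018-03-03"), (2, "other address", "2018-04-04")])
for row in history:
    print(row)
```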
Write change data into a Delta table
Similar to SCD, another common use case, often called change data capture (CDC), is to apply all data changes generated from an external database into a Delta table. In other words, a set of updates, deletes, and inserts applied to an external table needs to be applied to a Delta table. You can do this using merge.
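Applying a change feed boils down to keeping the newest change per key and then deleting, updating, or inserting accordingly. The plain-Python sketch below assumes a hypothetical change layout (id, op, ts, plus data columns); its branches roughly mirror a merge whose matched clauses delete or update the target row and whose not-matched clause inserts it.

```python
# Plain-Python sketch of applying a change-data feed to a keyed table.
# The change layout (id, op, ts plus data columns) is an assumption.


def apply_changes(target, changes, key="id"):
    """target: {key: row}. Each change has `op` in {"insert", "update", "delete"} and a timestamp `ts`."""
    latest = {}
    for ch in changes:
        k = ch[key]
        if k not in latest or ch["ts"] > latest[k]["ts"]:
            latest[k] = ch  # keep only the newest change per key
    result = dict(target)
    for k, ch in latest.items():
        if ch["op"] == "delete":
            result.pop(k, None)  # delete if present; unmatched deletes are ignored
        else:
            # Upsert: update the matched row or insert a new one.
            result[k] = {c: v for c, v in ch.items() if c not in ("op", "ts")}
    return result


table = {1: {"id": 1, "city": "Oslo"}, 2: {"id": 2, "city": "Rome"}}
changes = [
    {"id": 1, "op": "update", "ts": 1, "city": "Bergen"},
    {"id": 1, "op": "update", "ts": 2, "city": "Tromso"},
    {"id": 2, "op": "delete", "ts": 1},
    {"id": 3, "op": "insert", "ts": 1, "city": "Lima"},
]
print(apply_changes(table, changes))
# {1: {'id': 1, 'city': 'Tromso'}, 3: {'id': 3, 'city': 'Lima'}}
```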
Upsert from streaming queries using foreachBatch
You can use a combination of merge and foreachBatch (see foreachbatch for more information) to write complex upserts from a streaming query into a Delta table. For example:
- Write streaming aggregates in Update Mode: This is much more efficient than Complete Mode.
- Write a stream of database changes into a Delta table: The merge query for writing change data can be used in foreachBatch to continuously apply a stream of changes to a Delta table.
- Write a stream of data into a Delta table with deduplication: The insert-only merge query for deduplication can be used in foreachBatch to continuously write data (with duplicates) to a Delta table with automatic deduplication.
Note
- Make sure that your merge statement inside foreachBatch is idempotent, as restarts of the streaming query can apply the operation on the same batch of data multiple times.
- When merge is used in foreachBatch, the input data rate of the streaming query (reported through StreamingQueryProgress and visible in the notebook rate graph) may be reported as a multiple of the actual rate at which data is generated at the source. This is because merge reads the input data multiple times, causing the input metrics to be multiplied. If this is a bottleneck, you can cache the batch DataFrame before merge and then uncache it after merge.
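The idempotency requirement can be seen by comparing a merge-style write with a plain append when the same micro-batch is replayed. In PySpark the function passed to foreachBatch receives the micro-batch DataFrame and its batch ID; the hypothetical functions below only mimic that (batch, batch_id) shape with lists and dicts.

```python
# Plain-Python sketch of why the per-batch write should be idempotent.
# The (batch, batch_id) parameters mirror the shape of a foreachBatch
# callback, but these functions operate on lists and dicts, not DataFrames.


def upsert_batch(table, batch, batch_id, key="key"):
    # Merge-style write: update matched keys, insert the rest.
    # batch_id is unused here; it only mirrors the callback signature.
    for row in batch:
        table[row[key]] = dict(row)
    return table


def append_batch(rows, batch, batch_id):
    # Plain append: replaying the batch duplicates its rows.
    rows.extend(batch)
    return rows


batch = [{"key": "a", "count": 3}, {"key": "b", "count": 5}]

merged = {}
upsert_batch(merged, batch, batch_id=7)
upsert_batch(merged, batch, batch_id=7)  # simulated replay after a restart
print(len(merged))    # 2

appended = []
append_batch(appended, batch, batch_id=7)
append_batch(appended, batch, batch_id=7)
print(len(appended))  # 4
```

Replaying the upsert leaves the table unchanged, while replaying the append duplicates rows; that difference is what the idempotency note above asks you to guarantee.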