Enable idempotent writes across jobs
A job that writes data to a Delta table is sometimes restarted, for example after the job encounters a failure. The failed job may or may not have written its data to the Delta table before terminating. If it did, the restarted job writes the same data to the Delta table again, which results in duplicate data.
To address this, Delta tables support the following DataFrameWriter options to make writes idempotent:
- txnAppId: A unique string that you can pass on each DataFrame write. For example, this can be the name of the job.
- txnVersion: A monotonically increasing number that acts as the transaction version. This number needs to be unique for the data being written to the Delta table(s). For example, this can be the epoch seconds of the instant when the query is first attempted. Any subsequent restart of the same job needs to use the same value for txnVersion.
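Because a restart must reuse the txnVersion chosen on the first attempt, the job needs somewhere to remember that value. The sketch below is one hypothetical way to do this, not part of Delta Lake or Spark: `get_or_create_txn_version` and its JSON state file are illustrative names. It assumes the caller picks a different `state_path` (for example, one per scheduled run date) for each new batch of data, so that only restarts of the same run see the stored value.

```python
import json
import os
import time


def get_or_create_txn_version(state_path, clock=time.time):
    """Return a stable txnVersion for one logical run of a job.

    On the first attempt, record the current epoch seconds in a small JSON
    state file (hypothetical location chosen by the caller). On any restart
    of the same run, read the stored value back instead of taking a new
    timestamp, so every attempt passes the same txnVersion.
    """
    if os.path.exists(state_path):
        with open(state_path) as f:
            return json.load(f)["txnVersion"]
    version = int(clock())  # epoch seconds of the first attempt
    # Write to a temporary file and rename it, so a crash mid-write does not
    # leave a truncated state file behind.
    tmp_path = state_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump({"txnVersion": version}, f)
    os.replace(tmp_path, state_path)
    return version
```

The `clock` parameter exists only so the behavior can be checked deterministically; in a real job the default `time.time` is used.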
This combination of options needs to be unique for each new batch of data ingested into the Delta table, and txnVersion needs to be higher than the txnVersion of the last data ingested into the Delta table. For example:
- The last successfully written data contains the option values dailyETL:23423 (txnAppId:txnVersion).
- The next write of data should have txnAppId = dailyETL and a txnVersion of at least 23424 (one more than the txnVersion of the last written data).
- Any attempt to write data with txnAppId = dailyETL and a txnVersion of 23422 or less is ignored, because that txnVersion is less than the last recorded txnVersion in the table. A retry that reuses txnVersion 23423 is ignored as well, which is what makes a restarted job's write idempotent.
- An attempt to write data with txnAppId:txnVersion as anotherETL:23424 succeeds in writing data to the table, because it uses a different txnAppId than the last ingested data.
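The rule in this example can be modeled in a few lines of plain Python. This is a toy model for illustration only: `ToyDeltaTable` and its `write` method are hypothetical and are not a Delta Lake or Spark API. It assumes, as described above, that the table keeps the last recorded txnVersion per txnAppId and skips any write whose txnVersion is not greater than that value, so an exact retry is skipped too.

```python
class ToyDeltaTable:
    """A toy, in-memory model of the txnAppId/txnVersion check.

    Not Delta Lake's implementation: it only mimics the rule described
    above, keeping the last recorded txnVersion per txnAppId and skipping
    any write whose txnVersion is not greater than that value.
    """

    def __init__(self):
        self.rows = []
        self.last_version = {}  # txnAppId -> last recorded txnVersion

    def write(self, rows, txn_app_id, txn_version):
        """Append rows unless this (app ID, version) is already covered.

        Returns True if the rows were written, False if the write was skipped.
        """
        last = self.last_version.get(txn_app_id)
        if last is not None and txn_version <= last:
            return False  # Same or older version: ignored silently.
        self.rows.extend(rows)
        self.last_version[txn_app_id] = txn_version
        return True


table = ToyDeltaTable()
table.write(["batch-1"], "dailyETL", 23423)          # last successful write
print(table.write(["batch-2"], "dailyETL", 23424))   # True: higher version
print(table.write(["stale"], "dailyETL", 23422))     # False: lower version
print(table.write(["batch-2"], "dailyETL", 23424))   # False: retry of 23424
print(table.write(["other"], "anotherETL", 23424))   # True: different app ID
print(table.rows)  # ['batch-1', 'batch-2', 'other']
```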
Warning
This solution assumes that the data written to the Delta table(s) across multiple retries of the job is the same. If a write attempt to a Delta table succeeds but, due to some downstream failure, there is a second write attempt with the same txn options but different data, then that second write attempt is ignored. This can cause unexpected results.
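If a job cannot guarantee that retries produce identical data, one option is to detect the mismatch in the job's own code before writing. The sketch below is a hypothetical guard, not a Delta Lake feature: `batch_fingerprint`, `check_retry_is_identical`, and the `recorded` mapping are illustrative names. It assumes the batch (or a small manifest describing it) is JSON-serializable and arrives in a deterministic order, and that `recorded` is backed by durable storage in a real job.

```python
import hashlib
import json


def batch_fingerprint(rows):
    """Return a SHA-256 hex digest of a batch of JSON-serializable rows.

    Rows are serialized with sorted keys so that dict key order does not
    change the digest; row order is assumed to be deterministic.
    """
    digest = hashlib.sha256()
    for row in rows:
        digest.update(json.dumps(row, sort_keys=True).encode("utf-8"))
        digest.update(b"\n")
    return digest.hexdigest()


def check_retry_is_identical(recorded, txn_app_id, txn_version, rows):
    """Guard a retry before reusing the same txnAppId/txnVersion.

    `recorded` maps (txnAppId, txnVersion) to the fingerprint of the batch
    first attempted with those options. Raises ValueError if a retry would
    reuse the options with different data, which the table would otherwise
    silently ignore.
    """
    key = (txn_app_id, txn_version)
    fingerprint = batch_fingerprint(rows)
    previous = recorded.setdefault(key, fingerprint)
    if previous != fingerprint:
        raise ValueError(
            f"retry of {txn_app_id}:{txn_version} carries different data; "
            "the idempotent write would ignore it"
        )
```

For large DataFrames, fingerprinting every row on the driver is impractical; fingerprinting a manifest of the batch's inputs (for example, its list of source files) is a cheaper variant of the same idea.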
See the following code for an example:
```python
app_id = ...  # A unique string that is used as an application ID.
version = ...  # A monotonically increasing number that acts as transaction version.

dataFrame.write.option("txnVersion", version).option("txnAppId", app_id).save(...)
```
```scala
val appId = ... // A unique string that is used as an application ID.
val version = ... // A monotonically increasing number that acts as transaction version.

dataFrame.write.option("txnVersion", version).option("txnAppId", appId).save(...)
```
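In a larger PySpark job it can help to build both options in one place so that neither is forgotten. The helpers below are a hypothetical sketch: `idempotent_write_options` and `write_idempotent` are illustrative names, the validation rules (non-empty app ID, non-negative integer version) are assumptions of this sketch, and the explicit `format("delta")` and `mode("append")` calls are assumptions for an ingestion job rather than something the example above specifies. `DataFrameWriter.format`, `mode`, `options`, and `save` are standard PySpark methods.

```python
def idempotent_write_options(txn_app_id, txn_version):
    """Build the txnAppId/txnVersion writer options for an idempotent write.

    Values are returned as strings, since data source options are passed
    to the writer as strings.
    """
    if not isinstance(txn_app_id, str) or not txn_app_id:
        raise ValueError("txnAppId must be a non-empty string")
    if isinstance(txn_version, bool) or not isinstance(txn_version, int) or txn_version < 0:
        raise ValueError("txnVersion must be a non-negative integer")
    return {"txnAppId": txn_app_id, "txnVersion": str(txn_version)}


def write_idempotent(df, path, txn_app_id, txn_version):
    """Append `df` to the Delta table at `path` with idempotent-write options."""
    (
        df.write.format("delta")
        .mode("append")
        .options(**idempotent_write_options(txn_app_id, txn_version))
        .save(path)
    )
```

For example, `write_idempotent(df, path, "dailyETL", 23424)` issues the same write as the example above with `app_id = "dailyETL"` and `version = 23424`.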