Enable idempotent writes across jobs

A job that writes data to a Delta table is sometimes restarted, for example after it encounters a failure. The failed job may or may not have written its data to the Delta table before terminating. If it did, the restarted job writes the same data to the Delta table again, which results in duplicate data.

To address this, Delta tables support the following DataFrameWriter options to make the writes idempotent:

  • txnAppId: A unique string that you can pass on each DataFrame write. For example, this can be the name of the job.

  • txnVersion: A monotonically increasing number that acts as the transaction version. This number must be unique for each set of data written to the Delta table(s). For example, it can be the epoch seconds of the instant when the query is attempted for the first time. Any subsequent restart of the same job must use the same value for txnVersion.
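One way to keep txnVersion stable across restarts is to derive it from the job's logical run time rather than from the wall clock at each attempt. The following is a minimal sketch under that assumption; the helper name txn_version_for and the example dates are hypothetical and are not part of Delta Lake.

```python
from datetime import datetime, timezone


def txn_version_for(logical_run_time: datetime) -> int:
    """Map a job's logical run time to an integer usable as txnVersion.

    Hypothetical helper: the same logical run always yields the same
    value, and later runs yield larger values.
    """
    if logical_run_time.tzinfo is None:
        # Naive datetimes depend on the local time zone of the machine,
        # which could make retries on another host disagree.
        raise ValueError("logical_run_time must be timezone-aware")
    return int(logical_run_time.timestamp())


# A first attempt and its retry share the logical run time, so they
# produce the same version; the next day's run produces a larger one.
first_attempt = txn_version_for(datetime(2024, 1, 15, tzinfo=timezone.utc))
retry = txn_version_for(datetime(2024, 1, 15, tzinfo=timezone.utc))
next_run = txn_version_for(datetime(2024, 1, 16, tzinfo=timezone.utc))
```

The important property is that the value is computed once per logical run (or persisted by the scheduler), not recomputed from the current time on every restart.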

The combination of these options must be unique for each new set of data ingested into the Delta table, and the txnVersion must be higher than that of the last data ingested into the Delta table with the same txnAppId. For example:

  • The last successfully written data has the option values dailyETL:23423 (txnAppId:txnVersion).

  • The next write of new data should use txnAppId = dailyETL and a txnVersion of at least 23424 (one more than the txnVersion of the last written data).

  • Any attempt to write data with txnAppId = dailyETL and a txnVersion of 23423 or less is ignored because the txnVersion is not greater than the last recorded txnVersion in the table.

  • An attempt to write data with txnAppId:txnVersion as anotherETL:23424 succeeds and writes data to the table because its txnAppId differs from the txnAppId of the last ingested data.
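The rule illustrated by these cases can be sketched as a small pure-Python check. This is a toy model, not Delta Lake's implementation: the function should_apply and the last_versions dictionary are hypothetical names, and the sketch assumes a write is skipped whenever its txnVersion is not greater than the last version recorded for the same txnAppId.

```python
def should_apply(last_versions, app_id, version):
    """Return True if a write tagged (app_id, version) would be applied.

    last_versions maps each txnAppId to the highest txnVersion already
    recorded for it. An app ID with no recorded version always applies.
    """
    last = last_versions.get(app_id)
    return last is None or version > last


# State after the last successful write from the example: dailyETL:23423.
last_versions = {"dailyETL": 23423}

results = {
    "dailyETL:23424": should_apply(last_versions, "dailyETL", 23424),
    "dailyETL:23423": should_apply(last_versions, "dailyETL", 23423),
    "dailyETL:23422": should_apply(last_versions, "dailyETL", 23422),
    "anotherETL:23424": should_apply(last_versions, "anotherETL", 23424),
}
```

In this model the version is tracked per txnAppId, which is why a different application ID is unaffected by the versions recorded for dailyETL.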

Warning

This solution assumes that the data written to the Delta table(s) is the same in every retry of the job. If a write attempt to a Delta table succeeds, but a downstream failure triggers a second write attempt with the same txn options and different data, the second write attempt is ignored. This can cause unexpected results.
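The pitfall described in this warning can be illustrated with a toy in-memory table. This is a sketch only: ToyTable is a hypothetical stand-in that mimics the skip rule and is not how Delta Lake stores data or transaction identifiers.

```python
class ToyTable:
    """Toy stand-in for a Delta table; not Delta Lake's implementation."""

    def __init__(self):
        self.rows = []
        self.last_version = {}  # txnAppId -> highest txnVersion recorded

    def write(self, rows, app_id, version):
        """Append rows unless (app_id, version) was already recorded."""
        last = self.last_version.get(app_id)
        if last is not None and version <= last:
            return False  # skipped: the payload is never inspected
        self.rows.extend(rows)
        self.last_version[app_id] = version
        return True


table = ToyTable()

# The first attempt succeeds and records dailyETL:23423.
first_applied = table.write([{"id": 1, "amount": 10}], "dailyETL", 23423)

# A retry reuses the same options but carries different data. It is
# skipped, so the table keeps the rows from the first attempt.
second_applied = table.write([{"id": 1, "amount": 99}], "dailyETL", 23423)
```

Because the decision depends only on the txn options, the differing payload of the second attempt is dropped without any error.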

See the following code for an example:

Python

app_id = ... # A unique string that is used as an application ID.
version = ... # A monotonically increasing number that acts as transaction version.

dataFrame.write.option("txnVersion", version).option("txnAppId", app_id).save(...)

Scala

val appId = ... // A unique string that is used as an application ID.
val version = ... // A monotonically increasing number that acts as transaction version.

dataFrame.write.option("txnVersion", version).option("txnAppId", appId).save(...)