Table utility commands
Delta tables support a number of utility commands.
Remove files no longer referenced by a Delta table
You can remove files that are no longer referenced by a Delta table and are older than the retention threshold by running the vacuum command on the table. vacuum is not triggered automatically. The default retention threshold for the files is 7 days. To change this behavior, see Data retention.
Important
vacuum removes all files from directories not managed by Delta Lake, ignoring directories beginning with _. If you are storing additional metadata like Structured Streaming checkpoints within a Delta table directory, use a directory name such as _checkpoints.
vacuum deletes only data files, not log files. Log files are deleted automatically and asynchronously after checkpoint operations. The default retention period of log files is 30 days, configurable through the delta.logRetentionDuration property, which you set with the ALTER TABLE SET TBLPROPERTIES SQL method. See Table properties.
The ability to time travel back to a version older than the retention period is lost after running vacuum.
Note
When the Delta cache is enabled, a cluster might contain data from Parquet files that have been deleted with vacuum. Therefore, it may be possible to query the data of previous table versions whose files have been deleted. Restarting the cluster will remove the cached data. See Configure the Delta cache.
VACUUM eventsTable -- vacuum files not required by versions older than the default retention period
VACUUM '/data/events' -- vacuum files in path-based table
VACUUM delta.`/data/events/`
VACUUM delta.`/data/events/` RETAIN 100 HOURS -- vacuum files not required by versions more than 100 hours old
VACUUM eventsTable DRY RUN -- do dry run to get the list of files to be deleted
For Spark SQL syntax details, see
Databricks Runtime 7.x and above: VACUUM
Databricks Runtime 5.5 LTS and 6.x: VACUUM
Note
The Python API is available in Databricks Runtime 6.1 and above.
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToTable) # path-based tables, or
deltaTable = DeltaTable.forName(spark, tableName) # Hive metastore-based tables
deltaTable.vacuum() # vacuum files not required by versions older than the default retention period
deltaTable.vacuum(100) # vacuum files not required by versions more than 100 hours old
Note
The Scala API is available in Databricks Runtime 6.0 and above.
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable)
deltaTable.vacuum() // vacuum files not required by versions older than the default retention period
deltaTable.vacuum(100) // vacuum files not required by versions more than 100 hours old
Note
The Java API is available in Databricks Runtime 6.0 and above.
import io.delta.tables.*;
import org.apache.spark.sql.functions;
DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);
deltaTable.vacuum(); // vacuum files not required by versions older than the default retention period
deltaTable.vacuum(100); // vacuum files not required by versions more than 100 hours old
See the Delta Lake APIs for Scala, Java, and Python syntax details.
Warning
It is recommended that you set the retention interval to at least 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If VACUUM cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when VACUUM deletes files that have not yet been committed. You must choose an interval that is longer than the longest-running concurrent transaction and the longest period that any stream can lag behind the most recent update to the table.
Delta Lake has a safety check to prevent you from running a dangerous VACUUM command. If you are certain that no operations performed on this table take longer than the retention interval you plan to specify, you can turn off this safety check by setting the Spark configuration property spark.databricks.delta.retentionDurationCheck.enabled to false.
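For example, a minimal Python sketch of adjusting these settings (the table path and the 48-hour retention are illustrative assumptions, not recommendations):
# Only disable the safety check if you are certain that no concurrent operation
# runs longer than the retention interval you are about to specify.
from delta.tables import *

spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
deltaTable = DeltaTable.forPath(spark, "/data/events")   # example path (assumption)
deltaTable.vacuum(48)                                     # keep 48 hours of history (assumption)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")  # re-enable the check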
Retrieve Delta table history
You can retrieve information on the operations, user, timestamp, and so on for each write to a Delta table by running the history command. The operations are returned in reverse chronological order. By default, table history is retained for 30 days.
DESCRIBE HISTORY '/data/events/' -- get the full history of the table
DESCRIBE HISTORY delta.`/data/events/`
DESCRIBE HISTORY '/data/events/' LIMIT 1 -- get the last operation only
DESCRIBE HISTORY eventsTable
For Spark SQL syntax details, see
Databricks Runtime 7.x and above: DESCRIBE HISTORY (Delta Lake on Databricks)
Databricks Runtime 5.5 LTS and 6.x: DESCRIBE HISTORY
Note
The Python API is available in Databricks Runtime 6.1 and above.
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToTable)
fullHistoryDF = deltaTable.history() # get the full history of the table
lastOperationDF = deltaTable.history(1) # get the last operation
Note
The Scala API is available in Databricks Runtime 6.0 and above.
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable)
val fullHistoryDF = deltaTable.history() // get the full history of the table
val lastOperationDF = deltaTable.history(1) // get the last operation
Note
The Java API is available in Databricks Runtime 6.0 and above.
import io.delta.tables.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);
Dataset<Row> fullHistoryDF = deltaTable.history(); // get the full history of the table
Dataset<Row> lastOperationDF = deltaTable.history(1); // fetch the last operation on the DeltaTable
See the Delta Lake APIs for Scala/Java/Python syntax details.
History schema
The output of the history
operation has the following columns.
| Column | Type | Description |
|---|---|---|
| version | long | Table version generated by the operation. |
| timestamp | timestamp | When this version was committed. |
| userId | string | ID of the user that ran the operation. |
| userName | string | Name of the user that ran the operation. |
| operation | string | Name of the operation. |
| operationParameters | map | Parameters of the operation (for example, predicates). |
| job | struct | Details of the job that ran the operation. |
| notebook | struct | Details of the notebook from which the operation was run. |
| clusterId | string | ID of the cluster on which the operation ran. |
| readVersion | long | Version of the table that was read to perform the write operation. |
| isolationLevel | string | Isolation level used for this operation. |
| isBlindAppend | boolean | Whether this operation appended data. |
| operationMetrics | map | Metrics of the operation (for example, number of rows and files modified). |
| userMetadata | string | User-defined commit metadata if it was specified. |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
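As an illustration, the following Python sketch (the table path is an assumption) projects a few of the columns described above from the history DataFrame:
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/events")   # example path (assumption)
# Show version, timestamp, and operation for the 10 most recent commits.
deltaTable.history(10).select("version", "timestamp", "operation", "operationParameters").show(truncate=False)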
Note
Operation metrics are available only when the history command and the operation in the history were run using Databricks Runtime 6.5 or above.
A few of the other columns are not available if you write into a Delta table using the following methods:
Columns added in the future will always be added after the last column.
Operation metrics keys
The history operation returns a collection of operation metrics in the operationMetrics column map.
The following tables list the map key definitions by operation.
| Operation | Metric name | Description |
|---|---|---|
| WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO | numFiles | Number of files written. |
| | numOutputBytes | Size in bytes of the written contents. |
| | numOutputRows | Number of rows written. |
| STREAMING UPDATE | numAddedFiles | Number of files added. |
| | numRemovedFiles | Number of files removed. |
| | numOutputRows | Number of rows written. |
| | numOutputBytes | Size of write in bytes. |
| DELETE | numAddedFiles | Number of files added. Not provided when partitions of the table are deleted. |
| | numRemovedFiles | Number of files removed. |
| | numDeletedRows | Number of rows removed. Not provided when partitions of the table are deleted. |
| | numCopiedRows | Number of rows copied in the process of deleting files. |
| | executionTimeMs | Time taken to execute the entire operation. |
| | scanTimeMs | Time taken to scan the files for matches. |
| | rewriteTimeMs | Time taken to rewrite the matched files. |
| TRUNCATE | numRemovedFiles | Number of files removed. |
| | executionTimeMs | Time taken to execute the entire operation. |
| MERGE | numSourceRows | Number of rows in the source DataFrame. |
| | numTargetRowsInserted | Number of rows inserted into the target table. |
| | numTargetRowsUpdated | Number of rows updated in the target table. |
| | numTargetRowsDeleted | Number of rows deleted in the target table. |
| | numTargetRowsCopied | Number of target rows copied. |
| | numOutputRows | Total number of rows written out. |
| | numTargetFilesAdded | Number of files added to the sink (target). |
| | numTargetFilesRemoved | Number of files removed from the sink (target). |
| | executionTimeMs | Time taken to execute the entire operation. |
| | scanTimeMs | Time taken to scan the files for matches. |
| | rewriteTimeMs | Time taken to rewrite the matched files. |
| UPDATE | numAddedFiles | Number of files added. |
| | numRemovedFiles | Number of files removed. |
| | numUpdatedRows | Number of rows updated. |
| | numCopiedRows | Number of rows just copied over in the process of updating files. |
| | executionTimeMs | Time taken to execute the entire operation. |
| | scanTimeMs | Time taken to scan the files for matches. |
| | rewriteTimeMs | Time taken to rewrite the matched files. |
| FSCK | numRemovedFiles | Number of files removed. |
| CONVERT | numConvertedFiles | Number of Parquet files that have been converted. |
| OPTIMIZE | numAddedFiles | Number of files added. |
| | numRemovedFiles | Number of files optimized. |
| | numAddedBytes | Number of bytes added after the table was optimized. |
| | numRemovedBytes | Number of bytes removed. |
| | minFileSize | Size of the smallest file after the table was optimized. |
| | p25FileSize | Size of the 25th percentile file after the table was optimized. |
| | p50FileSize | Median file size after the table was optimized. |
| | p75FileSize | Size of the 75th percentile file after the table was optimized. |
| | maxFileSize | Size of the largest file after the table was optimized. |
| Operation | Metric name | Description |
|---|---|---|
| CLONE (1) | sourceTableSize | Size in bytes of the source table at the version that's cloned. |
| | sourceNumOfFiles | Number of files in the source table at the version that's cloned. |
| | numRemovedFiles | Number of files removed from the target table if a previous Delta table was replaced. |
| | removedFilesSize | Total size in bytes of the files removed from the target table if a previous Delta table was replaced. |
| | numCopiedFiles | Number of files that were copied over to the new location. 0 for shallow clones. |
| | copiedFilesSize | Total size in bytes of the files that were copied over to the new location. 0 for shallow clones. |
| RESTORE (2) | tableSizeAfterRestore | Table size in bytes after restore. |
| | numOfFilesAfterRestore | Number of files in the table after restore. |
| | numRemovedFiles | Number of files removed by the restore operation. |
| | numRestoredFiles | Number of files that were added as a result of the restore. |
| | removedFilesSize | Size in bytes of files removed by the restore. |
| | restoredFilesSize | Size in bytes of files added by the restore. |
| VACUUM (3) | numDeletedFiles | Number of deleted files. |
| | numVacuumedDirectories | Number of vacuumed directories. |
| | numFilesToDelete | Number of files to delete. |
(1) Requires Databricks Runtime 7.3 LTS or above.
(2) Requires Databricks Runtime 7.4 or above.
(3) Requires Databricks Runtime 8.2 or above.
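As an illustration of how these keys surface, the following Python sketch (the table path is an assumption) reads the operationMetrics map for the most recent commit:
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/events")   # example path (assumption)
# operationMetrics is a map<string,string>; collect it for the latest commit.
lastCommit = deltaTable.history(1).select("operation", "operationMetrics").collect()[0]
print(lastCommit["operation"])
for key, value in lastCommit["operationMetrics"].items():
    print(key, "=", value)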
Retrieve Delta table details
You can retrieve detailed information about a Delta table (for example, number of files, data size) using DESCRIBE DETAIL.
DESCRIBE DETAIL '/data/events/'
DESCRIBE DETAIL eventsTable
For Spark SQL syntax details, see
Databricks Runtime 7.x and above: DESCRIBE DETAIL
Databricks Runtime 5.5 LTS and 6.x: DESCRIBE DETAIL
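You can also run the same statement from Python and work with the result as a DataFrame; a minimal sketch (the table name is an assumption):
# DESCRIBE DETAIL returns a single-row DataFrame.
detailDF = spark.sql("DESCRIBE DETAIL eventsTable")   # example table name (assumption)
detailDF.select("format", "numFiles", "sizeInBytes", "partitionColumns").show(truncate=False)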
Detail schema
The output of this operation has only one row with the following schema.
| Column | Type | Description |
|---|---|---|
| format | string | Format of the table, that is, delta. |
| id | string | Unique ID of the table. |
| name | string | Name of the table as defined in the metastore. |
| description | string | Description of the table. |
| location | string | Location of the table. |
| createdAt | timestamp | When the table was created. |
| lastModified | timestamp | When the table was last modified. |
| partitionColumns | array of strings | Names of the partition columns if the table is partitioned. |
| numFiles | long | Number of the files in the latest version of the table. |
| sizeInBytes | int | The size of the latest snapshot of the table in bytes. |
| properties | string-string map | All the properties set for this table. |
| minReaderVersion | int | Minimum version of readers (according to the log protocol) that can read the table. |
| minWriterVersion | int | Minimum version of writers (according to the log protocol) that can write to the table. |
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
|format| id| name|description| location| createdAt| lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
| delta|d31f82d2-a69f-42e...|default.deltatable| null|file:/Users/tuor/...|2020-06-05 12:20:...|2020-06-05 12:20:20| []| 10| 12345| []| 1| 2|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
Generate a manifest file
You can generate a manifest file for a Delta table that can be used by other processing engines (that is, other than Apache Spark) to read the Delta table. For example, to generate a manifest file that can be used by Presto and Athena to read a Delta table, you run the following:
GENERATE symlink_format_manifest FOR TABLE delta.`/tmp/events`
GENERATE symlink_format_manifest FOR TABLE eventsTable
Note
The Python API is available in Databricks Runtime 6.3 and above.
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, <path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")
Note
The Scala API is available in Databricks Runtime 6.3 and above.
val deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")
Note
The Java API is available in Databricks Runtime 6.3 and above.
DeltaTable deltaTable = DeltaTable.forPath(<path-to-delta-table>);
deltaTable.generate("symlink_format_manifest");
Convert a Parquet table to a Delta table
Convert a Parquet table to a Delta table in place. This command lists all the files in the directory, creates a Delta Lake transaction log that tracks these files, and automatically infers the data schema by reading the footers of all Parquet files. If your data is partitioned, you must specify the schema of the partition columns as a DDL-formatted string (that is, <column-name1> <type>, <column-name2> <type>, ...).
Note
If a Parquet table was created by Structured Streaming, the listing of files can be avoided by using the _spark_metadata sub-directory as the source of truth for the files contained in the table, by setting the SQL configuration spark.databricks.delta.convert.useMetadataLog to true.
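For example, a minimal Python sketch of setting this configuration before the conversion (the table path is an assumption):
# Prefer the _spark_metadata log over a file listing for streaming-produced Parquet tables.
spark.conf.set("spark.databricks.delta.convert.useMetadataLog", "true")
spark.sql("CONVERT TO DELTA parquet.`/data/streaming-output`")   # example path (assumption)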
-- Convert unpartitioned Parquet table at path '<path-to-table>'
CONVERT TO DELTA parquet.`<path-to-table>`
-- Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
CONVERT TO DELTA parquet.`<path-to-table>` PARTITIONED BY (part int, part2 int)
For syntax details, see
Databricks Runtime 7.x and above: CONVERT TO DELTA
Databricks Runtime 5.5 LTS and 6.x: CONVERT TO DELTA
Note
The Python API is available in Databricks Runtime 6.1 and above.
from delta.tables import *
# Convert unpartitioned Parquet table at path '<path-to-table>'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")
# Convert partitioned parquet table at path '<path-to-table>' and partitioned by integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int")
Note
The Scala API is available in Databricks Runtime 6.0 and above.
import io.delta.tables._
// Convert unpartitioned Parquet table at path '<path-to-table>'
val deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")
// Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
val partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int")
Note
The Java API is available in Databricks Runtime 6.0 and above.
import io.delta.tables.*;
// Convert unpartitioned Parquet table at path '<path-to-table>'
DeltaTable deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`");
// Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
DeltaTable partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int");
Note
Any file not tracked by Delta Lake is invisible and can be deleted when you run vacuum. You should avoid updating or appending data files during the conversion process. After the table is converted, make sure all writes go through Delta Lake.
Convert an Iceberg table to a Delta table
Note
This feature is in Public Preview.
This feature is supported in Databricks Runtime 10.4 and above.
You can convert an Iceberg table to a Delta table in place if the underlying file format of the Iceberg table is Parquet. The following command creates a Delta Lake transaction log based on the Iceberg table's native file manifest, schema, and partitioning information. The converter also collects column stats during the conversion, unless NO STATISTICS is specified.
-- Convert the Iceberg table in the path <path-to-table>.
CONVERT TO DELTA iceberg.`<path-to-table>`
-- Convert the Iceberg table in the path <path-to-table> without collecting statistics.
CONVERT TO DELTA iceberg.`<path-to-table>` NO STATISTICS
Note
Converting Iceberg metastore tables is not supported.
Convert a Delta table to a Parquet table
You can easily convert a Delta table back to a Parquet table using the following steps:
1. If you have performed Delta Lake operations that can change the data files (for example, delete or merge), run vacuum with a retention of 0 hours to delete all data files that do not belong to the latest version of the table.
2. Delete the _delta_log directory in the table directory.
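A minimal Python sketch of these two steps, assuming an illustrative table path and the Databricks dbutils file system utility:
from delta.tables import *

path = "/data/events"   # example path (assumption)

# Step 1: remove all data files that do not belong to the latest version.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
DeltaTable.forPath(spark, path).vacuum(0)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")

# Step 2: delete the transaction log so the directory is read as a plain Parquet table.
dbutils.fs.rm(path + "/_delta_log", True)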
Restore a Delta table to an earlier state
Note
Available in Databricks Runtime 7.4 and above.
You can restore a Delta table to its earlier state by using the RESTORE command. A Delta table internally maintains historic versions of the table that enable it to be restored to an earlier state. Both a version corresponding to the earlier state and a timestamp of when the earlier state was created are supported as options by the RESTORE command.
Important
You can restore an already restored table.
You can restore a cloned table.
Restoring a table to an older version where the data files were deleted manually or by vacuum will fail. Partially restoring to this version is still possible if spark.sql.files.ignoreMissingFiles is set to true.
The timestamp format for restoring to an earlier state is yyyy-MM-dd HH:mm:ss. Providing only a date (yyyy-MM-dd) string is also supported.
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, <path-to-table>) # path-based tables, or
deltaTable = DeltaTable.forName(spark, <table-name>) # Hive metastore-based tables
deltaTable.restoreToVersion(0) # restore table to oldest version
deltaTable.restoreToTimestamp('2019-02-14') # restore to a specific timestamp
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, <path-to-table>) // path-based tables, or
val deltaTable = DeltaTable.forName(spark, <table-name>) // Hive metastore-based tables
deltaTable.restoreToVersion(0) // restore table to oldest version
deltaTable.restoreToTimestamp("2019-02-14") // restore to a specific timestamp
import io.delta.tables.*;
DeltaTable deltaTable = DeltaTable.forPath(spark, <path-to-table>); // path-based tables, or
DeltaTable deltaTable = DeltaTable.forName(spark, <table-name>); // Hive metastore-based tables
deltaTable.restoreToVersion(0); // restore table to oldest version
deltaTable.restoreToTimestamp("2019-02-14"); // restore to a specific timestamp
For syntax details, see RESTORE (Delta Lake on Databricks).
Important
Restore is considered a data-changing operation. Delta Lake log entries added by the RESTORE command contain dataChange set to true. If there is a downstream application, such as a Structured Streaming job that processes the updates to a Delta Lake table, the data change log entries added by the restore operation are considered new data updates, and processing them may result in duplicate data.
For example:
| Table version | Operation | Delta log updates | Records in data change log updates |
|---|---|---|---|
| 0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55) |
| 1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) |
| 2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (No records, as OPTIMIZE compaction does not change the data in the table) |
| 3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
In the preceding example, the RESTORE command results in updates that were already seen when reading Delta table versions 0 and 1. If a streaming query was reading this table, these files will be considered newly added data and will be processed again.
Restore metrics
Note
Available in Databricks Runtime 8.2 and above.
RESTORE reports the following metrics as a single-row DataFrame once the operation is complete:
table_size_after_restore: The size of the table after restoring.
num_of_files_after_restore: The number of files in the table after restoring.
num_removed_files: Number of files removed (logically deleted) from the table.
num_restored_files: Number of files restored due to rolling back.
removed_files_size: Total size in bytes of the files that are removed from the table.
restored_files_size: Total size in bytes of the files that are restored.
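Because the metrics come back as a single-row DataFrame, you can capture them by running the statement through spark.sql; a minimal sketch (the table name and version are assumptions):
# RESTORE returns its metrics as a one-row DataFrame.
restoreMetricsDF = spark.sql("RESTORE TABLE db.target_table TO VERSION AS OF 0")   # example (assumption)
restoreMetricsDF.show(truncate=False)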
Clone a Delta table
Note
Available in Databricks Runtime 7.2 and above.
You can create a copy of an existing Delta table at a specific version using the clone command. Clones can be either deep or shallow.
Clone types
A deep clone is a clone that copies the source table data to the clone target in addition to the metadata of the existing table. Stream metadata is also cloned, so that a stream that writes to the Delta table can be stopped on the source table and continued on the target of the clone from where it left off.
A shallow clone is a clone that does not copy the data files to the clone target. The table metadata is equivalent to the source. These clones are cheaper to create.
Any changes made to either deep or shallow clones affect only the clones themselves and not the source table.
The metadata that is cloned includes: schema, partitioning information, invariants, nullability. For deep clones only, stream and COPY INTO metadata are also cloned. Metadata not cloned are the table description and user-defined commit metadata.
Important
Shallow clones reference data files in the source directory. If you run vacuum on the source table, clients will no longer be able to read the referenced data files and a FileNotFoundException will be thrown. In this case, running clone with replace over the shallow clone will repair the clone. If this occurs often, consider instead using a deep clone, which does not depend on the source table.
Deep clones do not depend on the source from which they were cloned, but are expensive to create because a deep clone copies the data as well as the metadata.
Cloning with replace to a target that already has a table at that path creates a Delta log if one does not exist at that path. You can clean up any existing data by running vacuum. If an existing Delta table exists, a new commit is created that includes the new metadata and new data from the source table. This new commit is incremental, meaning that only new changes since the last clone are committed to the table.
Cloning a table is not the same as Create Table As Select or CTAS. A clone copies the metadata of the source table in addition to the data. Cloning also has simpler syntax: you don't need to specify partitioning, format, invariants, nullability, and so on, as they are taken from the source table.
A cloned table has an independent history from its source table. Time travel queries on a cloned table will not work with the same inputs as they work on its source table.
CREATE TABLE delta.`/data/target/` CLONE delta.`/data/source/` -- Create a deep clone of /data/source at /data/target
CREATE OR REPLACE TABLE db.target_table CLONE db.source_table -- Replace the target
CREATE TABLE IF NOT EXISTS delta.`/data/target/` CLONE db.source_table -- No-op if the target table exists
CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source`
CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source` VERSION AS OF version
CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source` TIMESTAMP AS OF timestamp_expression -- timestamp can be like "2019-01-01" or like date_sub(current_date(), 1)
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToTable) # path-based tables, or
deltaTable = DeltaTable.forName(spark, tableName) # Hive metastore-based tables
deltaTable.clone(target, isShallow, replace) # clone the source at latest version
deltaTable.cloneAtVersion(version, target, isShallow, replace) # clone the source at a specific version
# clone the source at a specific timestamp such as timestamp="2019-01-01"
deltaTable.cloneAtTimestamp(timestamp, target, isShallow, replace)
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable) // path-based tables, or
val deltaTable = DeltaTable.forName(spark, tableName) // Hive metastore-based tables
deltaTable.clone(target, isShallow, replace) // clone the source at latest version
deltaTable.cloneAtVersion(version, target, isShallow, replace) // clone the source at a specific version
deltaTable.cloneAtTimestamp(timestamp, target, isShallow, replace) // clone the source at a specific timestamp
import io.delta.tables.*;
DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable); // path-based tables, or
DeltaTable deltaTable = DeltaTable.forName(spark, tableName); // Hive metastore-based tables
deltaTable.clone(target, isShallow, replace); // clone the source at latest version
deltaTable.cloneAtVersion(version, target, isShallow, replace); // clone the source at a specific version
deltaTable.cloneAtTimestamp(timestamp, target, isShallow, replace); // clone the source at a specific timestamp
For syntax details, see CREATE TABLE CLONE.
Clone metrics
Note
Available in Databricks Runtime 8.2 and above.
CLONE reports the following metrics as a single-row DataFrame once the operation is complete:
source_table_size: Size of the source table that's being cloned in bytes.
source_num_of_files: The number of files in the source table.
num_removed_files: If the table is being replaced, how many files are removed from the current table.
num_copied_files: Number of files that were copied from the source (0 for shallow clones).
removed_files_size: Size in bytes of the files that are being removed from the current table.
copied_files_size: Size in bytes of the files copied to the table.
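As with RESTORE, you can capture these metrics by running the clone through spark.sql; a minimal sketch (the table names are assumptions):
# CLONE reports its metrics as a one-row DataFrame.
cloneMetricsDF = spark.sql("CREATE OR REPLACE TABLE db.target_table CLONE db.source_table")   # example (assumption)
cloneMetricsDF.show(truncate=False)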

Permissions
You must configure permissions for Databricks table access control and your cloud provider.
Table access control
The following permissions are required for both deep and shallow clones:
SELECT permission on the source table.
If you are using CLONE to create a new table, CREATE permission on the database in which you are creating the table.
If you are using CLONE to replace a table, you must have MODIFY permission on the table.
Cloud provider permissions
If you have created a deep clone, any user that reads the deep clone must have read access to the clone’s directory. To make changes to the clone, users must have write access to the clone’s directory.
If you have created a shallow clone, any user that reads the shallow clone needs permission to read the files in the original table, since the data files remain in the source table with shallow clones, as well as read access to the clone's directory. To make changes to the clone, users need write access to the clone's directory.
Clone use cases
Data archiving
Data may need to be kept for longer than is feasible with time travel or for disaster recovery. In these cases, you can create a deep clone to preserve the state of a table at a certain point in time for archival. Incremental archiving is also possible to keep a continually updating state of a source table for disaster recovery.
-- Every month run
CREATE OR REPLACE TABLE delta.`/some/archive/path` CLONE my_prod_table
Machine learning flow reproduction
When doing machine learning, you may want to archive a certain version of a table on which you trained an ML model. Future models can be tested using this archived data set.
-- Trained model on version 15 of Delta table
CREATE TABLE delta.`/model/dataset` CLONE entire_dataset VERSION AS OF 15
Short-term experiments on a production table
To test a workflow on a production table without corrupting the table, you can easily create a shallow clone. This allows you to run arbitrary workflows on the cloned table that contains all the production data but does not affect any production workloads.
-- Perform shallow clone
CREATE OR REPLACE TABLE my_test SHALLOW CLONE my_prod_table;
UPDATE my_test SET invalid = true WHERE user_id IS NULL;
-- Run a bunch of validations. Once happy:
-- This should leverage the update information in the clone to prune to only
-- changed files in the clone if possible
MERGE INTO my_prod_table
USING my_test
ON my_test.user_id <=> my_prod_table.user_id
WHEN MATCHED AND my_test.user_id is null THEN UPDATE *;
DROP TABLE my_test;
Data sharing
Other business units within a single organization may want to access the same data but may not require the latest updates. Instead of giving access to the source table directly, you can provide clones with different permissions for different business units. The performance of the clone can exceed that of a simple view.
-- Perform deep clone
CREATE OR REPLACE TABLE shared_table CLONE my_prod_table;
-- Grant other users access to the shared table
GRANT SELECT ON shared_table TO `<user-name>@<user-domain>.com`;
Table property overrides
Note
Available in Databricks Runtime 7.5 and above.
Table property overrides are particularly useful for:
Annotating tables with owner or user information when sharing data with different business units.
Archiving Delta tables when time travel is required. You can specify the log retention period independently for the archive table. For example:
CREATE OR REPLACE TABLE archive.my_table CLONE prod.my_table
  TBLPROPERTIES (delta.logRetentionDuration = '3650 days', delta.deletedFileRetentionDuration = '3650 days')
  LOCATION 'xx://archive/my_table'

dt = DeltaTable.forName(spark, "prod.my_table")
tblProps = {"delta.logRetentionDuration": "3650 days", "delta.deletedFileRetentionDuration": "3650 days"}
dt.clone('xx://archive/my_table', isShallow=False, replace=True, properties=tblProps)

val dt = DeltaTable.forName(spark, "prod.my_table")
val tblProps = Map("delta.logRetentionDuration" -> "3650 days", "delta.deletedFileRetentionDuration" -> "3650 days")
dt.clone("xx://archive/my_table", isShallow = false, replace = true, properties = tblProps)
Find the last commit’s version in the Spark session
Note
Available in Databricks Runtime 7.1 and above.
To get the version number of the last commit written by the current SparkSession across all threads and all tables, query the SQL configuration spark.databricks.delta.lastCommitVersionInSession.
SET spark.databricks.delta.lastCommitVersionInSession
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
If no commits have been made by the SparkSession, querying the key returns an empty value.
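For example, a minimal Python sketch (the table path is an assumption) that writes a commit and then reads back its version number:
# Write a commit, then ask the session which Delta version it produced.
spark.range(10).write.format("delta").mode("append").save("/data/events")   # example path (assumption)
print(spark.conf.get("spark.databricks.delta.lastCommitVersionInSession"))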
Note
If you share the same SparkSession across multiple threads, it's similar to sharing a variable across multiple threads; you may hit race conditions as the configuration value is updated concurrently.