Load data with COPY INTO

The COPY INTO SQL command lets you load data from a file location into a Delta table. This is a re-triable and idempotent operation; files in the source location that have already been loaded are skipped.

COPY INTO supports secure access in a several ways, including the ability to use temporary credentials.

Empty Delta Lake tables

Note

This feature is available in Databricks Runtime 11.0 and above.

You can create empty placeholder Delta tables so that the schema is later inferred during a COPY INTO command:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table_description>]
[TBLPROPERTIES (<table_properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

The SQL statement above is idempotent and can be scheduled to run to ingest data exactly-once into a Delta table.

Note

The empty Delta table is not usable outside of COPY INTO. INSERT INTO and MERGE INTO are not supported to write data into schemaless Delta tables. After data is inserted into the table with COPY INTO, the table becomes queryable.

See Create target tables for COPY INTO

Example

For common use patterns, see Common data loading patterns with COPY INTO

The following example shows how to create a Delta table and then use the COPY INTO SQL command to load sample data from Databricks datasets into the table. You can run the example Python, R, Scala, or SQL code from a notebook attached to a Databricks cluster. You can also run the SQL code from a query associated with a SQL warehouse in Databricks SQL.

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/
DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

To clean up, run the following code, which deletes the table:

spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload

Reference