Common data loading patterns using COPY INTO

Learn common patterns for using COPY INTO to load data from file sources into Delta Lake.

COPY INTO supports a wide range of format and copy options. See COPY INTO for a full reference of all options.

Create target tables for COPY INTO

COPY INTO must target an existing Delta table. In Databricks Runtime 11.3 LTS and above, setting the schema for these tables is optional for formats that support schema evolution:

CREATE TABLE IF NOT EXISTS my_table
[(col_1 col_1_type, col_2 col_2_type, ...)]
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
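
For example, this template might be instantiated as follows (the column names, types, and comment here are illustrative):

CREATE TABLE IF NOT EXISTS my_table
(id BIGINT, event STRING, quantity DOUBLE)
COMMENT 'Target table for COPY INTO loads';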

Note that to infer the schema with COPY INTO, you must pass additional options:

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

The following example creates a schemaless Delta table called my_pipe_data and loads a pipe-delimited CSV with a header:

CREATE TABLE IF NOT EXISTS my_pipe_data;

COPY INTO my_pipe_data
  FROM 'gs://my-bucket/pipeData'
  FILEFORMAT = CSV
  FORMAT_OPTIONS ('mergeSchema' = 'true',
                  'delimiter' = '|',
                  'header' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true');
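
After the load, you can verify the schema that was inferred from the CSV header and spot-check the rows (a quick sanity check, not part of the load itself):

-- Inspect the inferred schema
DESCRIBE TABLE my_pipe_data;

-- Spot-check the loaded data
SELECT * FROM my_pipe_data LIMIT 10;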

Load JSON data with COPY INTO

The following example loads JSON data from five files in Google Cloud Storage (GCS) into the Delta table called my_json_data. You must create this table before you can run COPY INTO. If any of the files were already loaded, their data isn't reloaded.

COPY INTO my_json_data
  FROM 'gs://my-bucket/jsonData'
  FILEFORMAT = JSON
  FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')

-- The second execution will not copy any data since the first command already loaded the data
COPY INTO my_json_data
  FROM 'gs://my-bucket/jsonData'
  FILEFORMAT = JSON
  FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')
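
If you do need to reload files that COPY INTO has already processed, the force copy option disables this idempotency; a minimal sketch:

-- Reload the listed files even though they were loaded before
COPY INTO my_json_data
  FROM 'gs://my-bucket/jsonData'
  FILEFORMAT = JSON
  FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')
  COPY_OPTIONS ('force' = 'true')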

Load Avro data with COPY INTO

The following example loads Avro data from GCS, applying additional SQL expressions as part of the SELECT statement.

COPY INTO my_delta_table
  FROM (SELECT to_date(dt) dt, event as measurement, quantity::double
          FROM 'gs://my-bucket/avroData')
  FILEFORMAT = AVRO
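
The :: syntax above is shorthand for a standard SQL cast; the same load could equivalently be written with CAST:

COPY INTO my_delta_table
  FROM (SELECT to_date(dt) dt, event AS measurement, CAST(quantity AS DOUBLE) quantity
          FROM 'gs://my-bucket/avroData')
  FILEFORMAT = AVRO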

Load CSV files with COPY INTO

The following example loads CSV files from GCS under gs://bucket/base/path/folder1 into a Delta table at gs://bucket/deltaTables/target.

COPY INTO delta.`gs://bucket/deltaTables/target`
  FROM (SELECT key, index, textData, 'constant_value'
          FROM 'gs://bucket/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'
  FORMAT_OPTIONS('header' = 'true')

-- The example below loads CSV files without headers from GCS using COPY INTO.
-- By casting the data and renaming the columns, you can put the data in the schema you want
COPY INTO delta.`gs://bucket/deltaTables/target`
  FROM (SELECT _c0::bigint key, _c1::int index, _c2 textData
        FROM 'gs://bucket/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'
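
PATTERN accepts a glob pattern relative to the FROM path, so you can match files across multiple subfolders in a single command. A sketch with illustrative paths:

-- Load every CSV file one level below the base path
COPY INTO delta.`gs://bucket/deltaTables/target`
  FROM 'gs://bucket/base/path'
  FILEFORMAT = CSV
  PATTERN = 'folder*/*.csv'
  FORMAT_OPTIONS ('header' = 'true')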

Ignore corrupt files while loading data

If the files you're loading can't be read because they are corrupted, you can skip them by setting ignoreCorruptFiles to true in the FORMAT_OPTIONS.

The result of the COPY INTO command reports how many files were skipped due to corruption in the num_skipped_corrupt_files column. This metric also appears in the operationMetrics column under numSkippedCorruptFiles after you run DESCRIBE HISTORY on the Delta table.
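
For example, to check the metric after a load (my_table stands in for your target table):

-- The operationMetrics column of the most recent COPY INTO entry
-- includes numSkippedCorruptFiles
DESCRIBE HISTORY my_table;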

Corrupt files aren’t tracked by COPY INTO, so they can be reloaded in a subsequent run if the corruption is fixed. You can see which files are corrupt by running COPY INTO in VALIDATE mode.

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
[VALIDATE ALL]
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true')

Note

ignoreCorruptFiles is available in Databricks Runtime 11.3 LTS and above.