Delta Live Tables SQL language reference

Preview

This feature is in Public Preview.

This article provides details for the Delta Live Tables SQL programming interface.

You can use Python user-defined functions (UDFs) in your SQL queries, but you must define these UDFs in Python files before calling them in SQL source files. See User-defined scalar functions - Python.
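For example, assuming a Python source file in the same pipeline registers a UDF named normalize_email (the function, table, and column names here are hypothetical), you can call it directly in a SQL dataset definition:

CREATE OR REFRESH LIVE TABLE clean_users
AS SELECT
  user_id,
  -- normalize_email is assumed to be registered in a Python file in this
  -- pipeline, for example with spark.udf.register("normalize_email", ...)
  normalize_email(email) AS email
FROM LIVE.raw_users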

Limitations

The PIVOT clause is not supported. The pivot operation in Spark requires eager loading of input data to compute the output schema, which Delta Live Tables does not support.

Create a Delta Live Tables materialized view or streaming table

You use the same basic SQL syntax when declaring either a streaming table or a materialized view (also referred to as a LIVE TABLE).

You can only declare streaming tables using queries that read against a streaming source. Databricks recommends using Auto Loader for streaming ingestion of files from cloud object storage. See Auto Loader SQL syntax.

When specifying other tables or views in your pipeline as a streaming source, you must wrap the dataset name in the STREAM() function.

The following describes the syntax for declaring materialized views and streaming tables with SQL:

CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement
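
For example, the following minimal sketch (dataset and column names are illustrative) declares a streaming table that reads another pipeline dataset through STREAM(), and a materialized view computed from it:

CREATE OR REFRESH STREAMING TABLE orders_silver
COMMENT "Cleansed orders."
AS SELECT order_id, amount, order_date
FROM STREAM(LIVE.orders_bronze)

CREATE OR REFRESH LIVE TABLE daily_order_totals
COMMENT "Total order amount per day."
AS SELECT order_date, sum(amount) AS total_amount
FROM LIVE.orders_silver
GROUP BY order_date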

Create a Delta Live Tables view

The following describes the syntax for declaring views with SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement
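
For example, the following minimal sketch (names are illustrative) declares a view over another dataset in the pipeline:

CREATE TEMPORARY LIVE VIEW filtered_orders
COMMENT "Orders with a positive amount."
AS SELECT * FROM LIVE.orders_silver
WHERE amount > 0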

Auto Loader SQL syntax

The following describes the syntax for working with Auto Loader in SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM cloud_files(
    "<file_path>",
    "<file_format>",
    map(
      "<option_key>", "<option_value",
      "<option_key>", "<option_value",
      ...
    )
  )

You can use supported format options with Auto Loader. Using the map() function, you can pass any number of options to the cloud_files() function. Options are key-value pairs, where the keys and values are strings. For details on supported formats and options, see File format options.
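
For example, the following sketch (the path is a placeholder, and the option values are one reasonable configuration) incrementally ingests CSV files with header rows and inferred column types:

CREATE OR REFRESH STREAMING TABLE sales_raw
AS SELECT *
  FROM cloud_files(
    "/path/to/landing/zone",
    "csv",
    map(
      "header", "true",
      "cloudFiles.inferColumnTypes", "true"
    )
  )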

SQL properties

CREATE TABLE or VIEW

TEMPORARY

Create a temporary table. No metadata is persisted for this table.

STREAMING

Create a table that reads an input dataset as a stream. The input dataset must be a streaming data source, for example, Auto Loader or a STREAMING table.

PARTITIONED BY

An optional list of one or more columns to use for partitioning the table.

LOCATION

An optional storage location for table data. If not set, the system will default to the pipeline storage location.

COMMENT

An optional description for the table.

TBLPROPERTIES

An optional list of table properties for the table.

select_statement

A Delta Live Tables query that defines the dataset for the table.

CONSTRAINT clause

EXPECT expectation_name

Define data quality constraint expectation_name. If no ON VIOLATION action is defined, rows that violate the constraint are still added to the target dataset.

ON VIOLATION

Optional action to take for failed rows:

  • FAIL UPDATE: Immediately stop pipeline execution.

  • DROP ROW: Drop the record and continue processing.
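
For example, the following sketch (table and column names are illustrative) drops rows with a missing event_id and fails the update if any event predates the expected range:

CREATE OR REFRESH STREAMING TABLE valid_events(
  CONSTRAINT valid_id EXPECT (event_id IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT valid_timestamp EXPECT (event_ts >= '2020-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT * FROM STREAM(LIVE.events_raw)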

Change data capture with SQL in Delta Live Tables

Use the APPLY CHANGES INTO statement to access Delta Live Tables CDC functionality, as described in the following syntax:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[WHERE condition]
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Note

The default behavior for INSERT and UPDATE events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s) or insert a new row when a matching record does not exist in the target table. Handling for DELETE events can be specified with the APPLY AS DELETE WHEN condition.

Important

You must declare a target streaming table to apply changes into. You can optionally specify the schema for your target table. When specifying the schema of the APPLY CHANGES target table, you must also include the __START_AT and __END_AT columns with the same data type as the sequence_by field.

See Change data capture with Delta Live Tables.
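
For example, the following minimal sketch (dataset and column names are illustrative) upserts user records keyed by userId, treats events flagged as DELETE as deletions, orders events by sequenceNum, and stores the result as SCD type 1:

CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO LIVE.target
FROM STREAM(LIVE.users_cdc)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 1;

Changing STORED AS SCD TYPE 1 to STORED AS SCD TYPE 2 would instead retain history, with the __START_AT and __END_AT columns recording when each version of a row was current.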

Clauses

KEYS

The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table.

This clause is required.

WHERE

A condition applied to both source and target to trigger optimizations such as partition pruning. This condition cannot be used to drop source rows; all CDC rows in the source must satisfy this condition or an error is thrown. Using the WHERE clause is optional and should be used when your processing requires specific optimizations.

This clause is optional.

IGNORE NULL UPDATES

Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and IGNORE NULL UPDATES is specified, columns with a null value in the update will retain their existing values in the target. This also applies to nested columns with a value of null.

This clause is optional.

The default is to overwrite existing columns with null values.

APPLY AS DELETE WHEN

Specifies when a CDC event should be treated as a DELETE rather than an upsert. To handle out-of-order data, the deleted row is temporarily retained as a tombstone in the underlying Delta table, and a view is created in the metastore that filters out these tombstones. The retention interval can be configured with the pipelines.cdc.tombstoneGCThresholdInSeconds table property.

This clause is optional.

APPLY AS TRUNCATE WHEN

Specifies when a CDC event should be treated as a full table TRUNCATE. Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality.

The APPLY AS TRUNCATE WHEN clause is supported only for SCD type 1. SCD type 2 does not support truncate.

This clause is optional.

SEQUENCE BY

The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order.

This clause is required.

COLUMNS

Specifies a subset of columns to include in the target table. You can either:

  • Specify the complete list of columns to include: COLUMNS (userId, name, city).

  • Specify a list of columns to exclude: COLUMNS * EXCEPT (operation, sequenceNum).

This clause is optional.

The default is to include all columns in the target table when the COLUMNS clause is not specified.

STORED AS

Whether to store records as SCD type 1 or SCD type 2.

This clause is optional.

The default is SCD type 1.

TRACK HISTORY ON

When pipelines.enableTrackHistory is set, specifies a subset of output columns for which history records are generated when any of those columns change. You can either:

  • Specify the complete list of columns to track: TRACK HISTORY ON (userId, name, city).

  • Specify a list of columns to exclude from tracking: TRACK HISTORY ON * EXCEPT (operation, sequenceNum).

This clause is optional. The default is to track history for all output columns when there are any changes, equivalent to TRACK HISTORY ON *.

To use this clause, you must set pipelines.enableTrackHistory in the pipeline settings; otherwise, an exception is thrown. When pipelines.enableTrackHistory is not set, SCD type 2 processing generates a history record for every input row.
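
For example, with pipelines.enableTrackHistory set in the pipeline settings, the following sketch (dataset and column names are illustrative) generates SCD type 2 history records only when the city column changes:

APPLY CHANGES INTO LIVE.target
FROM STREAM(LIVE.users_cdc)
KEYS (userId)
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON (city);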