Delta Live Tables SQL language reference
This article provides details for the Delta Live Tables SQL programming interface.
For information on the Python API, see the Delta Live Tables Python language reference.
For more information about SQL commands, see SQL language reference.
You can use Python user-defined functions (UDFs) in your SQL queries, but you must define these UDFs in Python files before calling them in SQL source files. See User-defined scalar functions - Python.
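For example, assuming a Python file in the same pipeline has already registered a scalar UDF named normalize_city (a hypothetical name) with spark.udf.register, a SQL dataset can call it like any built-in function. A minimal sketch:

-- A sketch, not an official example: normalize_city is a hypothetical UDF
-- assumed to be registered from a Python file elsewhere in this pipeline.
CREATE OR REFRESH LIVE TABLE customers_clean
AS SELECT customer_id, normalize_city(city) AS city
FROM LIVE.customers_raw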
Limitations
The PIVOT clause is not supported. The pivot operation in Spark requires eager loading of input data to compute the schema of the output. This capability is not supported in Delta Live Tables.
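Although PIVOT is unavailable, a similar reshaping can often be expressed with conditional aggregation, which does not require eager schema inference. A sketch with hypothetical table and column names:

-- Sketch: emulate a pivot over two known quarter values with CASE expressions.
CREATE OR REFRESH LIVE TABLE sales_by_quarter
AS SELECT
  region,
  SUM(CASE WHEN quarter = 'Q1' THEN amount ELSE 0 END) AS q1_amount,
  SUM(CASE WHEN quarter = 'Q2' THEN amount ELSE 0 END) AS q2_amount
FROM LIVE.sales
GROUP BY region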
Create a Delta Live Tables materialized view or streaming table
You use the same basic SQL syntax when declaring either a streaming table or a materialized view (also referred to as a LIVE TABLE).
You can only declare streaming tables using queries that read against a streaming source. Databricks recommends using Auto Loader for streaming ingestion of files from cloud object storage. See Auto Loader SQL syntax.
You must include the STREAM() function around a dataset name when specifying other tables or views in your pipeline as a streaming source.
The following describes the syntax for declaring materialized views and streaming tables with SQL:
CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
AS select_statement
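For example, a minimal sketch (table, column, and path names are hypothetical) combining a streaming table declaration with an expectation:

-- Sketch: a streaming table that drops rows with a NULL order_id.
CREATE OR REFRESH STREAMING TABLE orders_clean(
  CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL) ON VIOLATION DROP ROW
)
COMMENT "Orders with a non-null order_id"
AS SELECT * FROM cloud_files("/data/orders/", "json")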
Create a Delta Live Tables view
The following describes the syntax for declaring views with SQL:
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
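For example, a minimal sketch (names are hypothetical) of a temporary view over another dataset in the same pipeline:

-- Sketch: a temporary view that exposes a subset of columns.
CREATE TEMPORARY LIVE VIEW customers_view
AS SELECT customer_id, customer_name
FROM LIVE.customers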
Auto Loader SQL syntax
The following describes the syntax for working with Auto Loader in SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
You can use supported format options with Auto Loader. Using the map() function, you can pass any number of options to the cloud_files() method. Options are key-value pairs, where the keys and values are strings. For details on supported formats and options, see File format options.
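For example, a sketch with a hypothetical path that passes the Auto Loader option cloudFiles.inferColumnTypes through map():

-- Sketch: ingest JSON files and let Auto Loader infer column types.
CREATE OR REFRESH STREAMING TABLE events_raw
AS SELECT *
FROM cloud_files(
  "/data/events/",
  "json",
  map("cloudFiles.inferColumnTypes", "true")
)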
Example: Define tables
You can create a dataset by reading from an external data source or from datasets defined in a pipeline. To read from an internal dataset, prepend the LIVE keyword to the dataset name. The following example defines two different datasets: a table called taxi_raw that takes a JSON file as the input source and a table called filtered_data that takes the taxi_raw table as input:
CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`
CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
...
FROM LIVE.taxi_raw
Example: Read from a streaming source
To read data from a streaming source, for example, Auto Loader or an internal dataset, define a STREAMING table:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)
For more information on streaming data, see Transform data with Delta Live Tables.
Control how tables are materialized
Tables also offer additional control of their materialization:

- Specify how tables are partitioned using PARTITIONED BY. You can use partitioning to speed up queries.
- Set table properties using TBLPROPERTIES. See Delta Live Tables table properties.
- Set a storage location using the LOCATION setting. By default, table data is stored in the pipeline storage location if LOCATION isn't set.
- Use generated columns in your schema definition. See Example: Specify a schema and partition columns.
Note
For tables less than 1 TB in size, Databricks recommends letting Delta Live Tables control data organization. Unless you expect your table to grow beyond a terabyte, you should generally not specify partition columns.
Example: Specify a schema and partition columns
You can optionally specify a schema when you define a table. The following example specifies the schema for the target table, including using Delta Lake generated columns and defining partition columns for the table:
CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
By default, Delta Live Tables infers the schema from the table definition if you don't specify a schema.
Set configuration values for a table or view
Use SET to specify a configuration value for a table or view, including Spark configurations. Any table or view you define in a notebook after the SET statement has access to the defined value. Any Spark configurations specified using the SET statement are used when executing the Spark query for any table or view following the SET statement. To read a configuration value in a query, use the string interpolation syntax ${}. The following example sets a Spark configuration value named startDate and uses that value in a query:
SET startDate='2020-01-01';
CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}
To specify multiple configuration values, use a separate SET statement for each value.
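For example, a sketch extending the query above with a second, hypothetical endDate value:

SET startDate='2020-01-01';
SET endDate='2020-12-31';

CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date BETWEEN ${startDate} AND ${endDate}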
SQL properties
| CREATE TABLE or VIEW clause | Description |
|---|---|
| TEMPORARY | Create a temporary table. No metadata is persisted for this table. |
| STREAMING | Create a table that reads an input dataset as a stream. The input dataset must be a streaming data source, for example, Auto Loader or a STREAMING table. |
| PARTITIONED BY | An optional list of one or more columns to use for partitioning the table. |
| LOCATION | An optional storage location for table data. If not set, the system will default to the pipeline storage location. |
| COMMENT | An optional description for the table. |
| TBLPROPERTIES | An optional list of table properties for the table. |
| select_statement | A Delta Live Tables query that defines the dataset for the table. |
| CONSTRAINT clause | Description |
|---|---|
| EXPECT expectation_name | Define data quality constraint expectation_name. If the ON VIOLATION clause is not defined, rows that violate the constraint are added to the target dataset. |
| ON VIOLATION | Optional action to take for failed rows: DROP ROW (drop the record) or FAIL UPDATE (cause the pipeline update to fail). |
Change data capture with SQL in Delta Live Tables
Preview
Delta Live Tables support for SCD type 2 is in Public Preview.
Use the APPLY CHANGES INTO statement to use Delta Live Tables CDC functionality, as described in the following:
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[WHERE condition]
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
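For example, a minimal sketch (source, target, and column names are hypothetical) that upserts by userId, deletes on a DELETE operation code, and orders events by sequenceNum:

CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO LIVE.target
FROM STREAM(LIVE.cdc_source)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 1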
Note
The default behavior for INSERT and UPDATE events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s) or insert a new row when a matching record does not exist in the target table. Handling for DELETE events can be specified with the APPLY AS DELETE WHEN condition.
Important
You must declare a target streaming table to apply changes into. You can optionally specify the schema for your target table. When specifying the schema of the APPLY CHANGES target table, you must also include the __START_AT and __END_AT columns with the same data type as the SEQUENCE BY field.
See Change data capture with Delta Live Tables.
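For example, a sketch (column names and types are hypothetical) of an explicit schema for an SCD type 2 target whose SEQUENCE BY column is an INT:

-- Sketch: __START_AT and __END_AT must match the SEQUENCE BY column's type.
CREATE OR REFRESH STREAMING TABLE target(
  userId INT,
  name STRING,
  __START_AT INT,
  __END_AT INT
);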
| Clause | Description |
|---|---|
| KEYS | The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table. This clause is required. |
| WHERE | A condition applied to both source and target to trigger optimizations such as partition pruning. This condition cannot be used to drop source rows; all CDC rows in the source must satisfy this condition or an error is thrown. Use the WHERE clause when your processing requires specific optimizations. This clause is optional. |
| IGNORE NULL UPDATES | Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and IGNORE NULL UPDATES is specified, columns with a null retain their existing values in the target. This clause is optional. The default is to overwrite existing columns with null values. |
| APPLY AS DELETE WHEN | Specifies when a CDC event should be treated as a DELETE rather than an upsert. This clause is optional. |
| APPLY AS TRUNCATE WHEN | Specifies when a CDC event should be treated as a full table TRUNCATE. Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality. The APPLY AS TRUNCATE WHEN clause is supported only for SCD type 1; SCD type 2 does not support truncate. This clause is optional. |
| SEQUENCE BY | The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order. This clause is required. |
| COLUMNS | Specifies a subset of columns to include in the target table. You can either specify the complete list of columns to include (COLUMNS (userId, name, city)) or specify a list of columns to exclude (COLUMNS * EXCEPT (operation, sequenceNum)). This clause is optional. The default is to include all columns in the target table when the COLUMNS clause is not specified. |
| STORED AS | Whether to store records as SCD type 1 or SCD type 2. This clause is optional. The default is SCD type 1. |
| TRACK HISTORY ON | When a column list is provided, tracks history only when there are changes to the specified columns. This clause is optional. The default is to track history for all the output columns when there are any changes, equivalent to TRACK HISTORY ON *. To use this clause, you must set pipelines.enableTrackHistory in the pipeline settings; otherwise, an exception is thrown. |
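For example, a sketch (names are hypothetical) that stores records as SCD type 2 but tracks history only for the city column:

APPLY CHANGES INTO LIVE.target
FROM STREAM(LIVE.cdc_source)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 2
TRACK HISTORY ON (city)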