Delta Live Tables Python language reference

Preview

This feature is in Public Preview.

This article provides details and examples for the Delta Live Tables Python programming interface. For the complete API specification, see the Python API specification.

For information on the SQL API, see the Delta Live Tables SQL language reference.

Limitations

The Delta Live Tables Python interface has the following limitations:

  • The Python table and view functions must return a DataFrame. Some functions that operate on DataFrames do not return DataFrames and should not be used. Because DataFrame transformations are executed after the full dataflow graph has been resolved, using such operations might have unintended side effects. These operations include functions such as collect(), count(), toPandas(), save(), and saveAsTable(). However, you can include these functions outside of table or view function definitions because this code is run once during the graph initialization phase; see the sketch after this list.

  • The pivot() function is not supported. The pivot operation in Spark requires eager loading of input data to compute the schema of the output. This capability is not supported in Delta Live Tables.
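
To illustrate the first limitation, the following is a minimal sketch of the distinction between code inside and outside a table function; the dataset path is the NYC taxi sample used elsewhere in this article, and the table name taxi_sample is illustrative:

import dlt

# Code outside a table or view function runs once during graph initialization,
# so an eager operation such as count() is acceptable here.
raw_df = spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
raw_count = raw_df.count()

@dlt.table
def taxi_sample():
  # Inside a table function, return only DataFrame transformations; avoid
  # eager operations such as collect(), count(), toPandas(), save(), or saveAsTable().
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")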

Python datasets

The Python API is defined in the dlt module. You must import the dlt module in your Delta Live Tables pipelines implemented with the Python API. Apply the @dlt.view or @dlt.table decorator to a function to define a view or table in Python. You can use the function name or the name parameter to assign the table or view name. The following example defines two different datasets: a view called taxi_raw that takes a JSON file as the input source and a table called filtered_data that takes the taxi_raw view as input:

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

View and table functions must return a Spark DataFrame or a Koalas DataFrame. A Koalas DataFrame returned by a function is converted to a Spark DataFrame by the Delta Live Tables runtime.

In addition to reading from external data sources, you can access datasets defined in the same pipeline with the Delta Live Tables read() function. The following example demonstrates creating a customers_filteredA dataset using the read() function:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

You can also use the spark.table() function to access a dataset defined in the same pipeline or a table registered in the metastore. When using the spark.table() function to access a dataset defined in the pipeline, prepend the LIVE keyword to the dataset name in the function argument:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

To read data from a table registered in the metastore, omit the LIVE keyword from the function argument and optionally qualify the table name with the database name:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Delta Live Tables ensures that the pipeline automatically captures the dependencies between datasets. This dependency information is used to determine the execution order when performing an update and to record lineage information in the event log for a pipeline.

You can also return a dataset using a spark.sql expression in a query function. To read from an internal dataset, prepend LIVE. to the dataset name:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Both views and tables have the following optional properties (see the example after the list):

  • comment: A human-readable description of this dataset.

  • spark_conf: A Python dictionary containing Spark configurations for the execution of this query only.

  • Data quality constraints enforced with expectations.
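
For example, the following sketch attaches a comment, a query-scoped Spark configuration, and a single expectation to a table; the configuration value, constraint, and table name are illustrative, and customers_raw is assumed to be defined elsewhere in the pipeline:

@dlt.table(
  comment="Customers with a valid customer_id",
  spark_conf={"spark.sql.shuffle.partitions": "8"})
@dlt.expect("valid_id", "customer_id IS NOT NULL")
def customers_with_valid_id():
  return dlt.read("customers_raw")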

Tables also offer additional control of their materialization (a combined example follows the list):

  • Specify how tables are partitioned using partition_cols. You can use partitioning to speed up queries.

  • You can set table properties when you define a view or table. See Table properties for more details.

  • Set a storage location for table data using the path setting. If path isn’t set, table data is stored in the pipeline storage location.

  • You can optionally specify a table schema using a Python StructType or a SQL DDL string. When specified with a DDL string, the definition can include generated columns. The following examples create a table called sales with an explicitly specified schema:

    from pyspark.sql.types import StructType, StructField, StringType, LongType

    sales_schema = StructType([
      StructField("customer_id", StringType(), True),
      StructField("customer_name", StringType(), True),
      StructField("number_of_line_items", StringType(), True),
      StructField("order_datetime", StringType(), True),
      StructField("order_number", LongType(), True)
    ])
    
    @dlt.table(
      comment="Raw data on sales",
      schema=sales_schema)
    def sales():
      return ("...")
    
    @dlt.table(
      comment="Raw data on sales",
      schema="""
        customer_id STRING,
        customer_name STRING,
        number_of_line_items STRING,
        order_datetime STRING,
        order_number LONG,
        order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
        """,
      partition_cols = ["order_day_of_week"])
    def sales():
      return ("...")
    

    If you don’t specify a schema, Delta Live Tables infers it from the table definition.
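
For example, the following sketch combines the comment, path, table_properties, and partition_cols options; the storage path, source path, column names, and table name are illustrative, and the property key is described in Table properties:

@dlt.table(
  comment="Raw data on sales",
  path="/mnt/delta/sales_raw",
  table_properties={"pipelines.autoOptimize.zOrderCols": "customer_id"},
  partition_cols=["order_date"])
def sales_raw():
  return spark.read.format("json").load("/data/sales/")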

Python libraries

To specify external Python libraries, use the %pip install magic command. When an update starts, Delta Live Tables runs all cells containing a %pip install command before running any table definitions. Every Python notebook included in the pipeline has access to all installed libraries. The following example installs the numpy library and makes it globally available to any Python notebook in the pipeline:

%pip install numpy

import numpy as np

To install a Python wheel package, add the wheel path to the %pip install command. Installed Python wheel packages are available to all tables in the pipeline. The following example installs a wheel named dltfns-1.0-py3-none-any.whl from the DBFS directory /dbfs/dlt/:

%pip install /dbfs/dlt/dltfns-1.0-py3-none-any.whl

See Install a wheel package with %pip.

Python API specification

Python module

Delta Live Tables Python functions are defined in the dlt module. Your pipelines implemented with the Python API must import this module:

import dlt

Create table

To define a table in Python, apply the @table decorator. The @table decorator is an alias for the @create_table decorator.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Create view

To define a view in Python, apply the @view decorator. The @view decorator is an alias for the @create_view decorator.

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Python properties

@table or @view

name

Type: str

An optional name for the table or view. If not defined, the function name is used as the table or view name.

comment

Type: str

An optional description for the table.

spark_conf

Type: dict

An optional dictionary of Spark configurations for the execution of this query only.

table_properties

Type: dict

An optional dictionary of table properties for the table.

path

Type: str

An optional storage location for table data. If not set, the system will default to the pipeline storage location.

partition_cols

Type: array

An optional list of one or more columns to use for partitioning the table.

schema

Type: str or StructType

An optional schema definition for the table. Schemas can be defined as a SQL DDL string, or with a Python StructType.

temporary

Type: bool

Create a temporary table. No metadata is persisted for this table.

The default is False.
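
For example, a minimal sketch of a temporary table used as an intermediate step in a pipeline; the table name is illustrative, and customers_raw is assumed to be defined in the same pipeline:

@dlt.table(
  temporary=True)
def customers_intermediate():
  return dlt.read("customers_raw").where("customer_id IS NOT NULL")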

Table or view definition

def <function-name>()

A Python function that defines the dataset. If the name parameter is not set, then <function-name> is used as the target dataset name.

query

A Spark SQL statement that returns a Spark DataFrame or Koalas DataFrame.

Use dlt.read() or spark.table() to perform a complete read from a dataset defined in the same pipeline. When using the spark.table() function to read from a dataset defined in the same pipeline, prepend the LIVE keyword to the dataset name in the function argument. For example, to read from a dataset named customers:

spark.table("LIVE.customers")

You can also use the spark.table() function to read from a table registered in the metastore by omitting the LIVE keyword and optionally qualifying the table name with the database name:

spark.table("sales.customers")

Use dlt.read_stream() to perform a streaming read from a dataset defined in the same pipeline.
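
For example, a minimal sketch of a streaming read from a dataset defined in the same pipeline; the table and dataset names are illustrative:

@dlt.table
def customers_stream():
  # Incrementally reads new records from the customers_raw dataset
  return dlt.read_stream("customers_raw")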

Use the spark.sql function to define a SQL query that creates the returned dataset.

Use PySpark syntax to define Delta Live Tables queries with Python.

Expectations

@expect("description", "constraint")

Declare a data quality constraint identified by description. If a row violates the expectation, include the row in the target dataset.

@expect_or_drop("description", "constraint")

Declare a data quality constraint identified by description. If a row violates the expectation, drop the row from the target dataset.

@expect_or_fail("description", "constraint")

Declare a data quality constraint identified by description. If a row violates the expectation, immediately stop execution.

@expect_all(expectations)

Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, include the row in the target dataset.

@expect_all_or_drop(expectations)

Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, drop the row from the target dataset.

@expect_all_or_fail(expectations)

Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, immediately stop execution.
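
For example, the following sketch applies a single expectation with @expect_or_drop and multiple expectations with @expect_all; the descriptions, constraints, and names are illustrative, and customers_raw is assumed to be defined in the same pipeline:

@dlt.table
@dlt.expect_or_drop("valid_id", "customer_id IS NOT NULL")
@dlt.expect_all({
  "valid_name": "customer_name IS NOT NULL",
  "valid_line_items": "number_of_line_items > 0"})
def customers_validated():
  return dlt.read("customers_raw")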

Table properties

In addition to the table properties supported by Delta Lake, you can set the following table properties.

Table properties

pipelines.autoOptimize.managed

Default: true

Enables or disables automatic scheduled optimization of this table.

pipelines.autoOptimize.zOrderCols

Default: None

An optional comma-separated list of column names to z-order this table by.

pipelines.reset.allowed

Default: true

Controls whether a full refresh is allowed for this table.
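
For example, the following sketch sets these properties when defining a table; the z-order column and table name are illustrative:

@dlt.table(
  table_properties={
    "pipelines.autoOptimize.managed": "true",
    "pipelines.autoOptimize.zOrderCols": "customer_id",
    "pipelines.reset.allowed": "false"})
def customers_snapshot():
  return dlt.read("customers_raw")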