Tutorial: Declare a data pipeline with Python in Delta Live Tables

This tutorial shows you how to use Python syntax to declare a data pipeline in Delta Live Tables. Users familiar with PySpark or the pandas API on Spark can use DataFrames with Delta Live Tables. For users unfamiliar with Spark DataFrames, Databricks recommends using SQL for Delta Live Tables. See Tutorial: Declare a data pipeline with SQL in Delta Live Tables.

Python syntax for Delta Live Tables extends standard PySpark with a set of decorator functions imported through the dlt module.
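
For example, the basic pattern is a Python function that returns a DataFrame, decorated so that Delta Live Tables registers its result as a dataset. The following is a minimal sketch with hypothetical names:

import dlt
from pyspark.sql.functions import col

# Minimal sketch with hypothetical names: the decorator registers the
# function's result as a dataset in the pipeline.
@dlt.table(comment="A hypothetical table declared with Delta Live Tables decorator syntax.")
def example_dataset():
  # Standard PySpark code returns a DataFrame; Delta Live Tables materializes the result.
  return spark.range(10).withColumn("id_doubled", col("id") * 2)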

To use the code in this example, you must have a volume in Unity Catalog. This article assumes a volume named my-volume in a schema named default within a catalog named main. Also, you must have the following permissions in Unity Catalog:

  • READ VOLUME and WRITE VOLUME, or ALL PRIVILEGES, for the my-volume volume.

  • USE SCHEMA or ALL PRIVILEGES for the default schema.

  • USE CATALOG or ALL PRIVILEGES for the main catalog.

To set these permissions, contact your Databricks administrator or see Unity Catalog privileges and securable objects.
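
If you have the privileges to grant these permissions yourself, the following minimal sketch shows one way to do so from a notebook with spark.sql; the principal shown is a placeholder:

# Minimal sketch; replace the placeholder principal with a real user or group name.
principal = "some-user@example.com"

spark.sql(f"GRANT USE CATALOG ON CATALOG main TO `{principal}`")
spark.sql(f"GRANT USE SCHEMA ON SCHEMA main.default TO `{principal}`")
spark.sql(f"GRANT READ VOLUME, WRITE VOLUME ON VOLUME main.default.`my-volume` TO `{principal}`")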

Note

  • You cannot mix languages within a Delta Live Tables source code file. You can use multiple notebooks or files with different languages in a pipeline.

  • To use the code in this example, select Unity Catalog as the storage option when you create the pipeline.

Where do you run Delta Live Tables Python queries?

You can use notebooks or Python files to write Delta Live Tables Python queries, but Delta Live Tables is not designed to be run interactively in notebook cells.

Delta Live Tables differs from many Python scripts in a key way: you do not call the functions that perform data ingestion and transformation to create Delta Live Tables datasets. Instead, Delta Live Tables interprets the decorator functions from the dlt module in all files loaded into a pipeline and builds a dataflow graph.
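
The following sketch, with hypothetical dataset names, illustrates the difference: neither function is ever called directly in the file. Delta Live Tables discovers the decorated functions, infers that cleaned_events depends on raw_events, and runs them in the correct order during a pipeline update.

import dlt
from pyspark.sql.functions import col

@dlt.table(comment="Hypothetical raw dataset.")
def raw_events():
  # Hypothetical source path.
  return spark.read.format("json").load("/Volumes/main/default/my-volume/events/")

@dlt.table(comment="Hypothetical cleansed dataset.")
def cleaned_events():
  # dlt.read() declares a dependency on raw_events in the dataflow graph.
  return dlt.read("raw_events").where(col("event_id").isNotNull())

# Note that neither raw_events() nor cleaned_events() is called anywhere in this file.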

Important

You cannot rely on the cell-by-cell execution ordering of notebooks when writing Python for Delta Live Tables. Delta Live Tables evaluates and runs all code defined in notebooks, but has an entirely different execution model than a notebook Run all command.

Executing a cell that contains Delta Live Tables syntax in a Databricks notebook results in an error message. To learn about configuring pipelines with Delta Live Tables, see Tutorial: Run your first Delta Live Tables pipeline.

Declare a Delta Live Tables data pipeline with Python

This tutorial demonstrates using Python syntax to declare a Delta Live Tables pipeline to:

  • Read raw CSV data from a publicly available dataset into a table.

  • Read the records from the raw data table and use Delta Live Tables expectations to create a new table that contains cleansed data.

  • Use the records from the cleansed data table to make Delta Live Tables queries that create derived datasets.

This code demonstrates a simplified example of the medallion architecture. See What is the medallion lakehouse architecture?.

Copy the Python code and paste it into a new Python notebook. You should add each example code snippet to its own cell in the notebook, in the order described. To review options for creating notebooks, see Create a notebook.

Note

When you create a pipeline with the Python interface, table names are defined by function names by default. For example, the following code creates three tables named baby_names_raw, baby_names_prepared, and top_baby_names_2021. You can override the table name using the name parameter. See Create a Delta Live Tables materialized view or streaming table.
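
As a sketch (not part of this tutorial's pipeline), the name parameter overrides the default, function-based table name; the table name shown here is hypothetical:

@dlt.table(
  name="baby_names_bronze",  # hypothetical name; overrides the default function-based name
  comment="A table registered under an explicit name rather than the function name."
)
def read_raw_baby_names():
  # Hypothetical source path in the volume.
  return spark.read.csv("/Volumes/main/default/my-volume/rows.csv", header=True, inferSchema=True)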

Import the Delta Live Tables module

All Delta Live Tables Python APIs are implemented in the dlt module. Explicitly import the dlt module at the top of Python notebooks and files.

The following example shows this import, alongside import statements for pyspark.sql.functions.

import dlt
from pyspark.sql.functions import *

Download the data

To get the data for this example, you download a CSV file and store it in the volume, as follows:

import os

os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/main/default/my-volume/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

dbutils.fs.cp(
  f"{os.environ.get('DATASET_DOWNLOAD_URL')}",
  f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}"
)

Create a table from files in object storage

Delta Live Tables supports loading data from all formats supported by Databricks. See Data format options.

The @dlt.table decorator tells Delta Live Tables to create a table that contains the result of the DataFrame returned by a function. Add the @dlt.table decorator before any Python function definition that returns a Spark DataFrame to register a new table in Delta Live Tables. The following example demonstrates using the function name as the table name and adding a descriptive comment to the table:

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(
    f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}",
    header=True,
    inferSchema=True
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column
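
Because any format supported by the Spark DataFrame reader works here, the same pattern can load JSON instead of CSV, as in the following sketch; the path and table name are hypothetical:

@dlt.table(
  comment="A hypothetical table ingested from JSON files rather than CSV."
)
def example_json_raw():
  # Hypothetical path to JSON files in the volume.
  return spark.read.format("json").load("/Volumes/main/default/my-volume/json-data/")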

Add a table from an upstream dataset in the pipeline

You can use dlt.read() to read data from other datasets declared in your current Delta Live Tables pipeline. Declaring new tables in this way creates a dependency that Delta Live Tables automatically resolves before executing updates. The following code also includes examples of monitoring and enforcing data quality with expectations. See Manage data quality with Delta Live Tables.

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

Create a table with enriched data views

Because Delta Live Tables processes updates to pipelines as a series of dependency graphs, you can declare highly enriched views that power dashboards, BI, and analytics by defining tables that apply specific business logic.

Delta Live Tables tables are conceptually equivalent to materialized views. Whereas traditional views on Spark execute logic each time the view is queried, Delta Live Tables tables store the most recent version of query results in data files. Because Delta Live Tables manages updates for all datasets in a pipeline, you can schedule pipeline updates to match latency requirements for materialized views and know that queries against these tables contain the most recent version of data available.

The table defined by the following code demonstrates the conceptual similarity to a materialized view derived from upstream data in your pipeline:

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

Next steps

To learn more, see Delta Live Tables Python language reference.