Manage data quality with Delta Live Tables

You use expectations to define data quality constraints on the contents of a dataset. Expectations allow you to guarantee data arriving in tables meets data quality requirements and provide insights into data quality for each pipeline update. You apply expectations to queries using Python decorators or SQL constraint clauses.

What are Delta Live Tables expectations?

Expectations are optional clauses you add to Delta Live Tables dataset declarations that apply data quality checks on each record passing through a query.

An expectation consists of three things:

  • A description, which acts as a unique identifier and allows you to track metrics for the constraint.

  • A boolean statement that always returns true or false based on some stated condition.

  • An action to take when a record fails the expectation, meaning the boolean returns false.

The following matrix shows the three actions you can apply to invalid records:

Action

Result

warn (default)

Invalid records are written to the target; failure is reported as a metric for the dataset.

drop

Invalid records are dropped before data is written to the target; failure is reported as a metrics for the dataset.

fail

Invalid records prevent the update from succeeding. Manual intervention is required before re-processing.

You can view data quality metrics such as the number of records that violate an expectation by querying the Delta Live Tables event log. See Monitor Delta Live Tables pipelines.

For a complete reference of Delta Live Tables dataset declaration syntax, see Delta Live Tables Python language reference or Delta Live Tables SQL language reference.

Note

While you can include multiple clauses in any expectation, only Python supports defining actions based on multiple expectations. See Multiple expectations.

Retain invalid records

Use the expect operator when you want to keep records that violate the expectation. Records that violate the expectation are added to the target dataset along with valid records:

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Drop invalid records

Use the expect or drop operator to prevent further processing of invalid records. Records that violate the expectation are dropped from the target dataset:

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Fail on invalid records

When invalid records are unacceptable, use the expect or fail operator to stop execution immediately when a record fails validation. If the operation is a table update, the system atomically rolls back the transaction:

@dlt.expect_or_fail("valid_count", "count > 0")
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

When a pipeline fails because of an expectation violation, you must fix the pipeline code to handle the invalid data correctly before re-running the pipeline.

Fail expectations modify the Spark query plan of your transformations to track information required to detect and report on violations. For many queries, you can use this information to identify which input record resulted in the violation. The following is an example exception:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Multiple expectations

You can define expectations with one or more data quality constraints in Python pipelines. These decorators accept a Python dictionary as an argument, where the key is the expectation name and the value is the expectation constraint.

Use expect_all to specify multiple data quality constraints when records that fail validation should be included in the target dataset:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Use expect_all_or_drop to specify multiple data quality constraints when records that fail validation should be dropped from the target dataset:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Use expect_all_or_fail to specify multiple data quality constraints when records that fail validation should halt pipeline execution:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

You can also define a collection of expectations as a variable and pass it to one or more queries in your pipeline:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

Quarantine invalid data

The following example uses expectations in combination with temporary tables and views. This pattern provides you with metrics for records that pass expectation checks during pipeline updates, and provides a way to process valid and invalid records through different downstream paths.

Note

This example reads sample data included in the Databricks datasets. Because the Databricks datasets are not supported with a pipeline that publishes to Unity Catalog, this example works only with a pipeline configured to publish to the Hive metastore. However, this pattern also works with Unity Catalog enabled pipelines, but you must read data from external locations. To learn more about using Unity Catalog with Delta Live Tables, see Use Unity Catalog with your Delta Live Tables pipelines.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

Validate row counts across tables

You can add an additional table to your pipeline that defines an expectation to compare row counts between two live tables. The results of this expectation appear in the event log and the Delta Live Tables UI. This following example validates equal row counts between the tbla and tblb tables:

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Perform advanced validation with Delta Live Tables expectations

You can define live tables using aggregate and join queries and use the results of those queries as part of your expectation checking. This is useful if you wish to perform complex data quality checks, for example, ensuring a derived table contains all records from the source table or guaranteeing the equality of a numeric column across tables. You can use the TEMPORARY keyword to prevent these tables from being published to the target schema.

The following example validates that all expected records are present in the report table:

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

The following example uses an aggregate to ensure the uniqueness of a primary key:

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

Make expectations portable and reusable

You can maintain data quality rules separately from your pipeline implementations.

Databricks recommends storing the rules in a Delta table with each rule categorized by a tag. You use this tag in dataset definitions to determine which rules to apply.

The following example creates a table named rules to maintain rules:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

The following Python example defines data quality expectations based on the rules stored in the rules table. The get_rules() function reads the rules from the rules table and returns a Python dictionary containing rules matching the tag argument passed to the function. The dictionary is applied in the @dlt.expect_all_*() decorators to enforce data quality constraints. For example, any records failing the rules tagged with validity will be dropped from the raw_farmers_market table:

Note

This example reads sample data included in the Databricks datasets. Because the Databricks datasets are not supported with a pipeline that publishes to Unity Catalog, this example works only with a pipeline configured to publish to the Hive metastore. However, this pattern also works with Unity Catalog enabled pipelines, but you must read data from external locations. To learn more about using Unity Catalog with Delta Live Tables, see Use Unity Catalog with your Delta Live Tables pipelines.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

Instead of creating a table named rules to maintain rules, you could create a Python module to main rules, for example, in a file named rules_module.py in the same folder as the notebook:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

Then modify the preceding notebook by importing the module and changing the get_rules() function to read from the module instead of from the rules table:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )