Manage data quality with pipeline expectations

Use expectations to apply quality constraints that validate data as it flows through ETL pipelines. Expectations provide greater insight into data quality metrics and allow you to fail updates or drop records when invalid records are detected.

This article provides an overview of expectations, including syntax examples and behavior options. For more advanced use cases and recommended best practices, see Expectation recommendations and advanced patterns.

Delta Live Tables expectations flow graph

What are expectations?

Expectations are optional clauses in pipeline materialized view, streaming table, or view creation statements that apply data quality checks on each record passing through a query. Expectations use standard SQL Boolean statements to specify constraints. You can combine multiple expectations for a single dataset and set expectations across all dataset declarations in a pipeline.

The following sections introduce the three components of an expectation and provide syntax examples.

Expectation name

Each expectation must have a name, which is used as an identifier to track and monitor the expectation. Choose a name that communicates the metrics being validated. The following example defines the expectation valid_customer_age to confirm that age is between 0 and 120 years:

Important

An expectation name must be unique for a given dataset. You can reuse expectations across multiple datasets in a pipeline. See Portable and reusable expectations.

import dlt

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

Constraint to evaluate

The constraint clause is a SQL conditional statement that must evaluate to true or false for each record. The constraint contains the actual logic for what is being validated. When a record fails this condition, the expectation is triggered.

Constraints must use valid SQL syntax and cannot contain the following:

  • Custom Python functions

  • External service calls

  • Subqueries referencing other tables

The following are examples of constraints that could be added to dataset creation statements:

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

Action on invalid record

You must specify an action to determine what happens when a record fails the validation check. The following actions are available:

  • warn (default): SQL syntax EXPECT, Python syntax dlt.expect. Invalid records are written to the target. The count of valid and invalid records is logged alongside other dataset metrics.

  • drop: SQL syntax EXPECT ... ON VIOLATION DROP ROW, Python syntax dlt.expect_or_drop. Invalid records are dropped before data is written to the target. The count of dropped records is logged alongside other dataset metrics.

  • fail: SQL syntax EXPECT ... ON VIOLATION FAIL UPDATE, Python syntax dlt.expect_or_fail. Invalid records prevent the update from succeeding. Manual intervention is required before reprocessing. A fail expectation fails only the flow on which it is defined and does not cause other flows in your pipeline to fail.

You can also implement advanced logic to quarantine invalid records without failing or dropping data. See Quarantine invalid records.
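
One minimal sketch of such a pattern, assuming you are willing to maintain two separate target tables (the names customers_valid and customers_quarantine below are illustrative, and the source reuses the raw_customers example from earlier), is to apply the original constraint to one table and its negation to the other, each with a drop action:

-- Keep only records that satisfy the constraint.
CREATE OR REFRESH STREAMING TABLE customers_valid(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120) ON VIOLATION DROP ROW
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

-- Keep only records that violate the constraint, by negating it.
CREATE OR REFRESH STREAMING TABLE customers_quarantine(
  CONSTRAINT invalid_customer_age EXPECT (NOT (age BETWEEN 0 AND 120)) ON VIOLATION DROP ROW
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

Note that a record with a NULL age does not satisfy either constraint and would be dropped from both tables, so the pattern described in Quarantine invalid records, which flags records instead of dropping them, avoids this gap.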

Expectation tracking metrics

You can see tracking metrics for warn or drop actions from the pipeline UI. Because fail causes the update to fail when an invalid record is detected, metrics are not recorded.

To view expectation metrics, complete the following steps:

  1. Click Delta Live Tables in the sidebar.

  2. Click the Name of your pipeline.

  3. Click a dataset with an expectation defined.

  4. Select the Data quality tab in the right sidebar.

You can view data quality metrics by querying the Delta Live Tables event log. See Query data quality from the event log.
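
The following is a minimal sketch of such a query, not a definitive implementation: it assumes the pipeline runs in Unity Catalog so its event log can be read with the event_log() table-valued function, and that the customers table from the earlier example is published to the hypothetical location main.default.customers:

-- Summarize passing and failing record counts per expectation.
-- Assumes Unity Catalog and the hypothetical table main.default.customers.
SELECT
  row_expectations.dataset AS dataset,
  row_expectations.name AS expectation,
  SUM(row_expectations.passed_records) AS passing_records,
  SUM(row_expectations.failed_records) AS failing_records
FROM (
  SELECT
    explode(
      from_json(
        details:flow_progress:data_quality:expectations,
        "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
      )
    ) AS row_expectations
  FROM event_log(TABLE(main.default.customers))
  WHERE event_type = 'flow_progress'
)
GROUP BY row_expectations.dataset, row_expectations.name;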

Retain invalid records

Retaining invalid records is the default behavior for expectations. Use the expect operator when you want to keep records that violate the expectation but collect metrics on how many records pass or fail a constraint. Records that violate the expectation are added to the target dataset along with valid records:

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Drop invalid records

Use the expect_or_drop operator to prevent further processing of invalid records. Records that violate the expectation are dropped from the target dataset:

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Fail on invalid records

When invalid records are unacceptable, use the expect_or_fail operator to halt execution immediately when a record fails validation. If the operation is a table update, the system atomically rolls back the transaction:

@dlt.expect_or_fail("valid_count", "count > 0")
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Important

If you have multiple parallel flows defined in a pipeline, the failure of a single flow does not cause other flows to fail.

Delta Live Tables flow failure explanation graph

Troubleshooting failed updates from expectations

When a pipeline fails because of an expectation violation, you must fix the pipeline code to handle the invalid data correctly before running the pipeline again.

Expectations configured to fail pipelines modify the Spark query plan of your transformations to track the information required to detect and report violations. For many queries, you can use this information to identify which input record resulted in the violation. The following is an example of the violation details that are reported:

Expectation Violated:
{
  "flowName": "sensor-pipeline",
  "verboseInfo": {
    "expectationsViolated": [
      "temperature_in_valid_range"
    ],
    "inputData": {
      "id": "TEMP_001",
      "temperature": -500,
      "timestamp_ms": "1710498600"
    },
    "outputRecord": {
      "sensor_id": "TEMP_001",
      "temperature": -500,
      "change_time": "2024-03-15 10:30:00"
    },
    "missingInputData": false
  }
}

Manage multiple expectations

Note

While both SQL and Python support multiple expectations within a single dataset, only Python allows you to group multiple separate expectations together and specify collective actions.

Delta Live Tables with multiple expectations flow graph

You can group multiple expectations together and specify collective actions using the functions expect_all, expect_all_or_drop, and expect_all_or_fail.

These decorators accept a Python dictionary as an argument, where the key is the expectation name and the value is the expectation constraint. You can reuse the same set of expectations in multiple datasets in your pipeline. The following shows examples of each of the expect_all Python operators:

import dlt

valid_pages = {
  "valid_count": "count > 0",
  "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL",
}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset (the source table name here is illustrative)
  return spark.readStream.table("datasets.samples.raw_pages")

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset
  return dlt.read_stream("raw_data")

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create a cleaned and prepared dataset to share
  return dlt.read("prepared_data")