Delta Live Tables concepts

Preview

This feature is in Public Preview.

This article introduces the fundamental concepts you should understand to use Delta Live Tables effectively.

Pipelines

The main unit of execution in Delta Live Tables is a pipeline. A pipeline is a directed acyclic graph (DAG) linking data sources to target datasets. You define the contents of Delta Live Tables datasets using SQL queries or Python functions that return Spark SQL or Koalas DataFrames. A pipeline also has an associated configuration defining the settings required to run the pipeline. You can optionally specify data quality constraints when defining datasets.

You implement Delta Live Tables pipelines in Databricks notebooks. You can implement a pipeline in a single notebook or in multiple notebooks. All queries in a single notebook must be implemented in either Python or SQL, but you can configure multiple-notebook pipelines with a mix of Python and SQL notebooks. All notebooks in a pipeline share a storage location for output data and can reference datasets defined in other notebooks in the pipeline.
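
For example, a minimal Python notebook might define two datasets that form a simple DAG. The dataset names, file path, and column used below are placeholders, not part of the Delta Live Tables API:

import dlt

# A view over raw source files; replace the path with your own data location.
# `spark` is the SparkSession available in Databricks notebooks.
@dlt.view(comment="Raw clickstream events")
def clicks_raw():
    return spark.read.json("/data/clickstream")

# A table computed from the view, forming an edge in the pipeline's DAG.
@dlt.table(comment="Click counts by page")
def clicks_by_page():
    return dlt.read("clicks_raw").groupBy("page").count()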

You can use Databricks Repos to store and manage your Delta Live Tables notebooks. To make a notebook managed with Databricks Repos available when you create a pipeline:

  • Add the comment line -- Databricks notebook source at the top of a SQL notebook.

  • Add the comment line # Databricks notebook source at the top of a Python notebook.

See Create, run, and manage Delta Live Tables pipelines to learn more about creating and running a pipeline. See Configure multiple notebooks in a pipeline for an example of configuring a multi-notebook pipeline.

Queries

Queries implement data transformations by defining a data source and a target dataset. Delta Live Tables queries can be implemented in Python or SQL.

Expectations

You use expectations to specify data quality controls on the contents of a dataset. Unlike a CHECK constraint in a traditional database, which prevents adding any records that fail the constraint, expectations provide flexibility when processing data that fails data quality requirements. This flexibility allows you to process and store data that you expect to be messy and data that must meet strict quality requirements.

You can define expectations to retain records that fail validation, drop records that fail validation, or halt the pipeline when a record fails validation.
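
In Python, these three behaviors correspond to the expectation decorators sketched below; the table name, source dataset, and constraint expressions are illustrative:

import dlt

@dlt.table
@dlt.expect("valid_timestamp", "event_time IS NOT NULL")   # retain failing records and record the metric
@dlt.expect_or_drop("valid_user", "user_id IS NOT NULL")   # drop failing records
@dlt.expect_or_fail("non_negative_amount", "amount >= 0")  # halt the update if any record fails
def events_validated():
    return dlt.read("events_raw")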

Pipeline settings

Pipeline settings are defined in JSON and specify the parameters required to run the pipeline, including:

  • Libraries (in the form of notebooks) that contain the queries defining the tables and views that produce the target datasets in Delta Lake.

  • A cloud storage location where the tables and metadata required for processing will be stored. This location is either DBFS or another location you provide.

  • Optional configuration for a Spark cluster where data processing will take place.

See Delta Live Tables settings for more details.
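
A minimal settings sketch illustrating these parameters follows; the pipeline name, notebook path, storage location, and cluster size are placeholder values:

{
  "name": "example-pipeline",
  "libraries": [
    { "notebook": { "path": "/Users/user@example.com/dlt-queries" } }
  ],
  "storage": "dbfs:/pipelines/example-pipeline",
  "clusters": [
    { "label": "default", "num_workers": 2 }
  ]
}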

Datasets

There are two types of datasets in a Delta Live Tables pipeline: views and tables.

  • Views are similar to a temporary view in SQL and are an alias for some computation. A view allows you to break a complicated query into smaller or easier-to-understand queries. Views also allow you to reuse a given transformation as a source for more than one table. Views are available within a pipeline only and cannot be queried interactively.

  • Tables are similar to traditional materialized views. The Delta Live Tables runtime automatically creates tables in the Delta format and ensures those tables are updated with the latest result of the query that creates the table.

You can define a live or streaming live view or table:

A live table or view always reflects the results of the query that defines it, including when the query defining the table or view is updated, or an input data source is updated. Like a traditional materialized view, a live table or view may be entirely computed when possible to optimize computation resources and time.

A streaming live table or view processes data that has been added only since the last pipeline update. Streaming tables and views are stateful; if the defining query changes, new data will be processed based on the new query and existing data is not recomputed.
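
A minimal Python sketch contrasting the two table types follows; the dataset names and transformation are illustrative:

import dlt

# Live table: its contents are recomputed from the input dataset on each update.
@dlt.table
def daily_order_totals():
    return dlt.read("orders").groupBy("order_date").count()

# Streaming live table: processes only data added to the source since the last update.
@dlt.table
def orders_ingest():
    return dlt.read_stream("orders_raw")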

Streaming live tables are valuable for a number of use cases, including:

  • Data retention: a streaming live table can preserve data indefinitely, even when an input data source has low retention, for example, a streaming data source such as Apache Kafka or Amazon Kinesis.

  • Data source evolution: data can be retained even if the data source changes, for example, moving from Kafka to Kinesis.

You can publish your tables to make them available for discovery and querying by downstream consumers.
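
Publishing is configured in the pipeline settings by setting the target field to a schema in the metastore; a sketch, where the schema name is a placeholder:

{
  ...
  "target": "analytics_prod",
  ...
}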

Pipeline updates

After you create the pipeline and are ready to run it, you start an update. An update:

  • Starts a cluster with the correct configuration.

  • Discovers all the tables and views defined, and checks for any analysis errors such as invalid column names, missing dependencies, and syntax errors.

  • Creates or updates tables and views with the most recent data available.

Which tables and views are updated, and how those tables and views are updated, depends on the update type:

  • Refresh all: All live tables are updated to reflect the current state of their input data sources. For all streaming live tables, new rows are appended to the table.

  • Full refresh all: All live tables are updated to reflect the current state of their input data sources. For all streaming live tables, Delta Live Tables attempts to clear all data from each table and then load all data from the streaming source.

  • Refresh selection: The behavior of refresh selection is identical to refresh all, but allows you to refresh only selected tables. Selected live tables are updated to reflect the current state of their input data sources. For selected streaming live tables, new rows are appended to the table.

  • Full refresh selection: The behavior of full refresh selection is identical to full refresh all, but allows you to perform a full refresh of only selected tables. Selected live tables are updated to reflect the current state of their input data sources. For selected streaming live tables, Delta Live Tables attempts to clear all data from each table and then load all data from the streaming source.

For existing live tables, an update has the same behavior as a SQL REFRESH on a materialized view. For new live tables, the behavior is the same as a SQL CREATE operation.

If the pipeline is triggered, the system stops processing after refreshing all tables or selected tables in the pipeline once.

When a triggered update completes successfully, each table that is part of the update is guaranteed to be updated based on the data available when the update started.

For use cases that require low latency, you can configure a pipeline to update continuously. See Continuous and triggered pipelines for more information about choosing an execution mode for your pipeline.

Continuous and triggered pipelines

Delta Live Tables supports two different modes of execution:

  • Triggered pipelines update each table with whatever data is currently available and then stop the cluster running the pipeline. Delta Live Tables automatically analyzes the dependencies between your tables and starts by computing those that read from external sources. Tables within the pipeline are updated after their dependent data sources have been updated.

  • Continuous pipelines update tables continuously as input data changes. Once an update is started, it continues to run until manually stopped. Continuous pipelines require an always-running cluster but ensure that downstream consumers have the most up-to-date data.

Triggered pipelines can reduce resource consumption and expense since the cluster runs only long enough to execute the pipeline. However, new data won’t be processed until the pipeline is triggered. Continuous pipelines require an always-running cluster, which is more expensive but reduces processing latency.

The continuous flag in the pipeline settings controls the execution mode. Pipelines run in triggered execution mode by default. Set continuous to true if you require low latency refreshes of the tables in your pipeline.

{
  ...
  "continuous": true,
  ...
}

The execution mode is independent of the type of table being computed. Both live and streaming live tables can be updated in either execution mode.

If some tables in your pipeline have weaker latency requirements, you can configure their update frequency independently by setting the pipelines.trigger.interval setting:

spark_conf={"pipelines.trigger.interval": "1 hour"}

This option does not turn off the cluster in between pipeline updates, but can free up resources for updating other tables in your pipeline.
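
For example, the interval can be applied to a single table through the spark_conf argument of the table decorator; the table name and transformation below are illustrative:

import dlt

# Refresh this table at most once per hour, even in a continuous pipeline.
@dlt.table(
    spark_conf={"pipelines.trigger.interval": "1 hour"}
)
def hourly_order_totals():
    return dlt.read("orders").groupBy("order_date").count()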

Tables and views in continuous pipelines

You can use both live tables or views and streaming live tables or views in a pipeline that runs continuously. To avoid unnecessary processing, pipelines automatically monitor dependent Delta tables and perform an update only when the contents of those dependent tables have changed.

The Delta Live Tables runtime is not able to detect changes in non-Delta data sources. The table is still updated regularly, but with a higher default trigger interval to prevent excessive recomputation from slowing down any incremental processing happening on the cluster.

Development and production modes

You can optimize pipeline execution by switching between development and production modes. Use the environment toggle buttons in the Pipelines UI to switch between these two modes. By default, pipelines run in development mode.

When you run your pipeline in development mode, the Delta Live Tables system:

  • Reuses a cluster to avoid the overhead of restarts. By default, clusters run for two hours when development mode is enabled. You can change this with the pipelines.clusterShutdown.delay setting in the cluster configuration, as shown in the sketch after this list.

  • Disables pipeline retries so you can immediately detect and fix errors.
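
A sketch of overriding the development-mode shutdown delay, assuming the setting is supplied through the pipeline's configuration object:

{
  ...
  "configuration": {
    "pipelines.clusterShutdown.delay": "60m"
  },
  ...
}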

In production mode, the Delta Live Tables system:

  • Restarts the cluster for specific recoverable errors, including memory leaks and stale credentials.

  • Retries execution in the event of specific errors, for example, a failure to start a cluster.

Note

Switching between development and production modes only controls cluster and pipeline execution behavior. Storage locations and target schemas in the catalog for publishing tables must be configured as part of pipeline settings and are not affected when switching between modes.

Databricks Enhanced Autoscaling

Preview

This feature is in Public Preview.

Databricks Enhanced Autoscaling optimizes cluster utilization by automatically allocating cluster resources based on workload volume, with minimal impact to the data processing latency of your pipelines.

Enhanced Autoscaling adds to the existing cluster autoscaling functionality with the following features:

  • Enhanced Autoscaling implements optimization of streaming workloads, and adds enhancements to improve the performance of batch workloads. These optimizations result in more efficient cluster utilization, reduced resource usage, and lower cost.

  • Enhanced Autoscaling proactively shuts down under-utilized nodes while guaranteeing there are no failed tasks during shutdown. The existing cluster autoscaling feature scales down nodes only if the node is idle.

Requirements

To use Enhanced Autoscaling, do one of the following:

  • Set Cluster mode to Enhanced autoscaling when you create a pipeline in the Delta Live Tables UI.

  • Add the autoscale configuration to the pipeline default cluster and set the mode field to enhanced. The following example configures an Enhanced Autoscaling cluster with a minimum of 5 workers and a maximum of 10 workers. max_workers must be greater than or equal to min_workers.

Note

  • Enhanced Autoscaling is available for the default cluster only. If you include the autoscale configuration in the maintenance cluster configuration, the existing cluster autoscaling feature is used.

  • The autoscale configuration has two modes:

    • legacy: Use the existing cluster autoscaling. By default, legacy mode is enabled if the mode option is unset.

    • enhanced: Use Enhanced Autoscaling.

{
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 5,
        "max_workers": 10,
        "mode": "enhanced"
      }
    }
  ]
}

If the pipeline is continuous, it is automatically restarted after the autoscaling configuration changes. After the restart, expect a short period of increased latency; the cluster size is then updated based on your autoscale configuration, and pipeline latency should return to its previous characteristics.

Monitoring Enhanced Autoscaling-enabled pipelines

You can use the Delta Live Tables event log to monitor Enhanced Autoscaling metrics. You can view the metrics in the user interface. Enhanced Autoscaling events have the autoscale event type. The following are example events:

  • Cluster resize request started: Scaling [up or down] to <y> executors from current cluster size of <x>

  • Cluster resize request succeeded: Achieved cluster size <x> for cluster <cluster-id> with status SUCCEEDED

  • Cluster resize request partially succeeded: Achieved cluster size <x> for cluster <cluster-id> with status PARTIALLY_SUCCEEDED

  • Cluster resize request failed: Achieved cluster size <x> for cluster <cluster-id> with status FAILED

You can also view Enhanced Autoscaling events by directly querying the event log:
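
A Python sketch follows, assuming the event log is stored as a Delta table under the pipeline's storage location at system/events and includes event_type, timestamp, message, and details columns; the storage path is a placeholder:

# Read the pipeline event log and keep only autoscaling events.
event_log = spark.read.format("delta").load("dbfs:/pipelines/example-pipeline/system/events")

autoscale_events = event_log.where("event_type = 'autoscale'")
autoscale_events.select("timestamp", "message", "details").show(truncate=False)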

Product editions

You can use the Delta Live Tables product edition option to run your pipeline with the features best suited for the pipeline requirements. The following product editions are available:

  • Core to run streaming ingest workloads. Select the Core edition if your pipeline doesn’t require advanced features such as change data capture (CDC) or Delta Live Tables expectations.

  • Pro to run streaming ingest and CDC workloads. The Pro product edition supports all of the Core features, plus support for workloads that require updating tables based on changes in source data.

  • Advanced to run streaming ingest workloads, CDC workloads, and workloads that require expectations. The Advanced product edition supports the features of the Core and Pro editions, and also supports enforcement of data quality constraints with Delta Live Tables expectations.

You can select the product edition when you create or edit a pipeline. You can select a different edition for each pipeline.
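
The selected edition is also reflected in the pipeline's JSON settings; a sketch, assuming an edition field with uppercase edition names:

{
  ...
  "edition": "ADVANCED",
  ...
}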

If your pipeline includes features not supported by the selected product edition, for example, expectations, you will receive an error message with the reason for the error. You can then edit the pipeline to select the appropriate edition.