Monitor Delta Live Tables pipelines
This article describes using built-in monitoring and observability features for Delta Live Tables pipelines. These features support tasks such as:
Observing the progress and status of pipeline updates. See What pipeline details are available in the UI?.
Alerting on pipeline events such as the success or failure of pipeline updates. See Add email notifications for pipeline events.
Viewing metrics for streaming sources like Apache Kafka and Auto Loader (Public Preview). See View streaming metrics.
Extracting detailed information on pipeline updates such as data lineage, data quality metrics, and resource usage. See What is the Delta Live Tables event log?.
Defining custom actions to take when specific events occur. See Define custom monitoring of Delta Live Tables pipelines with event hooks.
To inspect and diagnose query performance, see Access query history for Delta Live Tables pipelines. This feature is in Public Preview.
Add email notifications for pipeline events
You can configure one or more email addresses to receive notifications when any of the following occurs:
A pipeline update completes successfully.
A pipeline update fails, either with a retryable or a non-retryable error. Select this option to receive a notification for all pipeline failures.
A pipeline update fails with a non-retryable (fatal) error. Select this option to receive a notification only when a non-retryable error occurs.
A single data flow fails.
To configure email notifications when you create or edit a pipeline:
Click Add notification.
Enter one or more email addresses to receive notifications.
Click the checkbox for each notification type to send to the configured email addresses.
Click Add notification.
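The same choices can also be written as a fragment of the pipeline's JSON settings. The following is a minimal sketch in Python. The notifications, email_recipients, and alerts keys and the four alert names are assumed to match the pipeline settings format, and the helper name and email address are placeholders, so verify them against your workspace before relying on them.

```python
import json

# Assumed alert names, one per notification type listed above.
ALERTS = {
    "success": "on-update-success",
    "failure": "on-update-failure",
    "fatal_failure": "on-update-fatal-failure",
    "flow_failure": "on-flow-failure",
}


def build_notifications(recipients, alert_keys):
    """Return a notifications fragment for pipeline settings (assumed shape)."""
    unknown = [key for key in alert_keys if key not in ALERTS]
    if unknown:
        raise ValueError(f"unknown alert types: {unknown}")
    return {
        "notifications": [
            {
                "email_recipients": list(recipients),
                "alerts": [ALERTS[key] for key in alert_keys],
            }
        ]
    }


# Placeholder recipient; notify on any update failure and on flow failures.
settings_fragment = build_notifications(
    ["ops@example.com"], ["failure", "flow_failure"]
)
print(json.dumps(settings_fragment, indent=2))
```

Keeping the alert names in one mapping makes a mistyped notification type fail early instead of being silently ignored.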
What pipeline details are available in the UI?
The pipeline graph appears as soon as an update to a pipeline has successfully started. Arrows represent dependencies between datasets in your pipeline. By default, the pipeline details page shows the most recent update for the table, but you can select older updates from a drop-down menu.
Details include the pipeline ID, source code, compute cost, product edition, and the channel configured for the pipeline.
To see a tabular view of datasets, click the List tab. The List view shows each dataset in your pipeline as a row in a table and is useful when your pipeline DAG is too large to visualize in the Graph view. You can control the datasets displayed in the table using multiple filters such as dataset name, type, and status. To switch back to the DAG visualization, click Graph.
The Run as user is the pipeline owner, and pipeline updates run with this user’s permissions. To change the run as user, click Permissions and change the pipeline owner.
View streaming metrics
Preview
Streaming observability for Delta Live Tables is in Public Preview.
You can view streaming metrics from the data sources supported by Spark Structured Streaming, like Apache Kafka, Amazon Kinesis, Auto Loader, and Delta tables, for each streaming flow in your Delta Live Tables pipeline. Metrics are displayed as charts in the Delta Live Tables UI’s right pane and include backlog seconds, backlog bytes, backlog records, and backlog files. The charts display the maximum value aggregated by minute and a tooltip shows maximum values when you hover over the chart. The data is limited to the last 48 hours from the current time.
Tables in your pipeline with streaming metrics available display a streaming metrics icon when you view the pipeline DAG in the UI Graph view. To view the streaming metrics, click the icon to display the streaming metric chart in the Flows tab in the right pane. You can also apply a filter to view only tables with streaming metrics by clicking List and then clicking Has streaming metrics.
Each streaming source supports only specific metrics. Metrics not supported by a streaming source are not available to view in the UI. The following table shows the metrics available for supported streaming sources:
source | backlog bytes | backlog records | backlog seconds | backlog files |
---|---|---|---|---|
Kafka | ✓ | ✓ | | |
Kinesis | ✓ | ✓ | | |
Delta | ✓ | ✓ | | |
Auto Loader | ✓ | ✓ | | |
Google Pub/Sub | ✓ | ✓ | | |
What is the Delta Live Tables event log?
The Delta Live Tables event log contains all information related to a pipeline, including audit logs, data quality checks, pipeline progress, and data lineage. You can use the event log to track, understand, and monitor the state of your data pipelines.
You can view event log entries in the Delta Live Tables user interface, the Delta Live Tables API, or by directly querying the event log. This section focuses on querying the event log directly.
You can also define custom actions to run when events are logged, for example, sending alerts, with event hooks.
Event log schema
The following table describes the event log schema. Some of these fields contain JSON data that require parsing to perform some queries, such as the details field. Databricks supports the : operator to parse JSON fields. See : (colon sign) operator.
Field | Description |
---|---|
id | A unique identifier for the event log record. |
sequence | A JSON document containing metadata to identify and order events. |
origin | A JSON document containing metadata for the origin of the event, for example, the cloud provider and the cloud provider region. |
timestamp | The time the event was recorded. |
message | A human-readable message describing the event. |
level | The level of the event. |
error | If an error occurred, details describing the error. |
details | A JSON document containing structured details of the event. This is the primary field used for analyzing events. |
event_type | The event type. |
maturity_level | The stability of the event schema. |
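Outside SQL, the same kind of path lookup can be sketched in plain Python. The example below parses a hand-written record whose details value is JSON text and follows a dotted path, which is roughly what an expression such as details:flow_definition.output_dataset does. The sample record and the get_path helper are hypothetical and are not part of any Databricks API.

```python
import json

# Hypothetical event record; in the event log, details is stored as JSON text.
record = {
    "event_type": "flow_definition",
    "details": json.dumps(
        {
            "flow_definition": {
                "output_dataset": "customers",
                "input_datasets": ["customers_raw"],
            }
        }
    ),
}


def get_path(json_text, dotted_path):
    """Parse JSON text and follow a dotted path; return None if a key is missing."""
    node = json.loads(json_text)
    for key in dotted_path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


output_dataset = get_path(record["details"], "flow_definition.output_dataset")
missing = get_path(record["details"], "flow_definition.no_such_key")
print(output_dataset, missing)
```

Returning None for a missing key mirrors how a JSON path that does not match yields a null value in a query rather than an error.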
Querying the event log
The location of the event log and the interface to query the event log depend on whether your pipeline is configured to use the Hive metastore or Unity Catalog.
Hive metastore
If your pipeline publishes tables to the Hive metastore, the event log is stored in /system/events under the storage location. For example, if you have configured your pipeline storage setting as /Users/username/data, the event log is stored in the /Users/username/data/system/events path in DBFS.
If you have not configured the storage setting, the default event log location is /pipelines/<pipeline-id>/system/events in DBFS. For example, if the ID of your pipeline is 91de5e48-35ed-11ec-8d3d-0242ac130003, the storage location is /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events.
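The two location rules above can be sketched as a small Python helper. The function name event_log_path is hypothetical. It only assembles the path string described in this section and does not call any Databricks API.

```python
def event_log_path(pipeline_id, storage=None):
    """Return the DBFS event log path for a Hive metastore pipeline.

    storage is the pipeline's storage setting, or None when it is not configured.
    """
    if storage:
        # A configured storage location gets /system/events appended.
        return storage.rstrip("/") + "/system/events"
    # Otherwise the default location is derived from the pipeline ID.
    return f"/pipelines/{pipeline_id}/system/events"


pipeline_id = "91de5e48-35ed-11ec-8d3d-0242ac130003"
configured = event_log_path(pipeline_id, "/Users/username/data")
default = event_log_path(pipeline_id)
print(configured)
print(default)
```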
You can create a view to simplify querying the event log. The following example creates a temporary view called event_log_raw. This view is used in the example event log queries included in this article:
CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;
Replace <event-log-path> with the event log location.
Each instance of a pipeline run is called an update. You often want to extract information for the most recent update. Run the following query to find the identifier for the most recent update and save it in the latest_update temporary view. This view is used in the example event log queries included in this article:
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
You can query the event log in a Databricks notebook or the SQL editor. Use a notebook or the SQL editor to run the example event log queries.
Unity Catalog
If your pipeline publishes tables to Unity Catalog, you must use the event_log table-valued function (TVF) to fetch the event log for the pipeline. You retrieve the event log for a pipeline by passing the pipeline ID or a table name to the TVF. For example, to retrieve the event log records for the pipeline with ID 04c78631-3dd7-4856-b2a6-7d84e9b2638b:
SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")
To retrieve the event log records for the pipeline that created or owns the table my_catalog.my_schema.table1:
SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))
To call the TVF, you must use a shared cluster or a SQL warehouse. For example, you can use a notebook attached to a shared cluster or use the SQL editor connected to a SQL warehouse.
To simplify querying events for a pipeline, the owner of the pipeline can create a view over the event_log TVF. The following example creates a view over the event log for a pipeline. This view is used in the example event log queries included in this article.
Note
The event_log TVF can be called only by the pipeline owner, and a view created over the event_log TVF can be queried only by the pipeline owner. The view cannot be shared with other users.
CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");
Replace <pipeline-ID> with the unique identifier for the Delta Live Tables pipeline. You can find the ID in the Pipeline details panel in the Delta Live Tables UI.
Each instance of a pipeline run is called an update. You often want to extract information for the most recent update. Run the following query to find the identifier for the most recent update and save it in the latest_update temporary view. This view is used in the example event log queries included in this article:
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
Query lineage information from the event log
Events containing information about lineage have the event type flow_definition. The details:flow_definition object contains the output_dataset and input_datasets defining each relationship in the graph.
You can use the following query to extract the input and output datasets to see lineage information:
SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND origin.update_id = latest_update.id
Query data quality from the event log
If you define expectations on datasets in your pipeline, the data quality metrics are stored in the details:flow_progress.data_quality.expectations object. Events containing information about data quality have the event type flow_progress. The following example queries the data quality metrics for the last pipeline update:
SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name
dataset | expectation | passing_records | failing_records |
---|---|---|---|
 | | 4083 | 0 |
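To make the aggregation in the query above easier to follow, here is a minimal Python sketch of the same grouping over hand-written events. The sample events, dataset names, and the summarize_expectations helper are hypothetical. The sketch also skips the filter on the latest update that the SQL query applies.

```python
import json
from collections import defaultdict


def make_event(event_type, details):
    """Build a hypothetical event whose details field is JSON text."""
    return {"event_type": event_type, "details": json.dumps(details)}


# Hypothetical events: two carry expectations, one has no data quality block.
events = [
    make_event("flow_progress", {"flow_progress": {"data_quality": {"expectations": [
        {"name": "valid_id", "dataset": "orders", "passed_records": 90, "failed_records": 10},
    ]}}}),
    make_event("flow_progress", {"flow_progress": {"data_quality": {"expectations": [
        {"name": "valid_id", "dataset": "orders", "passed_records": 50, "failed_records": 5},
    ]}}}),
    make_event("flow_progress", {"flow_progress": {"metrics": {"backlog_bytes": 1024}}}),
    make_event("flow_definition", {"flow_definition": {"output_dataset": "orders"}}),
]


def summarize_expectations(events):
    """Sum passed and failed records per (dataset, expectation) pair."""
    totals = defaultdict(lambda: {"passing_records": 0, "failing_records": 0})
    for event in events:
        if event["event_type"] != "flow_progress":
            continue
        progress = json.loads(event["details"]).get("flow_progress", {})
        expectations = (progress.get("data_quality") or {}).get("expectations") or []
        for expectation in expectations:
            key = (expectation["dataset"], expectation["name"])
            totals[key]["passing_records"] += expectation["passed_records"]
            totals[key]["failing_records"] += expectation["failed_records"]
    return dict(totals)


summary = summarize_expectations(events)
print(summary)
```

Events without a data_quality block are skipped, which corresponds to explode producing no rows for them in the query.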
Monitor data backlog by querying the event log
Delta Live Tables tracks how much data is present in the backlog in the details:flow_progress.metrics.backlog_bytes object. Events containing backlog metrics have the event type flow_progress. The following example queries backlog metrics for the last pipeline update:
SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_progress'
  AND origin.update_id = latest_update.id
Note
The backlog metrics may not be available depending on the pipeline’s data source type and Databricks Runtime version.
Monitor enhanced autoscaling events from the event log for pipelines without serverless enabled
For DLT pipelines that do not use serverless compute, the event log captures cluster resizes when enhanced autoscaling is enabled in your pipelines. Events containing information about enhanced autoscaling have the event type autoscale. The cluster resizing request information is stored in the details:autoscale object. The following example queries the enhanced autoscaling cluster resize requests for the last pipeline update:
SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND origin.update_id = latest_update.id
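The query above spreads one value, requested_num_executors, across four columns depending on the resize status. The following Python sketch shows that pivot on hand-written events. The sample events and the pivot_autoscale helper are hypothetical; only the status values and column names are taken from the query.

```python
import json

# Maps each autoscale status to the output column used by the query above.
STATUS_COLUMNS = {
    "RESIZING": "starting_num_executors",
    "SUCCEEDED": "succeeded_num_executors",
    "PARTIALLY_SUCCEEDED": "partially_succeeded_num_executors",
    "FAILED": "failed_num_executors",
}


def pivot_autoscale(event):
    """Place requested_num_executors in the column matching the event status."""
    autoscale = json.loads(event["details"])["autoscale"]
    requested = float(autoscale["requested_num_executors"])
    row = {"timestamp": event["timestamp"]}
    for status, column in STATUS_COLUMNS.items():
        row[column] = requested if autoscale["status"] == status else None
    return row


# Hypothetical autoscale events, for illustration only.
sample_events = [
    {
        "timestamp": "2024-01-01T00:00:00Z",
        "details": json.dumps(
            {"autoscale": {"status": "RESIZING", "requested_num_executors": 4}}
        ),
    },
    {
        "timestamp": "2024-01-01T00:02:00Z",
        "details": json.dumps(
            {"autoscale": {"status": "SUCCEEDED", "requested_num_executors": 4}}
        ),
    },
]

rows = [pivot_autoscale(event) for event in sample_events]
for row in rows:
    print(row)
```

Each row has exactly one non-null executor column, which makes it straightforward to plot requested and completed resizes as separate series.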
Monitor compute resource utilization
cluster_resources events provide metrics on the number of task slots in the cluster, how much those task slots are utilized, and how many tasks are waiting to be scheduled.
When enhanced autoscaling is enabled, cluster_resources events also contain metrics for the autoscaling algorithm, including latest_requested_num_executors and optimal_num_executors. The events also show the status of the algorithm as different states such as CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS, and BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION.
This information can be viewed in conjunction with the autoscaling events to provide an overall picture of enhanced autoscaling.
The following example queries the task queue size history for the last pipeline update:
SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND origin.update_id = latest_update.id
The following example queries the utilization history for the last pipeline update:
SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND origin.update_id = latest_update.id
The following example queries the executor count history, accompanied by metrics available only for enhanced autoscaling pipelines, including the number of executors requested by the algorithm in the latest request, the optimal number of executors recommended by the algorithm based on the most recent metrics, and the autoscaling algorithm state:
SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND origin.update_id = latest_update.id
Audit Delta Live Tables pipelines
You can use Delta Live Tables event log records and other Databricks audit logs to get a complete picture of how data is being updated in Delta Live Tables.
Delta Live Tables uses the credentials of the pipeline owner to run updates. You can change the credentials used by updating the pipeline owner. Delta Live Tables records the user for actions on the pipeline, including pipeline creation, edits to configuration, and triggering updates.
See Unity Catalog events for a reference of Unity Catalog audit events.
Query user actions in the event log
You can use the event log to audit events, for example, user actions. Events containing information about user actions have the event type user_action.
Information about the action is stored in the user_action object in the details field. Use the following query to construct an audit log of user events. To create the event_log_raw view used in this query, see Querying the event log.
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp | action | user_name |
---|---|---|
2021-05-20T19:36:03.517+0000 | | |
2021-05-20T19:35:59.913+0000 | | |
2021-05-27T00:35:51.971+0000 | | |
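Once audit rows like these are exported, they can be summarized further. The sketch below extracts the same three columns from hand-written user_action events and counts actions per user. The events, action names, user names, and the audit_rows helper are all hypothetical placeholders, not values taken from a real event log.

```python
import json
from collections import Counter

# Hypothetical events; only the user_action object layout follows the query above.
sample_events = [
    {
        "timestamp": "2024-03-01T10:00:00.000+0000",
        "event_type": "user_action",
        "details": json.dumps(
            {"user_action": {"action": "CREATE", "user_name": "alice@example.com"}}
        ),
    },
    {
        "timestamp": "2024-03-01T10:05:00.000+0000",
        "event_type": "user_action",
        "details": json.dumps(
            {"user_action": {"action": "START", "user_name": "alice@example.com"}}
        ),
    },
    {
        "timestamp": "2024-03-01T10:06:00.000+0000",
        "event_type": "flow_progress",
        "details": json.dumps({"flow_progress": {"status": "RUNNING"}}),
    },
    {
        "timestamp": "2024-03-02T08:00:00.000+0000",
        "event_type": "user_action",
        "details": json.dumps(
            {"user_action": {"action": "START", "user_name": "bob@example.com"}}
        ),
    },
]


def audit_rows(events):
    """Return (timestamp, action, user_name) tuples for user_action events."""
    rows = []
    for event in events:
        if event["event_type"] != "user_action":
            continue
        action = json.loads(event["details"])["user_action"]
        rows.append((event["timestamp"], action["action"], action["user_name"]))
    return rows


rows = audit_rows(sample_events)
actions_per_user = Counter(user_name for _, _, user_name in rows)
print(rows)
print(actions_per_user)
```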