Run a Delta Live Tables pipeline in a workflow
You can run a Delta Live Tables pipeline as part of a data processing workflow with Databricks jobs, Apache Airflow, or Azure Data Factory.
Jobs
You can orchestrate multiple tasks in a Databricks job to implement a data processing workflow. To include a Delta Live Tables pipeline in a job, use the Pipeline task when you create a job. See Delta Live Tables pipeline task for jobs.
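You can also create a job with a pipeline task programmatically through the Jobs API. The following Python sketch assumes the Jobs API 2.1 /api/2.1/jobs/create endpoint and uses placeholder values for the workspace instance, access token, and pipeline ID; adjust it to match your environment:
import requests

# Sketch: create a job containing a single Delta Live Tables pipeline task.
# <databricks-instance>, <personal-access-token>, and <pipeline-id> are placeholders.
DATABRICKS_INSTANCE = "https://<databricks-instance>"
TOKEN = "<personal-access-token>"

job_spec = {
    "name": "dlt-pipeline-job",
    "tasks": [
        {
            "task_key": "run_pipeline",
            "pipeline_task": {
                "pipeline_id": "<pipeline-id>",
                "full_refresh": False
            }
        }
    ]
}

response = requests.post(
    f"{DATABRICKS_INSTANCE}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json=job_spec
)
response.raise_for_status()
print(response.json()["job_id"])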
Apache Airflow
Apache Airflow is an open source solution for managing and scheduling data workflows. Airflow represents workflows as directed acyclic graphs (DAGs) of operations. You define a workflow in a Python file and Airflow manages the scheduling and execution. For information on installing and using Airflow with Databricks, see Orchestrate Databricks jobs with Apache Airflow.
To run a Delta Live Tables pipeline as part of an Airflow workflow, use the DatabricksSubmitRunOperator.
Requirements
The following are required to use the Airflow support for Delta Live Tables:
Airflow version 2.1.0 or later.
The Databricks provider package version 2.1.0 or later.
Example
The following example creates an Airflow DAG that triggers an update for the Delta Live Tables pipeline with the identifier 8279d543-063c-4d63-9926-dae38e35ce8b:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now = DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )
Replace CONNECTION_ID with the identifier for an Airflow connection to your workspace.
Save this example in the airflow/dags directory and use the Airflow UI to view and trigger the DAG. Use the Delta Live Tables UI to view the details of the pipeline update.
Azure Data Factory
Note
Delta Live Tables and Azure Data Factory each include options to configure the number of retries when a failure occurs. If retry values are configured on your Delta Live Tables pipeline and on the Azure Data Factory activity that calls the pipeline, the number of retries is the Azure Data Factory retry value multiplied by the Delta Live Tables retry value.
For example, if a pipeline update fails, Delta Live Tables retries the update up to five times by default. If the Azure Data Factory retry is set to three, and your Delta Live Tables pipeline uses the default of five retries, your failing Delta Live Tables pipeline might be retried up to fifteen times. To avoid excessive retry attempts when pipeline updates fail, Databricks recommends limiting the number of retries when configuring the Delta Live Tables pipeline or the Azure Data Factory activity that calls the pipeline.
To change the retry configuration for your Delta Live Tables pipeline, use the pipelines.numUpdateRetryAttempts setting when configuring the pipeline.
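As a sketch of where this setting lives, assuming it is passed through the pipeline's configuration key-value map, the pipeline's JSON settings would include an entry like the following (the value 2 is only an example):
{
  "configuration": {
    "pipelines.numUpdateRetryAttempts": "2"
  }
}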
Azure Data Factory is a cloud-based ETL service that lets you orchestrate data integration and transformation workflows. Azure Data Factory directly supports running Databricks tasks in a workflow, including notebooks, JAR tasks, and Python scripts. You can also include a pipeline in a workflow by calling the Delta Live Tables API from an Azure Data Factory Web activity. For example, to trigger a pipeline update from Azure Data Factory:
Create a data factory or open an existing data factory.
When creation completes, open the page for your data factory and click the Open Azure Data Factory Studio tile. The Azure Data Factory user interface appears.
Create a new Azure Data Factory pipeline by selecting Pipeline from the New drop-down menu in the Azure Data Factory Studio user interface.
In the Activities toolbox, expand General and drag the Web activity to the pipeline canvas. Click the Settings tab and enter the following values:
Note
As a security best practice, when you authenticate with automated tools, systems, scripts, and apps, Databricks recommends that you use personal access tokens belonging to service principals instead of workspace users. To create tokens for service principals, see Manage tokens for a service principal.
URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates. Replace <databricks-instance> with your Azure Databricks workspace instance name, and replace <pipeline-id> with the pipeline identifier.
Method: Select POST from the drop-down menu.
Headers: Click + New. In the Name text box, enter Authorization. In the Value text box, enter Bearer <personal-access-token>. Replace <personal-access-token> with a Databricks personal access token.
Body: To pass additional request parameters, enter a JSON document containing the parameters. For example, to start an update and reprocess all data for the pipeline, enter {"full_refresh": "true"}. If there are no additional request parameters, enter empty braces ({}).
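For reference, the request the Web activity sends corresponds to the following Python sketch, using the same placeholders as above; the response is expected to include an update_id identifying the started update:
import requests

# Sketch of the update request the Web activity issues.
# Replace the placeholders with your workspace instance, pipeline ID, and token.
DATABRICKS_INSTANCE = "https://<databricks-instance>"
PIPELINE_ID = "<pipeline-id>"
TOKEN = "<personal-access-token>"

response = requests.post(
    f"{DATABRICKS_INSTANCE}/api/2.0/pipelines/{PIPELINE_ID}/updates",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"full_refresh": "true"}  # or {} if no additional request parameters are needed
)
response.raise_for_status()
print(response.json().get("update_id"))  # identifier of the started update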
To test the Web activity, click Debug on the pipeline toolbar in the Data Factory UI. The output and status of the run, including errors, are displayed in the Output tab of the Azure Data Factory pipeline. Use the Delta Live Tables UI to view the details of the pipeline update.
Tip
A common workflow requirement is to start a task after completion of a previous task. Because the Delta Live Tables updates request is asynchronous (the request returns after starting the update but before the update completes), tasks in your Azure Data Factory pipeline with a dependency on the Delta Live Tables update must wait for the update to complete. An option to wait for update completion is adding an Until activity following the Web activity that triggers the Delta Live Tables update. In the Until activity:
Add a Wait activity to wait a configured number of seconds for update completion.
Add a Web activity following the Wait activity that uses the Delta Live Tables update details request to get the status of the update. The state field in the response returns the current state of the update, including whether it has completed.
Use the value of the state field to set the terminating condition for the Until activity. You can also use a Set Variable activity to add a pipeline variable based on the state value and use this variable for the terminating condition.
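Outside of Azure Data Factory, the same wait-and-poll pattern can be sketched directly against the Delta Live Tables API. This sketch assumes the update details request is GET /api/2.0/pipelines/<pipeline-id>/updates/<update-id>, that the response wraps the details in an update object with a state field, and that COMPLETED, FAILED, and CANCELED are terminal states:
import time
import requests

DATABRICKS_INSTANCE = "https://<databricks-instance>"
PIPELINE_ID = "<pipeline-id>"
UPDATE_ID = "<update-id>"  # returned when the update was started
TOKEN = "<personal-access-token>"

# Assumed terminal values of the state field.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELED"}

while True:
    response = requests.get(
        f"{DATABRICKS_INSTANCE}/api/2.0/pipelines/{PIPELINE_ID}/updates/{UPDATE_ID}",
        headers={"Authorization": f"Bearer {TOKEN}"},
    )
    response.raise_for_status()
    state = response.json()["update"]["state"]
    if state in TERMINAL_STATES:
        print(f"Update finished with state {state}")
        break
    time.sleep(60)  # like the Wait activity, pause before checking again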