Orchestrate Databricks jobs with Apache Airflow

This article describes the Apache Airflow support for orchestrating data pipelines with Databricks, has instructions for installing and configuring Airflow locally, and provides an example of deploying and running a Databricks workflow with Airflow.

Job orchestration in a data pipeline

Developing and deploying a data processing pipeline often requires managing complex dependencies between tasks. For example, a pipeline might read data from a source, clean the data, transform the cleaned data, and write the transformed data to a target. You also need support for testing, scheduling, and troubleshooting errors when you operationalize a pipeline.

Workflow systems address these challenges by allowing you to define dependencies between tasks, schedule when pipelines run, and monitor workflows. Apache Airflow is an open source solution for managing and scheduling data pipelines. Airflow represents data pipelines as directed acyclic graphs (DAGs) of operations. You define a workflow in a Python file, and Airflow manages the scheduling and execution. The Airflow Databricks connection lets you take advantage of the optimized Spark engine offered by Databricks with the scheduling features of Airflow.

Requirements

  • The integration between Airflow and Databricks requires Airflow version 2.5.0 and later. The examples in this article are tested with Airflow version 2.6.1.

  • Airflow requires Python 3.8, 3.9, 3.10, or 3.11. The examples in this article are tested with Python 3.8.

  • The instructions in this article to install and run Airflow require pipenv to create a Python virtual environment.

Airflow operators for Databricks

An Airflow DAG is composed of tasks, where each task runs an Airflow Operator. Airflow operators supporting the integration to Databricks are implemented in the Databricks provider.

The Databricks provider includes operators to run a number of tasks against a Databricks workspace, including importing data into a table, running SQL queries, and working with Databricks Git folders.

The Databricks provider implements two operators for triggering jobs:

To create a new Databricks job or reset an existing job, the Databricks provider implements the DatabricksCreateJobsOperator. The DatabricksCreateJobsOperator uses the POST /api/2.1/jobs/create and POST /api/2.1/jobs/reset API requests. You can use the DatabricksCreateJobsOperator with the DatabricksRunNowOperator to create and run a job.

Note

Using the Databricks operators to trigger a job requires providing credentials in the Databricks connection configuration. See Create a Databricks personal access token for Airflow.

The Databricks Airflow operators write the job run page URL to the Airflow logs every polling_period_seconds (the default is 30 seconds). For more information, see the apache-airflow-providers-databricks package page on the Airflow website.

Install the Airflow Databricks integration locally

To install Airflow and the Databricks provider locally for testing and development, use the following steps. For other Airflow installation options, including creating a production installation, see installation in the Airflow documentation.

Open a terminal and run the following commands:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

Replace <firstname>, <lastname>, and <email> with your username and email. You will be prompted to enter a password for the admin user. Make sure to save this password because it is required to log in to the Airflow UI.

This script performs the following steps:

  1. Creates a directory named airflow and changes into that directory.

  2. Uses pipenv to create and spawn a Python virtual environment. Databricks recommends using a Python virtual environment to isolate package versions and code dependencies to that environment. This isolation helps reduce unexpected package version mismatches and code dependency collisions.

  3. Initializes an environment variable named AIRFLOW_HOME set to the path of the airflow directory.

  4. Installs Airflow and the Airflow Databricks provider packages.

  5. Creates an airflow/dags directory. Airflow uses the dags directory to store DAG definitions.

  6. Initializes a SQLite database that Airflow uses to track metadata. In a production Airflow deployment, you would configure Airflow with a standard database. The SQLite database and default configuration for your Airflow deployment are initialized in the airflow directory.

  7. Creates an admin user for Airflow.

Tip

To confirm the installation of the Databricks provider, run the following command in the Airflow installation directory:

airflow providers list

Start the Airflow web server and scheduler

The Airflow web server is required to view the Airflow UI. To start the web server, open a terminal in the Airflow installation directory and run the following commands:

Note

If the Airflow web server fails to start because of a port conflict, you can change the default port in the Airflow configuration.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

The scheduler is the Airflow component that schedules DAGs. To start the scheduler, open a new terminal in the Airflow installation directory and run the following commands:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Test the Airflow installation

To verify the Airflow installation, you can run one of the example DAGs included with Airflow:

  1. In a browser window, open http://localhost:8080/home. Log in to the Airflow UI with the username and password you created when installing Airflow. The Airflow DAGs page appears.

  2. Click the Pause/Unpause DAG toggle to unpause one of the example DAGs, for example, the example_python_operator.

  3. Trigger the example DAG by clicking the Trigger DAG button.

  4. Click the DAG name to view details, including the run status of the DAG.

Create a Databricks personal access token for Airflow

Airflow connects to Databricks using a Databricks personal access token (PAT). To create a PAT, follow the steps in Databricks personal access tokens for workspace users.

Note

As a security best practice, when you authenticate with automated tools, systems, scripts, and apps, Databricks recommends that you use personal access tokens belonging to service principals instead of workspace users. To create tokens for service principals, see Manage tokens for a service principal.

Configure a Databricks connection

Your Airflow installation contains a default connection for Databricks. To update the connection to connect to your workspace using the personal access token you created above:

  1. In a browser window, open http://localhost:8080/connection/list/. If prompted to sign in, enter your admin username and password.

  2. Under Conn ID, locate databricks_default and click the Edit record button.

  3. Replace the value in the Host field with the workspace instance name of your Databricks deployment, for example, https://adb-123456789.cloud.databricks.com.

  4. In the Password field, enter your Databricks personal access token.

  5. Click Save.

Example: Create an Airflow DAG to run a Databricks job

The following example demonstrates how to create a simple Airflow deployment that runs on your local machine and deploys an example DAG to trigger runs in Databricks. In this example, you will:

  1. Create a new notebook and add code to print a greeting based on a configured parameter.

  2. Create a Databricks job with a single task that runs the notebook.

  3. Configure an Airflow connection to your Databricks workspace.

  4. Create an Airflow DAG to trigger the notebook job. You define the DAG in a Python script using DatabricksRunNowOperator.

  5. Use the Airflow UI to trigger the DAG and view the run status.

Create a notebook

This example uses a notebook containing two cells:

  • The first cell contains a Databricks Utilities text widget defining a variable named greeting set to the default value world.

  • The second cell prints the value of the greeting variable prefixed by hello.

To create the notebook:

  1. Go to your Databricks workspace, click New Icon New in the sidebar, and select Notebook.

  2. Give your notebook a name, such as Hello Airflow, and make sure the default language is set to Python.

  3. Copy the following Python code and paste it into the first cell of the notebook.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. Add a new cell below the first cell and copy and paste the following Python code into the new cell:

    print("hello {}".format(greeting))
    

Create a job

  1. Click Workflows Icon Workflows in the sidebar.

  2. Click Create Job Button.

    The Tasks tab appears with the create task dialog.

    Create first task dialog
  3. Replace Add a name for your job… with your job name.

  4. In the Task name field, enter a name for the task, for example, greeting-task.

  5. In the Type drop-down menu, select Notebook.

  6. In the Source drop-down menu, select Workspace.

  7. Click the Path text box and use the file browser to find the notebook you created, click the notebook name, and click Confirm.

  8. Click Add under Parameters. In the Key field, enter greeting. In the Value field, enter Airflow user.

  9. Click Create task.

In the Job details panel, copy the Job ID value. This value is required to trigger the job from Airflow.

Run the job

To test your new job in the Databricks Jobs UI, click Run Now Button in the upper right corner. When the run completes, you can verify the output by viewing the job run details.

Create a new Airflow DAG

You define an Airflow DAG in a Python file. To create a DAG to trigger the example notebook job:

  1. In a text editor or IDE, create a new file named databricks_dag.py with the following contents:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    Replace JOB_ID with the value of the job ID saved earlier.

  2. Save the file in the airflow/dags directory. Airflow automatically reads and installs DAG files stored in airflow/dags/.

Install and verify the DAG in Airflow

To trigger and verify the DAG in the Airflow UI:

  1. In a browser window, open http://localhost:8080/home. The Airflow DAGs screen appears.

  2. Locate databricks_dag and click the Pause/Unpause DAG toggle to unpause the DAG.

  3. Trigger the DAG by clicking the Trigger DAG button.

  4. Click a run in the Runs column to view the status and details of the run.