Build an end-to-end data pipeline in Databricks

This article provides an example of creating and deploying an end-to-end data processing pipeline, including ingesting raw data, transforming the data, and running analyses on the processed data.

What is a data pipeline?

A data pipeline implements the steps required to move data from source systems, transform that data based on requirements, and store the data in a target system. A data pipeline includes all the processes necessary to turn raw data into prepared data that users can consume. For example, a data pipeline might prepare data so data analysts and data scientists can extract value from the data through analysis and reporting.

An extract, transform, and load (ETL) workflow is a common example of a data pipeline. In ETL processing, data is ingested from source systems and written to a staging area, transformed based on requirements (ensuring data quality, deduplicating records, and so forth), and then written to a target system such as a data warehouse or data lake.
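The ETL pattern described above can be sketched in a few lines of plain Python. This is only an illustration of the flow, not Databricks code: the in-memory source list, staging list, and target dictionary stand in for real source systems, staging areas, and data warehouses.

```python
# Minimal ETL sketch. The "source", staging list, and "target" are
# in-memory stand-ins for real systems (databases, object storage, etc.).

# Extract: read raw records from a source system into a staging area.
source_records = [
    {"song_id": "S1", "title": "Rastaman", "year": 2003},
    {"song_id": "S1", "title": "Rastaman", "year": 2003},    # duplicate
    {"song_id": "S2", "title": None, "year": 2009},          # invalid: missing title
    {"song_id": "S3", "title": "Loose on the Dancefloor", "year": 2008},
]
staging = list(source_records)

# Transform: enforce data quality (drop records with missing fields)
# and deduplicate on the song_id key.
seen = set()
transformed = []
for rec in staging:
    if rec["title"] is None:
        continue  # data-quality rule: skip incomplete records
    if rec["song_id"] in seen:
        continue  # deduplication
    seen.add(rec["song_id"])
    transformed.append(rec)

# Load: write the prepared records to the target system.
target_table = {rec["song_id"]: rec for rec in transformed}
print(sorted(target_table))  # ['S1', 'S3']
```

The duplicate and invalid records are filtered during the transform step, so only clean, unique records reach the target.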

Data pipeline steps

To help you get started building data pipelines on Databricks, the example included in this article walks through creating a data processing workflow:

  • Use Databricks features to explore a raw dataset.

  • Create a Databricks notebook to ingest raw source data and write the raw data to a target table.

  • Create a Databricks notebook to transform the raw source data and write the transformed data to a target table.

  • Create a Databricks notebook to query the transformed data.

  • Automate the data pipeline with a Databricks job.

Databricks also provides Delta Live Tables, a framework with a declarative interface for implementing data processing pipelines. This article demonstrates how to create a complete data pipeline using Databricks notebooks and a Databricks job to orchestrate the workflow, but Databricks recommends Delta Live Tables for building reliable, maintainable, and testable data processing pipelines.

Requirements

Note

If you do not have cluster create permissions, you can still complete most of the steps below as long as you have access to a cluster.

Example: Million Song dataset

The dataset used in this example is a subset of the Million Song Dataset, a collection of features and metadata for contemporary music tracks. This dataset is available in the sample datasets included in your Databricks workspace.

Step 1: Create a cluster

To perform the data processing and analysis in this example, create a cluster to provide the compute resources needed to run commands.

  1. Click compute icon Compute in the sidebar.

  2. On the Compute page, click Create Cluster. The New Cluster page appears.

  3. Specify a unique name for the cluster, leave the remaining values in their default state, and click Create Cluster.

To learn more about Databricks clusters, see Clusters.

Step 2: Explore the source data

A common first step in creating a data pipeline is understanding the source data for the pipeline. In this step, you will run Databricks Utilities commands in a notebook to examine the source data and artifacts.

  1. In the sidebar, click New Icon New and select Notebook from the menu. The Create Notebook dialog appears.

  2. Enter a name for the notebook, for example, Explore songs data. In Default Language, select Python. In Cluster, select the cluster you created or an existing cluster.

  3. Click Create.

  4. To view the contents of the directory containing the dataset, enter the following in the first cell of the notebook, click Run Menu, and select Run Cell.

    display(dbutils.fs.ls("/databricks-datasets/songs/"))
    

        path                                        name        size   modificationTime
    1   dbfs:/databricks-datasets/songs/README.md   README.md   1719   1454620183000
    2   dbfs:/databricks-datasets/songs/data-001/   data-001/   0      1672791237846
    3   dbfs:/databricks-datasets/songs/data-002/   data-002/   0      1672791237846

  5. The README file has information about the dataset, including a description of the data schema. The schema information is used in the next step when ingesting the data. To view the contents of the README, click Down Caret in the cell actions menu, select Add Cell Below, enter the following in the new cell, click Run Menu, and select Run Cell.

    with open("/dbfs/databricks-datasets/songs/README.md") as f:
      x = f.read()
    
    print(x)
    
    Sample of Million Song Dataset
    ===============================
    
    ## Source
    This data is a small subset of the [Million Song Dataset](http://labrosa.ee.columbia.edu/millionsong/).
    The original data was contributed by The Echo Nest.
    Prepared by T. Bertin-Mahieux <tb2332 '@' columbia.edu>
    
    ## Attribute Information
    - artist_id:string
    - artist_latitude:double
    - artist_longitude:double
    - artist_location:string
    - artist_name:string
    - duration:double
    - end_of_fade_in:double
    - key:int
    - key_confidence:double
    - loudness:double
    - release:string
    - song_hotnes:double
    - song_id:string
    - start_of_fade_out:double
    - tempo:double
    - time_signature:double
    - time_signature_confidence:double
    - title:string
    - year:double
    - partial_sequence:int
    ...
    
  6. The records used in this example are in the /databricks-datasets/songs/data-001/ directory. To view the contents of this directory, click Down Caret in the cell actions menu, select Add Cell Below, enter the following in the new cell, click Run Menu, and select Run Cell.

    display(dbutils.fs.ls("/databricks-datasets/songs/data-001"))
    

        path                                                  name         size    modificationTime
    1   dbfs:/databricks-datasets/songs/data-001/header.txt   header.txt   377     1454633901000
    2   dbfs:/databricks-datasets/songs/data-001/part-00000   part-00000   52837   1454547464000
    3   dbfs:/databricks-datasets/songs/data-001/part-00001   part-00001   52469   1454547465000

  7. To view a sample of the records, click Down Caret in the cell actions menu, select Add Cell Below, enter the following in the new cell, click Run Menu, and select Run Cell.

    with open("/dbfs/databricks-datasets/songs/data-001/part-00000") as f:
      x = f.read()
    
    print(x)
    
    AR81V6H1187FB48872  nan     nan             Earl Sixteen    213.7073        0.0     11      0.419   -12.106 Soldier of Jah Army     nan     SOVNZSZ12AB018A9B8      208.289 125.882 1       0.0     Rastaman        2003    --
    ARVVZQP11E2835DBCB  nan     nan             Wavves  133.25016       0.0     0       0.282   0.596   Wavvves 0.471578247701  SOJTQHQ12A8C143C5F      128.116 89.519  1       0.0     I Want To See You (And Go To The Movies)        2009    --
    ARFG9M11187FB3BBCB  nan     nan     Nashua USA      C-Side  247.32689       0.0     9       0.612   -4.896  Santa Festival Compilation 2008 vol.1   nan     SOAJSQL12AB0180501      242.196 171.278 5       1.0     Loose on the Dancefloor 0       225261
    ...
    

    From viewing a sample of the records, you can observe a few things about the data. You’ll use these observations later when processing the data:

    • The records do not contain a header. Instead, the header is stored in a separate file in the same directory.

    • The files are in tab-separated value (TSV) format.

    • Some fields are missing or invalid.
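These observations can be verified with a few lines of plain Python using the standard csv module. The trimmed sample line and column subset below are illustrative, not the full 20-column schema:

```python
import csv
import io

# A trimmed sample in the same shape as the dataset: tab-separated values,
# no header row, with "nan" marking missing fields. The column subset and
# the sample line below are illustrative.
header = ["artist_id", "artist_latitude", "artist_longitude", "artist_name"]
raw_line = "AR81V6H1187FB48872\tnan\tnan\tEarl Sixteen\n"

reader = csv.reader(io.StringIO(raw_line), delimiter="\t")
row = next(reader)

# Pair each value with its column name from the separately stored header,
# and treat "nan" as a missing value.
record = {
    col: (None if val == "nan" else val)
    for col, val in zip(header, row)
}
print(record["artist_name"])      # Earl Sixteen
print(record["artist_latitude"])  # None
```

Because the header lives in a separate file, any reader of this data must supply the column names itself, which is why the ingestion step below defines the schema explicitly.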

Step 3: Ingest raw data to Delta Lake

Databricks recommends using Auto Loader for data ingestion. Auto Loader automatically detects and processes new files as they arrive in cloud object storage.

Databricks recommends storing data with Delta Lake. Delta Lake is an open source storage layer that provides ACID transactions and enables the data lakehouse. Delta Lake is the default format for tables created in Databricks.

You can configure Auto Loader to automatically detect the schema of loaded data, allowing you to initialize tables without explicitly declaring the data schema and evolve the table schema as new columns are introduced. This eliminates the need to manually track and apply schema changes over time. Databricks recommends schema inference when using Auto Loader. However, as seen in the data exploration step, the songs data does not contain header information. Because the header is not stored with the data, you’ll need to explicitly define the schema, as shown in the next example.

  1. In the sidebar, click New Icon New and select Notebook from the menu. The Create Notebook dialog appears.

  2. Enter a name for the notebook, for example, Ingest songs data. In Default Language, select Python. In Cluster, select the cluster you created or an existing cluster.

  3. Click Create.

  4. Enter the following in the first cell of the notebook:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "<checkpoint-path>"
    
    # For purposes of this example, clear out data from previous runs. Because Auto Loader
    # is intended for incremental loading, in production applications you normally won't drop
    # target tables and checkpoints between runs.
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
    dbutils.fs.rm(checkpoint_path, True)
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name))
    

    Replace:

    • <table-name> with the name of the Delta table to contain the ingested records, for example, pipeline_get_started_raw_song_data.

    • <checkpoint-path> with a path to a directory in DBFS to maintain checkpoint files, for example, /tmp/pipeline_get_started/_checkpoint/song_data.

  5. Click Run Menu, and select Run Cell. This example defines the data schema using the information from the README, ingests the songs data from all of the files contained in file_path, and writes the data to the Delta table specified by table_name.

Step 4: Prepare raw data and write to Delta Lake

In this step, you transform the raw songs data by filtering out unneeded columns and adding a new field containing a timestamp for the creation of the new record.

  1. In the sidebar, click New Icon New and select Notebook from the menu. The Create Notebook dialog appears.

  2. Enter a name for the notebook, for example, Prepare songs data. In Default Language, select SQL. In Cluster, select the cluster you created or an existing cluster.

  3. Click Create.

  4. Enter the following in the first cell of the notebook:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Replace:

    • <table-name> with the name of the Delta table to contain the filtered and transformed records, for example, pipeline_get_started_prepared_song_data.

    • <raw-songs-table-name> with the name of the Delta table containing the raw songs records ingested in the previous step, for example, pipeline_get_started_raw_song_data.

  5. Click Run Menu, and select Run Cell.

Step 5: Query the transformed data

In this step, you extend the processing pipeline by adding queries to analyze the songs data. These queries use the prepared records created in the previous step.

  1. In the sidebar, click New Icon New and select Notebook from the menu. The Create Notebook dialog appears.

  2. Enter a name for the notebook, for example, Analyze songs data. In Default Language, select SQL. In Cluster, select the cluster you created or an existing cluster.

  3. Click Create.

  4. Enter the following in the first cell of the notebook:

    CREATE OR REPLACE VIEW
      artists_by_year
    AS SELECT
      artist_name,
      year
    FROM
      <prepared-songs-table-name>
    -- Remove records where the year field isn't populated
    WHERE
      year > 0;
    
    -- Which artists released the most songs in each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      artists_by_year
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Replace:

    • <prepared-songs-table-name> with the name of the prepared table created in the previous step, for example, pipeline_get_started_prepared_song_data.

  5. Click Down Caret in the cell actions menu, select Add Cell Below and enter the following in the new cell:

    -- Find songs for your DJ list
    CREATE OR REPLACE VIEW
      danceable_songs
    AS SELECT
      artist_name,
      title,
      tempo
    FROM
      <prepared-songs-table-name>
    WHERE
      time_signature = 4
      AND
      tempo BETWEEN 100 AND 140;

    SELECT * FROM danceable_songs LIMIT 100
    

    Replace:

    • <prepared-songs-table-name> with the name of the prepared table created in the previous step, for example, pipeline_get_started_prepared_song_data.

  6. To run the queries and view the output, click Run all.

Step 6: Create a Databricks job to run the pipeline

You can create a workflow to automate running the data ingestion, processing, and analysis steps using a Databricks job.

  1. In your Data Science & Engineering workspace, do one of the following:

    • Click Jobs Icon Workflows in the sidebar and click Create Job Button.

    • In the sidebar, click New Icon New and select Job.

  2. In the task dialog box that appears on the Tasks tab, replace Add a name for your job… with your job name, for example, “Songs workflow”.

  3. In Task name, enter a name for the first task, for example, Ingest_songs_data.

  4. In Type, select the Notebook task type.

  5. In Source, select Workspace.

  6. Use the file browser to find the data ingestion notebook, click the notebook name, and click Confirm.

  7. In Cluster, select Shared_job_cluster or the cluster you created in the Create a cluster step.

  8. Click Save task.

  9. Click Add Task Button below the task you just created.

  10. In Task name, enter a name for the task, for example, Prepare_songs_data.

  11. In Type, select the Notebook task type.

  12. In Source, select Workspace.

  13. Use the file browser to find the data preparation notebook, click the notebook name, and click Confirm.

  14. In Cluster, select Shared_job_cluster or the cluster you created in the Create a cluster step.

  15. Click Save task.

  16. Click Add Task Button below the task you just created.

  17. In Task name, enter a name for the task, for example, Analyze_songs_data.

  18. In Type, select the Notebook task type.

  19. In Source, select Workspace.

  20. Use the file browser to find the data analysis notebook, click the notebook name, and click Confirm.

  21. In Cluster, select Shared_job_cluster or the cluster you created in the Create a cluster step.

  22. Click Save task.

  23. To run the workflow, click Run Now Button. To view details for the run, click the link in the Start time column for the run in the job runs view. Click each task to view details for the task run.

  24. To view the results when the workflow completes, click the final data analysis task. The Output page appears and displays the query results.
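The three-task workflow created above can also be expressed declaratively. The sketch below builds a Jobs API-style payload as a Python dict; the notebook paths are illustrative placeholders, and the exact field names may differ depending on your API version, so treat this as an outline rather than a ready-to-submit request.

```python
import json

# Sketch of the three-task job as a Jobs API-style payload. Task keys match
# the tutorial; the notebook paths under /Users/me/ are illustrative.
job = {
    "name": "Songs workflow",
    "tasks": [
        {
            "task_key": "Ingest_songs_data",
            "notebook_task": {"notebook_path": "/Users/me/Ingest songs data"},
        },
        {
            "task_key": "Prepare_songs_data",
            "depends_on": [{"task_key": "Ingest_songs_data"}],
            "notebook_task": {"notebook_path": "/Users/me/Prepare songs data"},
        },
        {
            "task_key": "Analyze_songs_data",
            "depends_on": [{"task_key": "Prepare_songs_data"}],
            "notebook_task": {"notebook_path": "/Users/me/Analyze songs data"},
        },
    ],
}

# The depends_on entries encode the same linear ordering the UI steps create:
# ingest -> prepare -> analyze.
order = [t["task_key"] for t in job["tasks"]]
print(json.dumps(order))
```

Adding tasks one after another in the UI, as in the steps above, produces the same linear dependency chain that the depends_on entries spell out explicitly here.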

Step 7: Schedule the data pipeline job

A common requirement is to run a data pipeline on a scheduled basis. To define a schedule for the job that runs the pipeline:

  1. Click Jobs Icon Workflows in the sidebar.

  2. In the Name column, click the job name. The side panel displays the Job details.

  3. Click Edit schedule in the Job details panel and set the Schedule Type to Scheduled.

  4. Specify the period, starting time, and time zone. Optionally select the Show Cron Syntax checkbox to display and edit the schedule in Quartz Cron Syntax.

  5. Click Save.
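The Quartz cron syntax mentioned in step 4 uses more fields than standard Unix cron, starting with a seconds field. The sketch below breaks an illustrative expression (run daily at 06:00) into its fields; it is a reading aid, not a validator:

```python
# Quartz cron expressions use six (or seven, with an optional year field)
# space-separated fields, beginning with seconds -- unlike five-field
# Unix cron.
fields = ["seconds", "minutes", "hours", "day_of_month", "month", "day_of_week"]

# Illustrative expression: run every day at 06:00. In Quartz, "?" means
# "no specific value" and is used for day_of_week when day_of_month is set.
expression = "0 0 6 * * ?"

schedule = dict(zip(fields, expression.split()))
print(schedule["hours"])    # 6
print(schedule["seconds"])  # 0
```

Selecting the Show Cron Syntax checkbox lets you enter an expression like this directly instead of using the period and time pickers.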

Learn more