Build an end-to-end data pipeline in Databricks
This article provides an example of creating and deploying an end-to-end data processing pipeline, including ingesting raw data, transforming the data, and running analyses on the processed data.
Note
This article demonstrates creating a complete data pipeline using Databricks notebooks and a Databricks job to orchestrate a workflow. Databricks also provides Delta Live Tables to facilitate the implementation of data processing pipelines. Delta Live Tables is a framework that provides a declarative interface for implementing data processing pipelines, and Databricks recommends using Delta Live Tables to build reliable, maintainable, and testable data processing pipelines.
What is a data pipeline?
A data pipeline implements the steps required to move data from source systems, transform that data based on requirements, and store the data in a target system. A data pipeline includes all the processes necessary to turn raw data into prepared data that users can consume. For example, a data pipeline might prepare data so data analysts and data scientists can extract value from the data through analysis and reporting.
An extract, transform, and load (ETL) workflow is a common example of a data pipeline. In ETL processing, data is ingested from source systems and written to a staging area, transformed based on requirements (ensuring data quality, deduplicating records, and so forth), and then written to a target system such as a data warehouse or data lake.
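To make the ETL flow concrete, here is a minimal sketch in PySpark. The file path, table name, and column names are hypothetical placeholders, not part of this article's example; the pattern is simply extract from a staging location, transform (deduplicate and enforce basic quality), and load into a target Delta table.

from pyspark.sql import functions as F

# Extract: read raw data from a (hypothetical) staging location.
raw_df = spark.read.option("header", True).csv("/tmp/example/staging/orders/")

# Transform: deduplicate records and apply a basic data quality rule.
clean_df = (raw_df
  .dropDuplicates(["order_id"])           # remove duplicate records
  .filter(F.col("order_id").isNotNull())  # drop rows missing the key field
)

# Load: write the prepared data to a target Delta table.
clean_df.write.format("delta").mode("overwrite").saveAsTable("example_orders_prepared")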
Data pipeline steps
To help you get started building data pipelines on Databricks, the example included in this article walks through creating a data processing workflow:
Use Databricks features to explore a raw dataset.
Create a Databricks notebook to ingest raw source data and write the raw data to a target table.
Create a Databricks notebook to transform the raw source data and write the transformed data to a target table.
Create a Databricks notebook to query the transformed data.
Automate the data pipeline with a Databricks job.
Requirements
You’re logged into Databricks and in the Data Science & Engineering workspace.
You have permission to create a cluster.
Note
If you do not have cluster create permissions, you can still complete most of the steps below as long as you have access to a cluster.
Example: Million Song dataset
The dataset used in this example is a subset of the Million Song Dataset, a collection of features and metadata for contemporary music tracks. This dataset is available in the sample datasets included in your Databricks workspace.
Step 1: Create a cluster
To perform the data processing and analysis in this example, create a cluster to provide the compute resources needed to run commands.
Click Compute in the sidebar.
On the Compute page, click Create Cluster. The New Cluster page appears.
Specify a unique name for the cluster, leave the remaining values in their default state, and click Create Cluster.
To learn more about Databricks clusters, see Clusters.
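If you prefer to create a cluster programmatically rather than through the UI, the following is a rough sketch using the Clusters REST API from Python. Unlike the UI, the API does not apply defaults for you, so the workspace URL, access token, Spark version, and node type shown here are placeholders you must replace with values valid for your workspace and cloud provider.

import requests

# Placeholder values - replace with your workspace URL and a personal access token.
host = "https://<your-workspace-url>"
token = "<your-personal-access-token>"

payload = {
    "cluster_name": "pipeline-get-started",
    "spark_version": "<spark-version>",   # for example, a current LTS runtime version
    "node_type_id": "<node-type-id>",     # depends on your cloud provider
    "num_workers": 1,
    "autotermination_minutes": 60,
}

# Create the cluster (Clusters API 2.0).
resp = requests.post(
    f"{host}/api/2.0/clusters/create",
    headers={"Authorization": f"Bearer {token}"},
    json=payload,
)
print(resp.json())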
Step 2: Explore the source data
A common first step in creating a data pipeline is understanding the source data for the pipeline. In this step, you will run Databricks Utilities commands in a notebook to examine the source data and artifacts.
In the sidebar, click New and select Notebook from the menu. The notebook opens with a default name that you can replace.
Enter a name for the notebook, for example, Explore songs data. By default:
Python is the selected language.
The notebook is attached to the last cluster you used - in this case, the cluster you just created.
To view the contents of the directory containing the dataset, enter the following in the first cell of the notebook, click the cell run menu, and select Run Cell.
display(dbutils.fs.ls("/databricks-datasets/songs/"))
| path | name | size | modificationTime |
| --- | --- | --- | --- |
| dbfs:/databricks-datasets/songs/README.md | README.md | 1719 | 1454620183000 |
| dbfs:/databricks-datasets/songs/data-001/ | data-001/ | 0 | 1672791237846 |
| dbfs:/databricks-datasets/songs/data-002/ | data-002/ | 0 | 1672791237846 |
The README file has information about the dataset, including a description of the data schema. The schema information is used in the next step when ingesting the data. To view the contents of the README, click the cell actions menu, select Add Cell Below, enter the following in the new cell, click the cell run menu, and select Run Cell.
with open("/dbfs/databricks-datasets/songs/README.md") as f: x = ''.join(f.readlines()) print(x)
Sample of Million Song Dataset
===============================

## Source
This data is a small subset of the [Million Song Dataset](http://labrosa.ee.columbia.edu/millionsong/).
The original data was contributed by The Echo Nest.
Prepared by T. Bertin-Mahieux <tb2332 '@' columbia.edu>

## Attribute Information
- artist_id:string
- artist_latitude:double
- artist_longitude:double
- artist_location:string
- artist_name:string
- duration:double
- end_of_fade_in:double
- key:int
- key_confidence:double
- loudness:double
- release:string
- song_hotnes:double
- song_id:string
- start_of_fade_out:double
- tempo:double
- time_signature:double
- time_signature_confidence:double
- title:string
- year:double
- partial_sequence:int
...
The records used in this example are in the /databricks-datasets/songs/data-001/ directory. To view the contents of this directory, click the cell actions menu, select Add Cell Below, enter the following in the new cell, click the cell run menu, and select Run Cell.
display(dbutils.fs.ls("/databricks-datasets/songs/data-001"))
| path | name | size | modificationTime |
| --- | --- | --- | --- |
| dbfs:/databricks-datasets/songs/data-001/header.txt | header.txt | 377 | 1454633901000 |
| dbfs:/databricks-datasets/songs/data-001/part-00000 | part-00000 | 52837 | 1454547464000 |
| dbfs:/databricks-datasets/songs/data-001/part-00001 | part-00001 | 52469 | 1454547465000 |
To view a sample of the records, click the cell actions menu, select Add Cell Below, enter the following in the new cell, click the cell run menu, and select Run Cell.
with open("/dbfs/databricks-datasets/songs/data-001/part-00000") as f: x = ''.join(f.readlines()) print(x)
AR81V6H1187FB48872 nan nan Earl Sixteen 213.7073 0.0 11 0.419 -12.106 Soldier of Jah Army nan SOVNZSZ12AB018A9B8 208.289 125.882 1 0.0 Rastaman 2003 --
ARVVZQP11E2835DBCB nan nan Wavves 133.25016 0.0 0 0.282 0.596 Wavvves 0.471578247701 SOJTQHQ12A8C143C5F 128.116 89.519 1 0.0 I Want To See You (And Go To The Movies) 2009 --
ARFG9M11187FB3BBCB nan nan Nashua USA C-Side 247.32689 0.0 9 0.612 -4.896 Santa Festival Compilation 2008 vol.1 nan SOAJSQL12AB0180501 242.196 171.278 5 1.0 Loose on the Dancefloor 0 225261
...
From viewing a sample of the records, you can observe a few things about the data. You’ll use these observations later when processing the data:
The records do not contain a header. Instead, the header is stored in a separate file in the same directory.
The files are in tab-separated value (TSV) format.
Some fields are missing or invalid.
Step 3: Ingest raw data to Delta Lake
Databricks recommends using Auto Loader for data ingestion. Auto Loader automatically detects and processes new files as they arrive in cloud object storage.
Databricks recommends storing data with Delta Lake. Delta Lake is an open source storage layer that provides ACID transactions and enables the data lakehouse. Delta Lake is the default format for tables created in Databricks.
You can configure Auto Loader to automatically detect the schema of loaded data, allowing you to initialize tables without explicitly declaring the data schema and evolve the table schema as new columns are introduced. This eliminates the need to manually track and apply schema changes over time. Databricks recommends schema inference when using Auto Loader. However, as seen in the data exploration step, the songs data does not contain header information. Because the header is not stored with the data, you’ll need to explicitly define the schema, as shown in the next example.
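For comparison, the following is a minimal sketch of Auto Loader ingestion with schema inference enabled. The paths and table name are hypothetical, and this pattern applies to source files that carry their own header or schema information; for the songs data, the steps below define the schema explicitly instead.

# Hypothetical paths and table name - with schema inference, Auto Loader stores the
# inferred schema under cloudFiles.schemaLocation and evolves it as new columns appear.
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.schemaLocation", "/tmp/example/_schema/my_data")
  .option("cloudFiles.inferColumnTypes", "true")
  .option("header", "true")
  .load("/tmp/example/source_data/")
  .writeStream
  .option("checkpointLocation", "/tmp/example/_checkpoint/my_data")
  .trigger(availableNow=True)
  .toTable("my_inferred_table"))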
In the sidebar, click New and select Notebook from the menu. The Create Notebook dialog appears.
Enter a name for the notebook, for example, Ingest songs data.
In Default Language, select Python.
In Cluster, select the cluster you created or an existing cluster.
Click Create.
Enter the following in the first cell of the notebook:
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField

# Define variables used in code below
file_path = "/databricks-datasets/songs/data-001/"
table_name = "<table-name>"
checkpoint_path = "<checkpoint-path>"

# For purposes of this example, clear out data from previous runs. Because Auto Loader
# is intended for incremental loading, in production applications you normally won't drop
# target tables and checkpoints between runs.
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

schema = StructType(
  [
    StructField("artist_id", StringType(), True),
    StructField("artist_lat", DoubleType(), True),
    StructField("artist_long", DoubleType(), True),
    StructField("artist_location", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("duration", DoubleType(), True),
    StructField("end_of_fade_in", DoubleType(), True),
    StructField("key", IntegerType(), True),
    StructField("key_confidence", DoubleType(), True),
    StructField("loudness", DoubleType(), True),
    StructField("release", StringType(), True),
    StructField("song_hotnes", DoubleType(), True),
    StructField("song_id", StringType(), True),
    StructField("start_of_fade_out", DoubleType(), True),
    StructField("tempo", DoubleType(), True),
    StructField("time_signature", DoubleType(), True),
    StructField("time_signature_confidence", DoubleType(), True),
    StructField("title", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("partial_sequence", IntegerType(), True)
  ]
)

(spark.readStream
  .format("cloudFiles")
  .schema(schema)
  .option("cloudFiles.format", "csv")
  .option("sep", "\t")
  .load(file_path)
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))
Replace:
<table-name> with the name of the Delta table to contain the ingested records, for example, pipeline_get_started_raw_song_data.
<checkpoint-path> with a path to a directory in DBFS to maintain checkpoint files, for example, /tmp/pipeline_get_started/_checkpoint/song_data.
Click the cell run menu and select Run Cell. This example defines the data schema using the information from the README, ingests the songs data from all of the files contained in file_path, and writes the data to the Delta table specified by table_name.
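Optionally, to confirm that the records were ingested, add a new cell and preview the target table. This check is not part of the pipeline itself; it uses the table_name variable defined in the cell above.

# Quick sanity check: preview a few ingested records and count the total.
display(spark.table(table_name).limit(10))
print(spark.table(table_name).count())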
Step 4: Prepare raw data and write to Delta Lake
In this step, you transform the raw songs data by filtering out unneeded columns and adding a new field containing a timestamp for the creation of the new record.
In the sidebar, click New and select Notebook from the menu. The Create Notebook dialog appears.
Enter a name for the notebook, for example, Prepare songs data.
In Default Language, select SQL.
In Cluster, select the cluster you created or an existing cluster.
Click Create.
Enter the following in the first cell of the notebook:
CREATE OR REPLACE TABLE <table-name> (
  artist_id STRING,
  artist_name STRING,
  duration DOUBLE,
  release STRING,
  tempo DOUBLE,
  time_signature DOUBLE,
  title STRING,
  year DOUBLE,
  processed_time TIMESTAMP
);

INSERT INTO <table-name>
SELECT
  artist_id,
  artist_name,
  duration,
  release,
  tempo,
  time_signature,
  title,
  year,
  current_timestamp()
FROM <raw-songs-table-name>
Replace:
<table-name> with the name of the Delta table to contain the filtered and transformed records, for example, pipeline_get_started_prepared_song_data.
<raw-songs-table-name> with the name of the Delta table containing the raw songs records ingested in the previous step, for example, pipeline_get_started_raw_song_data.
Click the cell run menu and select Run Cell.
Step 5: Query the transformed data
In this step, you extend the processing pipeline by adding queries to analyze the songs data. These queries use the prepared records created in the previous step.
In the sidebar, click New and select Notebook from the menu. The Create Notebook dialog appears.
Enter a name for the notebook, for example, Analyze songs data.
In Default Language, select SQL.
In Cluster, select the cluster you created or an existing cluster.
Click Create.
Enter the following in the first cell of the notebook:
CREATE OR REPLACE VIEW artists_by_year AS
SELECT artist_name, year
FROM <prepared-songs-table-name>
-- Remove records where the year field isn't populated
WHERE year > 0;

-- Which artists released the most songs in each year?
SELECT artist_name, count(artist_name) AS num_songs, year
FROM artists_by_year
GROUP BY artist_name, year
ORDER BY num_songs DESC, year DESC
Replace <prepared-songs-table-name> with the name of the prepared table created in the previous step, for example, pipeline_get_started_prepared_song_data.
Click the cell actions menu, select Add Cell Below, and enter the following in the new cell:
-- Find songs for your DJ list
CREATE OR REPLACE VIEW danceable_songs AS
SELECT artist_name, title, tempo
FROM <prepared-songs-table-name>
WHERE time_signature = 4 AND tempo between 100 and 140;

SELECT * FROM danceable_songs limit 100
Replace <prepared-songs-table-name> with the name of the prepared table created in the previous step, for example, pipeline_get_started_prepared_song_data.
To run the queries and view the output, click Run all.
Step 6: Create a Databricks job to run the pipeline
You can create a workflow to automate running the data ingestion, processing, and analysis steps using a Databricks job.
In your Data Science & Engineering workspace, do one of the following:
Click Workflows in the sidebar and click Create Job.
In the sidebar, click New and select Job.
In the task dialog box that appears on the Tasks tab, replace Add a name for your job… with your job name, for example, “Songs workflow”.
In Task name, enter a name for the first task, for example, Ingest_songs_data.
In Type, select the Notebook task type.
In Source, select Workspace.
Use the file browser to find the data ingestion notebook, click the notebook name, and click Confirm.
In Cluster, select Shared_job_cluster or the cluster you created in the Create a cluster step.
Click Create.
Click Add task below the task you just created.
In Task name, enter a name for the task, for example, Prepare_songs_data.
In Type, select the Notebook task type.
In Source, select Workspace.
Use the file browser to find the data preparation notebook, click the notebook name, and click Confirm.
In Cluster, select Shared_job_cluster or the cluster you created in the Create a cluster step.
Click Create.
Click Add task below the task you just created.
In Task name, enter a name for the task, for example, Analyze_songs_data.
In Type, select the Notebook task type.
In Source, select Workspace.
Use the file browser to find the data analysis notebook, click the notebook name, and click Confirm.
In Cluster, select Shared_job_cluster or the cluster you created in the Create a cluster step.
Click Create.
To run the workflow, click Run Now. To view details for the run, click the link in the Start time column for the run in the job runs view. Click each task to view details for the task run.
To view the results when the workflow completes, click the final data analysis task. The Output page appears and displays the query results.
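The same workflow can also be defined programmatically. The sketch below, which goes beyond the UI steps in this tutorial, uses the Jobs REST API from Python to create a job with three dependent notebook tasks; the workspace URL, token, notebook paths, and cluster ID are placeholders you would replace with your own values.

import requests

# Placeholder values - replace with your workspace URL, token, notebook folder, and cluster ID.
host = "https://<your-workspace-url>"
token = "<your-personal-access-token>"
cluster_id = "<existing-cluster-id>"
notebook_dir = "/Users/<your-user>/"

job_spec = {
    "name": "Songs workflow",
    "tasks": [
        {
            "task_key": "Ingest_songs_data",
            "notebook_task": {"notebook_path": notebook_dir + "Ingest songs data"},
            "existing_cluster_id": cluster_id,
        },
        {
            "task_key": "Prepare_songs_data",
            "depends_on": [{"task_key": "Ingest_songs_data"}],
            "notebook_task": {"notebook_path": notebook_dir + "Prepare songs data"},
            "existing_cluster_id": cluster_id,
        },
        {
            "task_key": "Analyze_songs_data",
            "depends_on": [{"task_key": "Prepare_songs_data"}],
            "notebook_task": {"notebook_path": notebook_dir + "Analyze songs data"},
            "existing_cluster_id": cluster_id,
        },
    ],
}

# Create the job (Jobs API 2.1).
resp = requests.post(
    f"{host}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {token}"},
    json=job_spec,
)
print(resp.json())  # returns the new job_id on success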
Step 7: Schedule the data pipeline job
A common requirement is to run a data pipeline on a scheduled basis. To define a schedule for the job that runs the pipeline:
Click Workflows in the sidebar.
In the Name column, click the job name. The side panel displays the Job details.
Click Add trigger in the Job details panel and select Scheduled in Trigger type.
Specify the period, starting time, and time zone. Optionally select the Show Cron Syntax checkbox to display and edit the schedule in Quartz Cron Syntax.
Click Save.
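For example, if you use the Quartz syntax option, an expression such as 0 0 6 * * ? runs the job every day at 6:00 AM. Note that Quartz expressions include a leading seconds field in addition to the usual minute, hour, day-of-month, month, and day-of-week fields.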
Learn more
To learn more about Databricks notebooks, see Introduction to Databricks notebooks.
To learn more about Databricks Jobs, see What is Databricks Jobs?.
To learn more about Delta Lake, see What is Delta Lake?.
To learn more about Delta Live Tables, see What is Delta Live Tables?.