Build an end-to-end data pipeline in Databricks
This article shows you how to create and deploy an end-to-end data processing pipeline, including how to ingest raw data, transform the data, and run analyses on the processed data.
Note
Although this article demonstrates how to create a complete data pipeline using Databricks notebooks and a Databricks job to orchestrate a workflow, Databricks recommends using Delta Live Tables, a declarative interface for building reliable, maintainable, and testable data processing pipelines.
What is a data pipeline?
A data pipeline implements the steps required to move data from source systems, transform that data based on requirements, and store the data in a target system. A data pipeline includes all the processes necessary to turn raw data into prepared data that users can consume. For example, a data pipeline might prepare data so data analysts and data scientists can extract value from the data through analysis and reporting.
An extract, transform, and load (ETL) workflow is a common example of a data pipeline. In ETL processing, data is ingested from source systems and written to a staging area, transformed based on requirements (ensuring data quality, deduplicating records, and so forth), and then written to a target system such as a data warehouse or data lake.
Data pipeline steps
To help you get started building data pipelines on Databricks, the example included in this article walks through creating a data processing workflow:
Use Databricks features to explore a raw dataset.
Create a Databricks notebook to ingest raw source data and write the raw data to a target table.
Create a Databricks notebook to transform the raw source data and write the transformed data to a target table.
Create a Databricks notebook to query the transformed data.
Automate the data pipeline with a Databricks job.
Requirements
You’re logged into Databricks and in the Data Science & Engineering workspace.
You have permission to create a cluster or access to a cluster.
(Optional) To publish tables to Unity Catalog, you must create a catalog and schema in Unity Catalog.
Example: Million Song dataset
The dataset used in this example is a subset of the Million Song Dataset, a collection of features and metadata for contemporary music tracks. This dataset is available in the sample datasets included in your Databricks workspace.
Step 1: Create a cluster
To perform the data processing and analysis in this example, create a cluster to provide the compute resources needed to run commands.
Note
Because this example uses a sample dataset stored in DBFS and recommends persisting tables to Unity Catalog, you create a cluster configured with single user access mode. Single user access mode provides full access to DBFS while also enabling access to Unity Catalog. See Best practices for DBFS and Unity Catalog.
Click Compute in the sidebar.
On the Compute page, click Create Cluster.
On the New Cluster page, enter a unique name for the cluster.
In Access mode, select Single User.
In Single user or service principal access, select your user name.
Leave the remaining values in their default state, and click Create Cluster.
To learn more about Databricks clusters, see Compute.
Step 2: Explore the source data
To learn how to use the Databricks interface to explore the raw source data, see Explore the source data for a data pipeline. If you want to go directly to ingesting and preparing the data, continue to Step 3: Ingest the raw data.
Step 3: Ingest the raw data
In this step, you load the raw data into a table to make it available for further processing. To manage data assets on the Databricks platform such as tables, Databricks recommends Unity Catalog. However, if you don’t have permissions to create the required catalog and schema to publish tables to Unity Catalog, you can still complete the following steps by publishing tables to the Hive metastore.
To ingest data, Databricks recommends using Auto Loader. Auto Loader automatically detects and processes new files as they arrive in cloud object storage.
You can configure Auto Loader to automatically detect the schema of loaded data, allowing you to initialize tables without explicitly declaring the data schema and evolve the table schema as new columns are introduced. This eliminates the need to manually track and apply schema changes over time. Databricks recommends schema inference when using Auto Loader. However, as seen in the data exploration step, the songs data does not contain header information. Because the header is not stored with the data, you’ll need to explicitly define the schema, as shown in the next example.
In the sidebar, click New and select Notebook from the menu. The Create Notebook dialog appears.
Enter a name for the notebook, for example,
Ingest songs data
. By default:Python is the selected language.
The notebook is attached to the last cluster you used. In this case, the cluster you created in Step 1: Create a cluster.
Enter the following into the first cell of the notebook:
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define variables used in the code below file_path = "/databricks-datasets/songs/data-001/" table_name = "<table-name>" checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data" schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .toTable(table_name) )
If you are using Unity Catalog, replace
<table-name>
with a catalog, schema, and table name to contain the ingested records (for example,data_pipelines.songs_data.raw_song_data
). Otherwise, replace<table-name>
with the name of a table to contain the ingested records, for example,raw_song_data
.Replace
<checkpoint-path>
with a path to a directory in DBFS to maintain checkpoint files, for example,/tmp/pipeline_get_started/_checkpoint/song_data
.Click , and select Run Cell. This example defines the data schema using the information from the
README
, ingests the songs data from all of the files contained infile_path
, and writes the data to the table specified bytable_name
.
Step 4: Prepare the raw data
To prepare the raw data for analysis, the following steps transform the raw songs data by filtering out unneeded columns and adding a new field containing a timestamp for the creation of the new record.
In the sidebar, click New and select Notebook from the menu. The Create Notebook dialog appears.
Enter a name for the notebook. For example,
Prepare songs data
. Change the default language to SQL.Enter the following in the first cell of the notebook:
CREATE OR REPLACE TABLE <table-name> ( artist_id STRING, artist_name STRING, duration DOUBLE, release STRING, tempo DOUBLE, time_signature DOUBLE, title STRING, year DOUBLE, processed_time TIMESTAMP ); INSERT INTO <table-name> SELECT artist_id, artist_name, duration, release, tempo, time_signature, title, year, current_timestamp() FROM <raw-songs-table-name>
If you are using Unity Catalog, replace
<table-name>
with a catalog, schema, and table name to contain the filtered and transformed records (for example,data_pipelines.songs_data.prepared_song_data
). Otherwise, replace<table-name>
with the name of a table to contain the filtered and transformed records (for example,prepared_song_data
).Replace
<raw-songs-table-name>
with the name of the table containing the raw songs records ingested in the previous step.Click , and select Run Cell.
Step 5: Query the transformed data
In this step, you extend the processing pipeline by adding queries to analyze the songs data. These queries use the prepared records created in the previous step.
In the sidebar, click New and select Notebook from the menu. The Create Notebook dialog appears.
Enter a name for the notebook. For example,
Analyze songs data
. Change the default language to SQL.Enter the following in the first cell of the notebook:
-- Which artists released the most songs each year? SELECT artist_name, count(artist_name) AS num_songs, year FROM <prepared-songs-table-name> WHERE year > 0 GROUP BY artist_name, year ORDER BY num_songs DESC, year DESC
Replace
<prepared-songs-table-name>
with the name of the table containing prepared data. For example,data_pipelines.songs_data.prepared_song_data
.Click in the cell actions menu, select Add Cell Below and enter the following in the new cell:
-- Find songs for your DJ list SELECT artist_name, title, tempo FROM <prepared-songs-table-name> WHERE time_signature = 4 AND tempo between 100 and 140;
Replace
<prepared-songs-table-name>
with the name of the prepared table created in the previous step. For example,data_pipelines.songs_data.prepared_song_data
.To run the queries and view the output, click Run all.
Step 6: Create a Databricks job to run the pipeline
You can create a workflow to automate running the data ingestion, processing, and analysis steps using a Databricks job.
In your Data Science & Engineering workspace, do one of the following:
Click Workflows in the sidebar and click .
In the sidebar, click New and select Job.
In the task dialog box on the Tasks tab, replace Add a name for your job… with your job name. For example, “Songs workflow”.
In Task name, enter a name for the first task, for example,
Ingest_songs_data
.In Type, select the Notebook task type.
In Source, select Workspace.
Use the file browser to find the data ingestion notebook, click the notebook name, and click Confirm.
In Cluster, select Shared_job_cluster or the cluster you created in the
Create a cluster
step.Click Create.
Click below the task you just created and select Notebook.
In Task name, enter a name for the task, for example,
Prepare_songs_data
.In Type, select the Notebook task type.
In Source, select Workspace.
Use the file browser to find the data preparation notebook, click the notebook name, and click Confirm.
In Cluster, select Shared_job_cluster or the cluster you created in the
Create a cluster
step.Click Create.
Click below the task you just created and select Notebook.
In Task name, enter a name for the task, for example,
Analyze_songs_data
.In Type, select the Notebook task type.
In Source, select Workspace.
Use the file browser to find the data analysis notebook, click the notebook name, and click Confirm.
In Cluster, select Shared_job_cluster or the cluster you created in the
Create a cluster
step.Click Create.
To run the workflow, Click . To view details for the run, click the link in the Start time column for the run in the job runs view. Click each task to view details for the task run.
To view the results when the workflow completes, click the final data analysis task. The Output page appears and displays the query results.
Step 7: Schedule the data pipeline job
Note
To demonstrate using a Databricks job to orchestrate a scheduled workflow, this getting started example separates the ingestion, preparation, and analysis steps into separate notebooks, and each notebook is then used to create a task in the job. If all of the processing is contained in a single notebook, you can easily schedule the notebook directly from the Databricks notebook UI. See Create and manage scheduled notebook jobs.
A common requirement is to run a data pipeline on a scheduled basis. To define a schedule for the job that runs the pipeline:
Click Workflows in the sidebar.
In the Name column, click the job name. The side panel displays the Job details.
Click Add trigger in the Job details panel and select Scheduled in Trigger type.
Specify the period, starting time, and time zone. Optionally select the Show Cron Syntax checkbox to display and edit the schedule in Quartz Cron Syntax.
Click Save.
Learn more
To learn more about Databricks notebooks, see Introduction to Databricks notebooks.
To learn more about Databricks Jobs, see What are Databricks Jobs?.
To learn more about Delta Lake, see What is Delta Lake?.
To learn more about data processing pipelines with Delta Live Tables, see What is Delta Live Tables?.