Tutorial: Run an end-to-end lakehouse analytics pipeline
This tutorial shows you how to set up an end-to-end analytics pipeline for a Databricks lakehouse.
Important
This tutorial uses interactive notebooks to complete common ETL tasks in Python on Unity Catalog enabled clusters. If you are not using Unity Catalog, see Run your first ETL workload on Databricks.
Tasks in this tutorial
By the end of this article, you will feel comfortable:
Writing and reading data from a Unity Catalog external location.
Configuring incremental data ingestion to a Unity Catalog table with Auto Loader.
Executing notebook cells to process, query, and preview data.
Databricks provides a suite of production-ready tools that allow data professionals to quickly develop and deploy extract, transform, and load (ETL) pipelines. Unity Catalog allows data stewards to configure and secure storage credentials, external locations, and database objects for users throughout an organization. Databricks SQL allows analysts to run SQL queries against the same tables used in production ETL workloads, enabling real-time business intelligence at scale.
You can also use Delta Live Tables to build ETL pipelines. Databricks created Delta Live Tables to reduce the complexity of building, deploying, and maintaining production ETL pipelines. See Tutorial: Run your first Delta Live Tables pipeline.
Requirements
Note
If you do not have cluster control privileges, you can still complete most of the steps below as long as you have access to a cluster.
Step 1: Create a cluster
To do exploratory data analysis and data engineering, create a cluster to provide the compute resources needed to execute commands.
Click Compute in the sidebar.
Click New in the sidebar, then select Cluster. This opens the New Cluster/Compute page.
Specify a unique name for the cluster.
Select the Single node radio button.
Select Single User from the Access mode dropdown.
Make sure your email address is visible in the Single User field.
Select a Databricks Runtime version of 11.1 or above to use Unity Catalog.
Click Create compute to create the cluster.
To learn more about Databricks clusters, see Compute.
Step 2: Create a Databricks notebook
To create a notebook in your workspace, click New in the sidebar, and then click Notebook. A blank notebook opens in the workspace.
To learn more about creating and managing notebooks, see Manage notebooks.
Step 3: Write and read data from an external location managed by Unity Catalog
Databricks recommends using Auto Loader for incremental data ingestion. Auto Loader automatically detects and processes new files as they arrive in cloud object storage.
Use Unity Catalog to manage secure access to external locations. Users or service principals with READ FILES permissions on an external location can use Auto Loader to ingest data.
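If you administer the external location yourself, a metastore admin or the location's owner can grant these privileges in SQL. The following sketch assumes a hypothetical external location named my_external_location and a placeholder principal; substitute the securable name and user for your environment.
# Hedged example: grant the privileges this tutorial relies on for an external location.
# "my_external_location" and the principal below are placeholders, not objects created in this tutorial.
spark.sql("GRANT READ FILES ON EXTERNAL LOCATION my_external_location TO `user@example.com`")
spark.sql("GRANT WRITE FILES ON EXTERNAL LOCATION my_external_location TO `user@example.com`")
spark.sql("GRANT CREATE EXTERNAL TABLE ON EXTERNAL LOCATION my_external_location TO `user@example.com`")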
Normally, data will arrive in an external location due to writes from other systems. In this demo, you can simulate data arrival by writing out JSON files to an external location.
Copy the code below into a notebook cell. Replace the string value for catalog with the name of a catalog on which you have CREATE CATALOG and USE CATALOG permissions. Replace the string value for external_location with the path of an external location on which you have READ FILES, WRITE FILES, and CREATE EXTERNAL TABLE permissions.
External locations can be defined as an entire storage container, but often point to a directory nested in a container.
The correct format for an external location path is "gs://bucket-name/path/to/external_location".
external_location = "<your-external-location>"
catalog = "<your-catalog>"

# Write a test file to the external location, read it back, and then delete it
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")

# List the schemas in the provided catalog to confirm catalog access
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
Executing this cell should print a line that reads 12 bytes, print the string “Hello world!”, and display all the databases present in the catalog provided. If you are unable to get this cell to run, confirm that you are in a Unity Catalog enabled workspace and request proper permissions from your workspace administrator to complete this tutorial.
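If you want a quick sanity check before contacting an administrator, the optional cell below is one way to confirm which Unity Catalog metastore and identity your cluster is using; current_metastore() typically fails to resolve on clusters that are not Unity Catalog enabled.
# Optional check (assumes a Unity Catalog enabled cluster): show the attached
# metastore and the identity your commands run as.
display(spark.sql("SELECT current_metastore() AS metastore, current_user() AS user"))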
The Python code below uses your email address to create a unique database in the catalog provided and a unique storage location in the external location provided. Executing this cell removes all data associated with this tutorial, allowing you to execute this example idempotently. A class is defined and instantiated that you will use to simulate batches of data arriving from a connected system to your source external location.
Copy this code to a new cell in your notebook and execute it to configure your environment.
Note
The variables defined in this code should allow you to safely execute it without risk of conflicting with existing workspace assets or other users. Restricted network or storage permissions will raise errors when executing this code; contact your workspace administrator to troubleshoot these restrictions.
from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(self.source)
        except Exception:
            return "2016-01-01"
        # Advance one day past the latest pickup date already landed in the source
        batch_date = df.selectExpr("date_add(max(date(tpep_pickup_datetime)), 1)").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)
You can now land a batch of data by copying the following code into a cell and executing it. You can manually execute this cell up to 60 times to trigger new data arrival.
RawData.land_batch()
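If you want to confirm what land_batch() wrote before moving on, one option is to read the raw JSON files straight back from the source path, for example:
# Optional check: inspect the raw JSON files written to the source external location
raw_df = spark.read.format("json").load(source)
print(f"Records currently landed in {source}: {raw_df.count()}")
display(raw_df.limit(5))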
Step 4: Configure Auto Loader to ingest data to Unity Catalog
Databricks recommends storing data with Delta Lake. Delta Lake is an open source storage layer that provides ACID transactions and enables the data lakehouse. Delta Lake is the default format for tables created in Databricks.
To configure Auto Loader to ingest data to a Unity Catalog table, copy and paste the following code into an empty cell in your notebook:
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(source)
    .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
    .writeStream
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .option("mergeSchema", "true")
    .toTable(table))
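Because the stream uses trigger(availableNow=True), it processes the files currently in the source and then stops; running the cell again after landing more batches picks up only the new files, because progress is tracked in the checkpoint. Once the stream finishes, an optional check such as the following (using standard Delta Lake commands) should report that the table format is delta:
# Optional check: confirm the target table is stored in Delta Lake format
# and review the write history recorded by the stream.
display(spark.sql(f"DESCRIBE DETAIL {table}"))
display(spark.sql(f"DESCRIBE HISTORY {table}"))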
To learn more about Auto Loader, see What is Auto Loader?.
To learn more about Structured Streaming with Unity Catalog, see Using Unity Catalog with Structured Streaming.
Step 5: Process and interact with data
Notebooks execute logic cell-by-cell. Use these steps to execute the logic in your cells:
To run the cell you completed in the previous step, select the cell and press SHIFT+ENTER.
To query the table you’ve just created, copy and paste the following code into an empty cell, then press SHIFT+ENTER to run the cell.
df = spark.read.table(table)
To preview the data in your DataFrame, copy and paste the following code into an empty cell, then press SHIFT+ENTER to run the cell.
display(df)
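You can apply ordinary DataFrame transformations to the same df in additional cells. As one illustrative sketch (it assumes the tpep_pickup_datetime and fare_amount columns from the NYC taxi sample data, and casts explicitly in case Auto Loader inferred the JSON fields as strings), the following summarizes trips per pickup date:
from pyspark.sql.functions import avg, col, count, to_date

# Trip count and average fare per pickup date; casts are explicit because the
# JSON-ingested columns may have been inferred as strings.
daily_df = (
    df.groupBy(to_date(col("tpep_pickup_datetime")).alias("pickup_date"))
    .agg(
        count("*").alias("trip_count"),
        avg(col("fare_amount").cast("double")).alias("avg_fare"),
    )
    .orderBy("pickup_date")
)
display(daily_df)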
To learn more about interactive options for visualizing data, see Visualizations in Databricks notebooks.
Step 6: Schedule a job
You can run Databricks notebooks as production scripts by adding them as a task in a Databricks job. In this step, you will create a new job that you can trigger manually.
To schedule your notebook as a task:
Click Schedule on the right side of the header bar.
Enter a unique name for the Job name.
Click Manual.
In the Cluster drop-down, select the cluster you created in step 1.
Click Create.
In the window that appears, click Run now.
To see the job run results, click the icon next to the Last run timestamp.
For more information on jobs, see What are Databricks Jobs?.
Step 7: Query table from Databricks SQL
Anyone with the USE CATALOG permission on the current catalog, the USE SCHEMA permission on the current schema, and SELECT permissions on the table can query the contents of the table from their preferred Databricks API.
You need access to a running SQL warehouse to execute queries in Databricks SQL.
The table you created earlier in this tutorial has the name target_table. You can query it using the catalog you provided in the first cell and the database with the pattern e2e_lakehouse_<your-username>_db. You can use Catalog Explorer to find the data objects that you created.
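For example, a notebook cell like the following runs such a query against the table's fully qualified three-level name; the same SELECT statement can be pasted into the Databricks SQL editor against a running SQL warehouse. The placeholders <your-catalog> and <your-username> stand in for your own values.
# Hedged example: query the table by its fully qualified name.
# Replace <your-catalog> and <your-username> with your own values.
display(spark.sql(
    "SELECT count(*) AS total_trips "
    "FROM <your-catalog>.e2e_lakehouse_<your-username>_db.target_table"
))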