Run your first ETL workload on Databricks

Databricks provides a suite of production-ready tools that allow data professionals to quickly develop and deploy extract, transform, and load (ETL) pipelines.

By the end of this article, you will be comfortable with:

  1. Launching a Databricks all-purpose compute cluster.

  2. Creating a Databricks notebook.

  3. Configuring incremental data ingestion to Delta Lake with Auto Loader.

  4. Executing notebook cells to process, query, and preview data.

  5. Scheduling a notebook as a Databricks job.

This tutorial uses interactive notebooks to complete common ETL tasks in Python or Scala.

You can also use Delta Live Tables to build ETL pipelines. Databricks created Delta Live Tables to reduce the complexity of building, deploying, and maintaining production ETL pipelines. See Delta Live Tables quickstart.

Requirements

Note

If you do not have cluster control privileges, you can still complete most of the steps below as long as you have access to a cluster.

If you only have access to the Databricks SQL workspace, see Get started with Databricks as a data analyst.

Step 1: Create a cluster

To do exploratory data analysis and data engineering, create a cluster to provide the compute resources needed to execute commands.

  1. Click Compute in the sidebar.

  2. On the Compute page, click Create Cluster. This opens the New Cluster page.

  3. Specify a unique name for the cluster, leave the remaining values in their default state, and click Create Cluster.

To learn more about Databricks clusters, see Clusters.

Step 2: Create a Databricks notebook

To get started writing and executing interactive code on Databricks, create a notebook.

  1. Click Create in the sidebar, then click Notebook.

  2. On the Create Notebook page:

    • Specify a unique name for your notebook.

    • Make sure the default language is set to Python or Scala.

    • Select the cluster you created in Step 1 from the Cluster dropdown.

    • Click Create.

A notebook opens with an empty cell at the top.

To learn more about creating and managing notebooks, see Manage notebooks.

Step 3: Configure Auto Loader to ingest data to Delta Lake

Databricks recommends using Auto Loader for incremental data ingestion. Auto Loader automatically detects and processes new files as they arrive in cloud object storage.
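Auto Loader's file tracking is internal to Databricks, but the core idea of incremental ingestion can be illustrated with a toy model in plain Python: record which files have already been processed in a checkpoint, and on each run handle only the files that are new. The function `discover_new_files` and its JSON checkpoint format are hypothetical; this is not how Auto Loader stores its state.

```python
import json
import os
import tempfile


def discover_new_files(source_dir: str, checkpoint_file: str) -> list[str]:
    """Return files in source_dir not yet listed in checkpoint_file, then record them.

    Toy model of incremental ingestion only; Auto Loader does not work this way internally.
    """
    # Load the set of files processed by earlier runs (empty on the first run).
    seen: set[str] = set()
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file) as f:
            seen = set(json.load(f))

    # Anything in the directory that is not in the checkpoint is new.
    new_files = [name for name in sorted(os.listdir(source_dir)) if name not in seen]

    # Persist the updated state so the next run skips these files.
    with open(checkpoint_file, "w") as f:
        json.dump(sorted(seen | set(new_files)), f)
    return new_files


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        src = os.path.join(root, "events")
        os.makedirs(src)
        ckpt = os.path.join(root, "checkpoint.json")

        for name in ("file-0.json", "file-1.json"):
            open(os.path.join(src, name), "w").close()
        print(discover_new_files(src, ckpt))  # ['file-0.json', 'file-1.json']

        open(os.path.join(src, "file-2.json"), "w").close()
        print(discover_new_files(src, ckpt))  # ['file-2.json']
        print(discover_new_files(src, ckpt))  # []
```

Deleting the checkpoint file resets the state, so the next call treats every file as new. The Step 3 code removes `checkpoint_path` for a similar reason: each demo run starts from scratch.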

Databricks recommends storing data with Delta Lake. Delta Lake is an open source storage layer that provides ACID transactions and enables the data lakehouse. Delta Lake is the default format for tables created in Databricks.
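Delta Lake gets atomic commits from a transaction log: each commit is a numbered JSON file in the table's `_delta_log` directory, and a commit succeeds only if no other writer has already created that version. The sketch below is a heavily simplified toy model of that idea on a local filesystem. The `commit` and `current_files` helpers are hypothetical, and real `add` and `remove` actions carry more fields than `path`.

```python
import json
import os
import tempfile


def commit(table_dir: str, version: int, actions: list[dict]) -> bool:
    """Try to write log entry `version`; return False if another writer already claimed it."""
    log_dir = os.path.join(table_dir, "_delta_log")
    os.makedirs(log_dir, exist_ok=True)
    path = os.path.join(log_dir, f"{version:020d}.json")  # zero-padded so names sort by version
    try:
        # Mode "x" fails if the file already exists, so at most one writer claims a version.
        # (A reader could still see a partially written file here; real storage layers
        # rely on an atomic put-if-absent instead.)
        with open(path, "x") as f:
            for action in actions:
                f.write(json.dumps(action) + "\n")
        return True
    except FileExistsError:
        return False


def current_files(table_dir: str) -> set[str]:
    """Replay the log in version order to find which data files make up the table."""
    log_dir = os.path.join(table_dir, "_delta_log")
    files: set[str] = set()
    for name in sorted(os.listdir(log_dir)):
        with open(os.path.join(log_dir, name)) as f:
            for line in f:
                action = json.loads(line)
                if "add" in action:
                    files.add(action["add"]["path"])
                elif "remove" in action:
                    files.discard(action["remove"]["path"])
    return files


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as table:
        commit(table, 0, [{"add": {"path": "part-0.parquet"}}])
        # Two writers race for version 1; only the first succeeds.
        print(commit(table, 1, [{"add": {"path": "part-1.parquet"}}]))  # True
        print(commit(table, 1, [{"add": {"path": "part-x.parquet"}}]))  # False
        print(sorted(current_files(table)))  # ['part-0.parquet', 'part-1.parquet']
```

Because the losing writer gets `False` instead of overwriting version 1, it could reread the log and retry at version 2. That retry loop (optimistic concurrency) is omitted here.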

To configure Auto Loader to ingest data to a Delta Lake table, copy and paste the following code into the empty cell in your notebook. Use the version that matches your notebook's default language.

Python:

# Import functions
from pyspark.sql.functions import input_file_name, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", input_file_name().alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala:

// Imports
import org.apache.spark.sql.functions.{input_file_name, current_timestamp}
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first().getString(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint/etl_quickstart"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", input_file_name().as("source_file"), current_timestamp().as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)
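The `.select(...)` call in the Auto Loader code keeps every field from the source JSON (`*`) and appends two columns: `source_file`, from `input_file_name()`, and `processing_time`, from `current_timestamp()`. The plain-Python sketch below mimics that per-record enrichment on local JSON-lines files so you can see the shape of each output row. `ingest` is a hypothetical helper, and the sample records are made up.

```python
import json
import os
import tempfile
from datetime import datetime, timezone


def ingest(paths: list[str]) -> list[dict]:
    """Parse JSON-lines files and add source_file and processing_time to every record."""
    # Spark's current_timestamp() returns one value for all rows in a query evaluation;
    # this sketch mimics that by taking a single timestamp per call.
    processing_time = datetime.now(timezone.utc)
    rows = []
    for path in paths:
        with open(path) as f:
            for line in f:
                if not line.strip():
                    continue  # skip blank lines
                record = json.loads(line)
                # Keep every original field (the "*") and append the two metadata columns.
                rows.append({**record, "source_file": path, "processing_time": processing_time})
    return rows


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        path = os.path.join(d, "part-0.json")
        with open(path, "w") as f:
            # Hypothetical sample records, not taken from the tutorial dataset.
            f.write('{"time": 1, "action": "Open"}\n{"time": 2, "action": "Close"}\n')
        for row in ingest([path]):
            print(row["action"], os.path.basename(row["source_file"]))
        # Open part-0.json
        # Close part-0.json
```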

Note

Because the table name and checkpoint path are derived from your username, you should be able to run this code without conflicting with existing workspace assets or other users' data. If your workspace restricts network or storage access, this code raises errors; contact your workspace administrator to troubleshoot these restrictions.
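The `regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')` query replaces every character other than an ASCII letter or digit with an underscore, so characters such as `.` and `@` from an email-style username do not end up in the unquoted table name, where a `.` would be read as a separator between schema and table. The Python `re.sub` call below applies the same pattern to a hypothetical user, `jane.doe@example.com`, to show the names the Python code would generate.

```python
import re


def safe_name(user: str) -> str:
    """Replace every character outside [a-zA-Z0-9] with "_", like the SQL regexp_replace call."""
    return re.sub(r"[^a-zA-Z0-9]", "_", user)


username = safe_name("jane.doe@example.com")  # hypothetical user
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
print(table_name)       # jane_doe_example_com_etl_quickstart
print(checkpoint_path)  # /tmp/jane_doe_example_com/_checkpoint/etl_quickstart
```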

To learn more about Auto Loader, see Auto Loader.

Step 4: Process and interact with data

Notebooks run code one cell at a time. To run your code and inspect the results:

  1. To run the cell you completed in the previous step, select the cell and press SHIFT+ENTER.

  2. To query the table you’ve just created, copy and paste the following code into an empty cell, then press SHIFT+ENTER to run the cell.

    Python:

    df = spark.read.table(table_name)

    Scala:

    val df = spark.read.table(table_name)
    
  3. To preview the data in your DataFrame, copy and paste the following code into an empty cell, then press SHIFT+ENTER to run the cell.

    display(df)
    

To learn more about interactive options for visualizing data, see Visualizations.

Step 5: Schedule a job

You can run Databricks notebooks as production scripts by adding them as a task in a Databricks job. In this step, you will create a new job that you can trigger manually.

To schedule your notebook as a task:

  1. Click Schedule on the right side of the header bar.

  2. For Job name, enter a unique name.

  3. Click Manual so that the job runs only when you trigger it.

  4. In the Cluster drop-down, select the cluster you created in Step 1.

  5. Click Create.

  6. In the window that appears, click Run now.

  7. To see the job run results, click the External Link icon next to the Last run timestamp.
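If you later want to script this step, the Databricks Jobs API 2.1 provides `POST /api/2.1/jobs/create` and `POST /api/2.1/jobs/run-now`. The sketch below only builds request bodies for a single notebook task on an existing cluster; it does not authenticate or send anything. The task key, notebook path, and cluster ID are hypothetical placeholders, so confirm the field names against the Jobs API reference before relying on them.

```python
import json


def job_create_body(job_name: str, notebook_path: str, cluster_id: str) -> dict:
    """Request body for POST /api/2.1/jobs/create: one notebook task, no schedule."""
    return {
        "name": job_name,
        "tasks": [
            {
                "task_key": "etl_quickstart",  # hypothetical task name
                "notebook_task": {"notebook_path": notebook_path},
                "existing_cluster_id": cluster_id,  # the all-purpose cluster from Step 1
            }
        ],
        # No "schedule" key: the job runs only when triggered, like choosing Manual.
    }


def run_now_body(job_id: int) -> dict:
    """Request body for POST /api/2.1/jobs/run-now, the API counterpart of Run now."""
    return {"job_id": job_id}


if __name__ == "__main__":
    body = job_create_body(
        "etl-quickstart-job",
        "/Users/jane.doe@example.com/etl_quickstart",  # hypothetical notebook path
        "0123-456789-abcdefgh",  # hypothetical cluster ID
    )
    print(json.dumps(body, indent=2))
```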

For more information on jobs, see Jobs.