Tutorial: Work with Apache Spark Scala DataFrames
This article shows you how to load and transform data using the Apache Spark Scala DataFrame API in Databricks.
See also Apache Spark Scala API reference.
What is a DataFrame?
A DataFrame is a two-dimensional labeled data structure with columns of potentially different types. You can think of a DataFrame like a spreadsheet, a SQL table, or a dictionary of series objects. Apache Spark DataFrames provide a rich set of functions (select columns, filter, join, aggregate) that allow you to solve common data analysis problems efficiently.
Apache Spark DataFrames are an abstraction built on top of Resilient Distributed Datasets (RDDs). Spark DataFrames and Spark SQL use a unified planning and optimization engine, allowing you to get nearly identical performance across all supported languages on Databricks (Python, SQL, Scala, and R).
What is a Spark Dataset?
The Apache Spark Dataset API provides a type-safe, object-oriented programming interface. DataFrame is an alias for an untyped Dataset[Row].
The Databricks documentation uses the term DataFrame for most technical references and guides, because this term is inclusive of Python, Scala, and R. See Scala Dataset aggregator example notebook.
Create a DataFrame with Scala
Most Apache Spark queries return a DataFrame. This includes reading from a table, loading data from files, and operations that transform data.
You can also create a DataFrame from a list of classes, such as in the following example:
case class Employee(id: Int, name: String)
val df = Seq(Employee(1, "Elia"), Employee(2, "Teo"), Employee(3, "Fang")).toDF
Read a table into a DataFrame
Databricks uses Delta Lake for all tables by default. You can easily load tables to DataFrames, such as in the following example:
spark.read.table("<schema_name>.<table_name>")
Load data into a DataFrame from files
You can load data from many supported file formats. The following example uses a dataset available in the /databricks-datasets directory, accessible from most workspaces. See Sample datasets.
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
Assign transformation steps to a DataFrame
The results of most Spark transformations return a DataFrame. You can assign these results back to a DataFrame variable, similar to how you might use CTEs, temp views, or DataFrames in other systems.
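For example, reusing the Employee DataFrame created earlier (with its id and name columns), the following sketch chains a few transformations and assigns each intermediate result to its own variable:
import org.apache.spark.sql.functions.{col, upper}

// Each transformation returns a new DataFrame; nothing is computed until an action
// such as display(), count(), or a write is called.
val upper_df = df.withColumn("name_upper", upper(col("name")))
val filtered_ids_df = upper_df.filter(col("id") > 1)
val result_df = filtered_ids_df.select("id", "name_upper")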
Combine DataFrames with join and union
DataFrames use standard SQL semantics for join operations. A join returns the combined results of two DataFrames based on the provided matching conditions and join type. The following example is an inner join, which is the default:
val joined_df = df1.join(df2, Seq("id"), "inner")
You can add the rows of one DataFrame to another using the union operation, as in the following example:
val unioned_df = df1.union(df2)
Filter rows in a DataFrame
You can filter rows in a DataFrame using .filter() or .where(). There is no difference in performance or syntax, as seen in the following example:
val filtered_df = df.filter("id > 1")
val filtered_df = df.where("id > 1")
Use filtering to select a subset of rows to return or modify in a DataFrame.
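You can also express the same condition with a Column object instead of a SQL string; this sketch assumes the id column from the earlier examples:
import org.apache.spark.sql.functions.col

// Equivalent to the string-based filters above, written as a Column expression
val filtered_col_df = df.filter(col("id") > 1)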
Select columns from a DataFrame
You can select columns by passing one or more column names to .select(), as in the following example:
val select_df = df.select("id", "name")
You can combine select and filter queries to limit rows and columns returned.
val subset_df = df.filter("id > 1").select("name")
View the DataFrame
To view this data in a tabular format, you can use the Databricks display() command, as in the following example:
display(df)
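Outside a Databricks notebook, where display() is not available, the standard Spark show() method prints the same data as plain text:
// Prints the first 20 rows to the console
df.show()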
Print the data schema
Spark uses the term schema to refer to the names and data types of the columns in the DataFrame.
Note
Databricks also uses the term schema to describe a collection of tables registered to a catalog.
You can print the schema using the .printSchema() method, as in the following example:
df.printSchema()
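For the Employee DataFrame created earlier, the output looks roughly like the following (nullability may differ depending on how the data was created):
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)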
Save a DataFrame to a table
Databricks uses Delta Lake for all tables by default. You can save the contents of a DataFrame to a table using the following syntax:
df.write.saveAsTable("<table_name>")
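If the table already exists, the write fails by default; you can pass a save mode such as overwrite or append, as in this sketch:
// Replaces the existing table contents; use "append" to add rows instead
df.write.mode("overwrite").saveAsTable("<table_name>")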
Write a DataFrame to a collection of files
Most Spark applications are designed to work on large datasets in a distributed fashion, so Spark writes out a directory of files rather than a single file. Many data systems are configured to read these directories of files. Databricks recommends using tables over filepaths for most applications.
The following example saves a directory of JSON files:
df.write.format("json").save("/tmp/json_data")
Run SQL queries in Spark
Spark DataFrames provide a number of options to combine SQL with Scala.
The selectExpr() method allows you to specify each column as a SQL expression, such as in the following example:
display(df.selectExpr("id", "upper(name) as big_name"))
You can import the expr() function from org.apache.spark.sql.functions to use SQL syntax anywhere a column would be specified, as in the following example:
import org.apache.spark.sql.functions.{col, expr}
display(df.select(col("id"), expr("lower(name) as little_name")))
You can also use spark.sql() to run arbitrary SQL queries in the Scala kernel, as in the following example:
val query_df = spark.sql("SELECT * FROM <table_name>")
Because logic is executed in the Scala kernel and all SQL queries are passed as strings, you can use Scala string interpolation to parameterize SQL queries, as in the following example:
val table_name = "my_table"
val query_df = spark.sql(s"SELECT * FROM $table_name")