pandas function APIs
pandas function APIs enable you to directly apply a Python native function that takes and outputs pandas instances to a PySpark DataFrame. Similar to pandas user-defined functions, function APIs also use Apache Arrow to transfer data and pandas to work with the data; however, Python type hints are optional in pandas function APIs.
There are three types of pandas function APIs:
Grouped map
Map
Cogrouped map
pandas function APIs leverage the same internal logic that pandas UDF execution uses. They share characteristics such as PyArrow, supported SQL types, and configurations.
For more information, see the blog post New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.
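One configuration that pandas UDFs and pandas function APIs share is the Arrow record batch size, spark.sql.execution.arrow.maxRecordsPerBatch, which controls how many rows go into each Arrow batch (as noted below, it does not apply to grouped or cogrouped data). As a minimal sketch, it can be set through Spark conf; the value shown is only an illustrative assumption:
# Minimal sketch: tune the Arrow batch size shared by pandas UDFs and
# pandas function APIs; 5000 is only an illustrative value.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")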
Grouped map
You transform your grouped data using groupBy().applyInPandas() to implement the “split-apply-combine” pattern. Split-apply-combine consists of three steps:
Split the data into groups by using DataFrame.groupBy.
Apply a function on each group. The input and output of the function are both pandas.DataFrame. The input data contains all the rows and columns for each group.
Combine the results into a new DataFrame.
To use groupBy().applyInPandas(), you must define the following:
A Python function that defines the computation for each group
A StructType object or a string that defines the schema of the output DataFrame (both forms are sketched below)
The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, for example, integer indices. See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.
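As a minimal sketch of the second requirement, the output schema can be declared either as a DDL-formatted string or as an equivalent StructType object; the id and v field names simply mirror the example that follows:
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# Two equivalent ways to declare the applyInPandas output schema:
schema_as_string = "id long, v double"
schema_as_struct = StructType([
    StructField("id", LongType()),
    StructField("v", DoubleType()),
])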
All data for a group is loaded into memory before the function is applied. This can lead to out-of-memory exceptions, especially if the group sizes are skewed. The maxRecordsPerBatch configuration is not applied to groups, and it is up to you to ensure that the grouped data fits into the available memory.
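Because each group is materialized in full, it can help to check group sizes before running the computation. The following is a minimal sketch, assuming a DataFrame df grouped by an id column as in the example below:
from pyspark.sql import functions as F

# Minimal sketch: surface the largest groups before applying a grouped map,
# assuming a DataFrame `df` with an `id` grouping column.
df.groupBy("id").count().orderBy(F.desc("count")).show(5)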
The following example shows how to use groupby().applyInPandas() to subtract the mean from each value in the group.
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
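The function passed to applyInPandas can also accept the grouping key as its first argument. The following is a minimal sketch of that variant, reusing df from above; the key arrives as a tuple of the grouping column values:
# Minimal sketch: an optional first argument receives the group key,
# for example (1,) for the group with id == 1.
def subtract_mean_with_key(key, pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby("id").applyInPandas(
    subtract_mean_with_key, schema="id long, v double").show()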
For detailed usage, see pyspark.sql.GroupedData.applyInPandas.
Map
You perform map operations with pandas instances using DataFrame.mapInPandas(), which transforms an iterator of pandas.DataFrame representing the current PySpark DataFrame into another iterator of pandas.DataFrame and returns the result as a new PySpark DataFrame.
The underlying function takes and outputs an iterator of pandas.DataFrame. Unlike some pandas UDFs, such as Series to Series, it can return output of arbitrary length.
The following example shows how to use mapInPandas():
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
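Because the function receives an iterator of batches and is not tied to one output row per input row, it can also expand or summarize the data. The following is a minimal sketch reusing df from above; the num_rows column name is only illustrative:
import pandas as pd

# Minimal sketch: the output length need not match the input length.
# Each input batch is reduced to a single row holding its row count.
def count_rows(iterator):
    for pdf in iterator:
        yield pd.DataFrame({"num_rows": [len(pdf)]})

df.mapInPandas(count_rows, schema="num_rows long").show()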
For detailed usage, see pyspark.sql.DataFrame.mapInPandas.
Cogrouped map
For cogrouped map operations with pandas instances, use DataFrame.groupby().cogroup().applyInPandas() to cogroup two PySpark DataFrames by a common key and then apply a Python function to each cogroup as shown:
Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.
Apply a function to each cogroup. The input of the function is two pandas.DataFrames (with an optional tuple representing the key). The output of the function is a pandas.DataFrame.
Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.
To use groupBy().cogroup().applyInPandas(), you must define the following:
A Python function that defines the computation for each cogroup.
A StructType object or a string that defines the schema of the output PySpark DataFrame.
The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, for example, integer indices. See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.
All data for a cogroup is loaded into memory before the function is applied. This can lead to out-of-memory exceptions, especially if the group sizes are skewed. The maxRecordsPerBatch configuration is not applied, and it is up to you to ensure that the cogrouped data fits into the available memory.
The following example shows how to use groupby().cogroup().applyInPandas() to perform an asof join between two datasets.
import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
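The cogrouped function can also accept the optional key tuple mentioned above as its first argument. The following is a minimal sketch of that variant, reusing df1 and df2 from the example:
# Minimal sketch: an optional first argument receives the cogroup key,
# for example (1,) for the cogroup with id == 1.
def asof_join_with_key(key, l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join_with_key, schema="time int, id int, v1 double, v2 string").show()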
For detailed usage, see pyspark.sql.PandasCogroupedOps.applyInPandas.