pandas ユーザー定義関数

pandas ユーザー定義関数 (UDF) (ベクトル化された UDF とも呼ばれます) は、 Apache Arrow を使用してデータを転送し、pandasを使用してデータを操作するユーザー定義関数です。 pandasの UDF ではベクトル化された操作が可能で、1度に1行ずつの Python UDF と比較してパフォーマンスを最大 100 倍向上させることができます。

背景情報については、ブログ記事 Apache Spark 3.0 の今後のリリースの新しい Pandas UDF と Python 型ヒントを参照してください。

キーワード pandas_udf をデコレータとして使用して pandas UDF を定義し、 Python 型ヒントで関数をラップします。 この記事では、さまざまな種類の pandas UDF について説明し、型ヒントと共に pandas UDF を使用する方法を示します。

シリーズからシリーズへの UDF

シリーズからシリーズへのpandas UDF を使用して、スカラー演算をベクトル化します。 selectwithColumnなどの APIs で使用できます。

Python 関数は、pandas シリーズを入力として受け取り、同じ長さのpandas シリーズを返す必要があり、Python 型ヒントでこれらを指定する必要があります。 Spark は、列をバッチに分割し、各バッチの関数をデータのサブセットとして呼び出し、結果を連結することで pandas UDF を実行します。

次の例は、2つの列の積を計算するpandas UDFの作成方法を示しています。

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

シリーズのイテレータからシリーズのイテレータへのUDF

イテレータ UDF は、次の点を除いてスカラー pandas UDF と同じです。

  • Python 関数

    • 単一の入力バッチの代わりにバッチのイテレータを入力として受け取ります。

    • 単一の出力バッチではなく、出力バッチのイテレータを返します。

  • イテレータの出力全体の長さは、入力全体の長さと同じである必要があります。

  • ラップされたpandas UDF は、1 つの Spark 列を入力として受け取ります。

Python タイプ ヒントは Iterator[pandas.Series] -> Iterator[pandas.Series]として指定する必要があります。

このpandas UDF は、UDF の実行で何らかの状態を初期化する必要がある場合 (たとえば、機械学習モデル ファイルを読み込んですべての入力バッチに推論を適用するなど) に役立ちます。

次の例は、イテレータをサポートする pandas UDF を作成する方法を示しています。

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

複数のシリーズのイテレータからシリーズのイテレータへの UDF

複数シリーズのイテレータからシリーズUDFのイテレータは、 シリーズのイテレータからシリーズUDFのイテレータと同様の特性と制限があります。 指定された関数は、バッチのイテレータを受け取り、バッチのイテレータを出力します。また、UDF の実行で何らかの状態を初期化する必要がある場合にも役立ちます。

違いは次のとおりです。

  • 基礎となるPython関数は、pandasシリーズの タプル のイテレータを取ります。

  • ラップされたpandas UDF は、 複数の Spark 列を入力として受け取ります。

型ヒントは -> Iterator[pandas.Series] Iterator[Tuple[pandas.Series, ...]] として指定します。

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

シリーズからスカラーへの UDF

系列からスカラー pandasへの UDF は、Spark 集計関数に似ています。 シリーズからスカラー pandasへの UDF は、1 つ以上のpandas シリーズからスカラー値への集計を定義し、各pandas シリーズは Spark 列を表します。 selectシリーズを使用withColumnして、 groupBy.agg、 、 PySpark .sqlなどの APIs でUDF をスカラーします 。ウィンドウ 。

型ヒントは pandas.Series, ... -> Anyとして表します。 戻り値の型はプリミティブデータ型である必要があり、返されるスカラーは Python プリミティブ型 ( intfloat など) または NumPy データ型 ( numpy.int64numpy.float64など) のいずれかです。 Any 理想的には特定のスカラー型である必要があります。

この種類の UDF は部分集計をサポート せず 、各グループのすべてのデータがメモリに読み込まれます。

次の例は、このタイプの UDF を使用して、 selectgroupBy、および window 操作で平均をコンピュートする方法を示しています。

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

使用方法の詳細については、 PySpark.sql.functions.pandas_udf を参照してください。

使い方

Arrow バッチサイズ の設定

注:

この構成は、共有アクセス モードおよびDatabricks Runtime 13.3 LTSから 14.2 で構成されたコンピュートには影響しません。

Spark のデータ パーティションは Arrow レコード バッチに変換されるため、JVM のメモリ使用量が一時的に高くなる可能性があります。 メモリ不足の例外を回避するには、 spark.sql.execution.arrow.maxRecordsPerBatch 構成を各バッチの最大行数を決定する整数に設定することで、Arrow レコード バッチのサイズを調整できます。 既定値は、バッチあたり 10,000 レコードです。 列数が多い場合は、それに応じて値を調整する必要があります。 この制限を使用して、各データ・パーティションは、処理のために 1 つ以上のレコード・バッチに分割されます。

タイムゾーンのセマンティクスを含むタイムスタンプ

Spark はタイムスタンプを UTC 値として内部的に保存し、指定されたタイム ゾーンなしで取り込まれたタイムスタンプ データは、ローカル時刻としてマイクロ秒単位で UTC に変換されます。

タイムスタンプ データが Spark でエクスポートまたは表示される場合、セッション タイム ゾーンを使用してタイムスタンプ値がローカライズされます。 セッション・タイム・ゾーンは spark.sql.session.timeZone 構成で設定され、デフォルトは JVM システムのローカル・タイム・ゾーンになります。 Pandasは、ナノ秒の解像度datetime64[ns] datetime64 タイプを使用し、列ごとにオプションのタイムゾーンを使用します。

タイムスタンプ データが Spark から pandas に転送されると、ナノ秒に変換され、各列は Spark セッションのタイム ゾーンに変換されてから、そのタイム ゾーンにローカライズされ、タイム ゾーンが削除され、値が現地時間として表示されます。 これは、タイムスタンプ列を使用して toPandas() または pandas_udf を呼び出すときに発生します。

タイムスタンプ データがpandasから Spark に転送されると、UTC マイクロ秒に変換されます。 これは、pandas DataFrame を使用して createDataFrame を呼び出すとき、またはpandas UDF からタイムスタンプを返すときに発生します。 これらの変換は、Spark が予期される形式のデータを保持するために自動的に行われるため、これらの変換を自分で行う必要はありません。 ナノ秒の値は切り捨てられます。

標準の UDF は、タイムスタンプデータを Python 日時オブジェクトとして読み込みますが、これはpandasのタイムスタンプとは異なります。 最適なパフォーマンスを得るには、pandas UDF でタイムスタンプを操作するときに pandas 時系列機能を使用することをお勧めします。 詳細については、「 時系列/日付機能」を参照してください。

ノートブックの例

次のノートブックは、pandas UDF で実現できるパフォーマンスの向上を示しています。

pandas UDF のベンチマークノートブック

ノートブックを新しいタブで開く