PANDAS ユーザー定義関数

パンダ ユーザー定義関数 (UDF) (ベクトル化された UDF とも呼ばれます) は、 Apache Arrow を使用してデータを転送し、パンダを使用してデータを操作するユーザー定義関数です。 パンダの UDF では、一度に行ずつの Python UDF と比較してパフォーマンスを最大 100 倍向上させることができるベクトル化された操作が可能です。

背景情報については、ブログ記事 Apache Spark 3.0 の今後のリリースの新しい Pandas UDF と Python 型ヒントを参照してください。

キーワード pandas_udf をデコレータとして使用してパンダ UDF を定義し、 Python 型ヒントで関数をラップします。 この記事では、さまざまな種類のパンダ UDF について説明し、型ヒントでパンダ UDF を使用する方法を示します。

シリーズ間 UDF

シリーズからシリーズへのパンダ UDF を使用して、スカラー演算をベクトル化します。 selectwithColumnなどの APIs で使用できます。

Python 関数は、パンダ シリーズを入力として受け取り、同じ長さのパンダ シリーズを返す必要があり、Python 型ヒントでこれらを指定する必要があります。 Spark は、列をバッチに分割し、各バッチの関数をデータのサブセットとして呼び出し、結果を連結することで pandas UDF を実行します。

次の例は、2 列の製品をコンピュートするパンダ UDF を作成する方法を示しています。

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

シリーズのイテレータからシリーズUDF のイテレータへ

反復子 UDF は、次の点を除いてスカラー パンダ UDF と同じです。

  • Python 関数

    • 単一の入力バッチの代わりにバッチの反復子を入力として受け取ります。

    • 単一の出力バッチではなく、出力バッチの反復子を返します。

  • 反復子の出力全体の長さは、入力全体の長さと同じである必要があります。

  • ラップされたパンダ UDF は、1 つの Spark 列を入力として受け取ります。

Python タイプ ヒントは Iterator[pandas.Series] -> Iterator[pandas.Series]として指定する必要があります。

このパンダ UDF は、UDF の実行で何らかの状態を初期化する必要がある場合 (たとえば、機械学習モデル ファイルを読み込んですべての入力バッチに推論を適用するなど) に役立ちます。

次の例は、反復子をサポートする pandas UDF を作成する方法を示しています。

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

複数のシリーズのイテレータからシリーズUDF のイテレータへ

複数のシリーズからシリーズUDFのイテレータへのイテレータには、 シリーズUDFのシリーズのイテレータからイテレータと同様の特性と制限があります。 指定された関数は、バッチの反復子を受け取り、バッチの反復子を出力します。 また、UDF の実行で何らかの状態を初期化する必要がある場合にも役立ちます。

違いは次のとおりです。

  • 基礎となるPython関数は、パンダシリーズの タプル のイテレータを取ります。

  • ラップされたパンダ UDF は、 複数の Spark 列を入力として受け取ります。

型ヒントは -> Iterator[pandas.Series] Iterator[Tuple[pandas.Series, ...]] として指定します。

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

直列からスカラー UDF

系列からスカラー パンダへの UDF は、Spark 集計関数に似ています。 シリーズからスカラー パンダへの UDF は、1 つ以上のパンダ シリーズからスカラー値への集計を定義し、各パンダ シリーズは Spark 列を表します。 selectシリーズを使用withColumnして、 groupBy.agg、 、 PySpark .sqlなどの APIs でUDF をスカラーします 。ウィンドウ 。

型ヒントは pandas.Series, ... -> Anyとして表します。 戻り値の型はプリミティブデータ型である必要があり、返されるスカラーは Python プリミティブ型 ( intfloat など) または NumPy データ型 ( numpy.int64numpy.float64など) のいずれかです。 Any 理想的には特定のスカラー型である必要があります。

この種類の UDF は部分集計をサポート せず 、各グループのすべてのデータがメモリに読み込まれます。

次の例は、このタイプの UDF を使用して、 selectgroupBy、および window 操作で平均をコンピュートする方法を示しています。

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

使用方法の詳細については、 PySpark.sql.functions.pandas_udf を参照してください。

使い

矢印バッチサイズ の設定

注:

この構成は、共有アクセス モードおよびDatabricks Runtime 13.3 LTSから 14.2 で構成されたコンピュートには影響しません。

Spark のデータ パーティションは Arrow レコード バッチに変換されるため、JVM のメモリ使用量が一時的に高くなる可能性があります。 メモリ不足の例外を回避するには、 spark.sql.execution.arrow.maxRecordsPerBatch 構成を各バッチの最大行数を決定する整数に設定することで、Arrow レコード バッチのサイズを調整できます。 既定値は、バッチあたり 10,000 レコードです。 列数が多い場合は、それに応じて値を調整する必要があります。 この制限を使用して、各データ・パーティションは、処理のために 1 つ以上のレコード・バッチに分割されます。

タイムゾーンのセマンティクス を含むタイムスタンプ

Spark はタイムスタンプを UTC 値として内部的に保存し、指定されたタイム ゾーンなしで取り込まれたタイムスタンプ データは、ローカル時刻としてマイクロ秒単位で UTC に変換されます。

タイムスタンプ データが Spark でエクスポートまたは表示される場合、セッション タイム ゾーンを使用してタイムスタンプ値がローカライズされます。 セッション・タイム・ゾーンは spark.sql.session.timeZone 構成で設定され、デフォルトは JVM システムのローカル・タイム・ゾーンになります。 Pandasは、ナノ秒の解像度datetime64[ns] datetime64 タイプを使用し、列ごとにオプションのタイムゾーンを使用します。

タイムスタンプ データが Spark から pandas に転送されると、ナノ秒に変換され、各列は Spark セッションのタイム ゾーンに変換されてから、そのタイム ゾーンにローカライズされ、タイム ゾーンが削除され、値が現地時間として表示されます。 これは、タイムスタンプ列を使用して toPandas() または pandas_udf を呼び出すときに発生します。

タイムスタンプ データがパンダから Spark に転送されると、UTC マイクロ秒に変換されます。 これは、パンダ DataFrame を使用して createDataFrame を呼び出すとき、またはパンダ UDF からタイムスタンプを返すときに発生します。 これらの変換は、Spark が予期される形式のデータを保持するために自動的に行われるため、これらの変換を自分で行う必要はありません。 ナノ秒の値は切り捨てられます。

標準の UDF は、タイムスタンプデータを Python 日時オブジェクトとして読み込みますが、これはパンダのタイムスタンプとは異なります。 最適なパフォーマンスを得るには、pandas UDF でタイムスタンプを操作するときに pandas 時系列機能を使用することをお勧めします。 詳細については、「 時系列/日付機能」を参照してください。

ノートブック の例

次のノートブックは、pandas UDF で実現できるパフォーマンスの向上を示しています。

パンダUDFのベンチマークノートブック

ノートブックを新しいタブで開く