ユーザー定義のスカラー関数 - Python
この記事には、Python ユーザー定義関数 (UDF) の例が含まれています。 UDF を登録する方法、UDF を呼び出す方法を示し、Spark SQL での部分式の評価順序に関する注意事項を示します。
Databricks Runtime 14.0 以降では、 Pythonユーザー定義テーブル関数 (UDTF) を使用して、スカラー値ではなくリレーション全体を返す関数を登録できます。 Python ユーザー定義テーブル関数 (UDTF)を参照してください。
注
Databricks Runtime 14.0 以前では、Python UDF と Pandas UDF は、共有アクセス モードを使用する Unity Catalog クラスターではサポートされていません。 スカラー Python UDF と Pandas UDF は、Databricks Runtime 14.1 以降のすべてのアクセス モードでサポートされています。
Databricks Runtime 14.1 以降では、SQL 構文を使用してスカラー Python UDF を Unity Catalog に登録できます。 「 Unity Catalog のユーザー定義関数 (UDF)」を参照してください。
関数を UDF として登録する
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
必要に応じて、UDF の戻り値の型を設定できます。 デフォルトの戻り値の型は StringType
です。
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Spark SQL で UDF を呼び出す
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
DataFrames で UDF を使用する
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
または、注釈構文を使用して同じ UDF を宣言することもできます。
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
評価順序と null チェック
Spark SQL (SQL および DataFrame および DataFrame および DataDataset API を含む) は、部分式の評価順序を保証するものではありません。 特に、演算子または関数の入力は、必ずしも左から右、またはその他の固定された順序で評価されるとは限りません。 たとえば、論理 AND
式と OR
式には、左から右への "短絡" セマンティクスはありません。
したがって、 Boolean 式の副作用や評価順序、 WHERE
句や HAVING
句の順序は、クエリーの最適化や計画時に並べ替えることができるため、危険です。 具体的には、UDF が null チェックのために SQL の短絡セマンティクスに依存している場合、UDF を呼び出す前に null チェックが行われる保証はありません。 例えば
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
この WHERE
句は、null を除外した後に呼び出される strlen
UDF を保証するものではありません。
適切な null チェックを実行するには、次のいずれかを実行することをお勧めします。
UDF 自体をヌル対応にし、UDF 自体の内部でヌル チェックを行う
IF
式またはCASE WHEN
式を使用して null チェックを実行し、条件分岐で UDF を呼び出す
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
制限事項
PySpark UDF には、次の制限が適用されます。
モジュールのインポート制限:PySpark 共有クラスタリングおよびサーバレス コンピュートの UDFGit は、Unity Catalog Databricks Runtime14.2 以前でモジュールをインポートするために、 フォルダ、ワークスペース ファイル、または ボリュームにアクセスできません。
ブロードキャスト変数: 共有クラスタリングおよびサーバレス コンピュート上の PySpark UDF は、ブロードキャスト変数をサポートしていません。
メモリ制限: サーバレス コンピュート上の PySpark UDF には、 PySpark UDFあたり 1GB のメモリ制限があります。 この制限を超えると、次のエラーが発生します。
[UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.