Pythonユーザー定義テーブル関数とは何ですか?
プレビュー
この機能はパブリックプレビュー段階にあります。
ユーザー定義表関数 (UDTF) を使用すると、スカラー値の代わりに表を戻す関数を登録できます。 UDTF は、SQL クエリーで参照される場合、共通テーブル式 (CTE) と同様に機能します。 SQL ステートメントの FROM
句で UDTF を参照し、追加の Spark SQL 演算子を結果にチェーンできます。
UDTF はローカルの SparkSession に登録され、ノートブック レベルまたはジョブ レベルで分離されます。
UDTF は、割り当てられた共有アクセス モードまたは分離なしの共有アクセス モードで構成されたコンピュートでサポートされます。 共有アクセス モードで UDTF を使用することはできません。
UDTF を Unity Catalogのオブジェクトとして登録したり、UDTF を SQLウェアハウスで使用することはできません。
UDTFの基本的な構文は何ですか?
Apache Spark は、Python UDTF を必須の eval
メソッドを持つ Python クラスとして実装します。
結果を行として出力するには、 yield
を使用します。
Apache Spark でクラスを UDTF として使用するには、PySpark udtf
関数をインポートする必要があります。
Databricks では、この関数をデコレーターとして使用し、常に returnType
オプションを使用してフィールド名と型を明示的に指定することをお勧めします。
次の例では、UDTF を使用してスカラー入力から単純なテーブルを作成します。
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
def eval(self, x: int, y: int):
yield x + y, x - y
SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# | 3| -1|
# +----+-----+
Python *args
構文を使用して、不特定多数の入力値を処理するロジックを実装できます。 次の例では、引数の入力の長さと型を明示的にチェックしながら、同じ結果を返します。
@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# | 3| -1|
# +----+-----+
登録する a UDTF
次の構文を使用して、SQL クエリーで使用するために現在の SparkSession に UDTF を登録することができます。
spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)
次の例では、Python UDTF を SQL に登録します。
spark.udtf.register("simple_udtf", SimpleUDTF)
登録すると、次の例のように、 %sql
マジック コマンドまたは spark.sql()
関数を使用して、SQL で UDTF を使用できます。
%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")
結果を出す
Python UDTF は、結果を返すために yield
を使用して実装されます。 結果は常に、指定されたスキーマを持つ 0 行以上の行を含むテーブルとして返されます。
スカラー引数を渡す場合、 eval
メソッドのロジックは、渡されたスカラー引数のセットで一度だけ実行されます。 テーブル引数の場合、 eval
メソッドは入力テーブルの行ごとに 1 回実行されます。
ロジックは、入力ごとに 0、1、または多数の行を返すように記述できます。
次の UDTF は、コンマ区切りリストの項目を個別のエントリに区切ることによって、入力ごとに 0 行以上を返す方法を示しています。
from pyspark.sql.functions import udtf
@udtf(returnType="id: int, item: string")
class Itemize:
def eval(self, id: int, item_list: str):
items = item_list.split(",")
for item in items:
if item != "":
yield id, item
テーブル引数を UDTF に渡す
SQL キーワード TABLE()
を使用して、テーブル引数を UDTF に渡すことができます。 次の例のように、テーブル名またはクエリーを使用できます。
TABLE(table_name);
TABLE(SELECT * FROM table_name);
テーブル引数は、一度に 1 行ずつ処理されます。 標準の PySpark 列フィールド注釈を使用して、各行の列を操作できます。 次の例は、PySpark Row
型を明示的にインポートし、 id
フィールドで渡されたテーブルをフィルター処理する方法を示しています。
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# | 6|
# | 7|
# | 8|
# | 9|
# +---+
スカラー引数を UDTF に渡す
次の値の任意の組み合わせを使用して、スカラー引数を UDTF に渡すことができます。
スカラー定数
スカラー関数
リレーションのフィールド
リレーションでフィールドを渡すには、UDTF を登録し、SQL LATERAL
キーワードを使用する必要があります。
注
インライン テーブルの別名を使用して、列のあいまいさを解消できます。
次の例は、 LATERAL
を使用してテーブルから UDTF にフィールドを渡す方法を示しています。
from pyspark.sql.functions import udtf
@udtf(returnType="id: int, item: string")
class Itemize:
def eval(self, id: int, item_list: str):
items = item_list.split(",")
for item in items:
if item != "":
yield id, item
spark.udtf.register("itemize", Itemize)
spark.sql("""
SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
(2, 'spoons,'),
(3, ''),
(4, 'knives,cups') t(id, item_list),
LATERAL itemize(id, item_list) b
""").show()
UDTF の デフォルト値を設定する
オプションで、 __init__
メソッドを実装して、Python ロジックで参照できるクラス変数のデフォルト値を設定できます。
__init__
メソッドは引数を受け付けず、SparkSession の変数または状態情報にアクセスできません。
UDTF で Apache Arrow を使用する
Databricks では、少量のデータを入力として受け取り、大きなテーブルを出力する UDTF には Apache Arrow を使用することをお勧めします。
次の例のように、UDTF を宣言するときに useArrow
パラメーターを指定することで、Arrow を有効にすることができます。
from pyspark.sql.functions import udtf
@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
def eval(self, x: int):
yield x, x + 1