Pythonユーザー定義テーブル関数とは何ですか?

プレビュー

この機能はパブリックプレビュー段階にあります。

ユーザー定義表関数 (UDTF) を使用すると、スカラー値の代わりに表を戻す関数を登録できます。 UDTF は、SQL クエリーで参照される場合、共通テーブル式 (CTE) と同様に機能します。 SQL ステートメントの FROM 句で UDTF を参照し、追加の Spark SQL 演算子を結果にチェーンできます。

UDTF はローカルの SparkSession に登録され、ノートブック レベルまたはジョブ レベルで分離されます。

UDTF は、割り当てられた共有アクセス モードまたは分離なしの共有アクセス モードで構成されたコンピュートでサポートされます。 共有アクセス モードで UDTF を使用することはできません。

UDTF を Unity Catalogのオブジェクトとして登録したり、UDTF を SQLウェアハウスで使用することはできません。

UDTFの基本的な構文は何ですか?

Apache Spark は、Python UDTF を必須の eval メソッドを持つ Python クラスとして実装します。

結果を行として出力するには、 yieldを使用します。

Apache Spark でクラスを UDTF として使用するには、PySpark udtf 関数をインポートする必要があります。

Databricks では、この関数をデコレーターとして使用し、常に returnType オプションを使用してフィールド名と型を明示的に指定することをお勧めします。

次の例では、UDTF を使用してスカラー入力から単純なテーブルを作成します。

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, x: int, y: int):
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

Python *args 構文を使用して、不特定多数の入力値を処理するロジックを実装できます。 次の例では、引数の入力の長さと型を明示的にチェックしながら、同じ結果を返します。

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

登録する a UDTF

次の構文を使用して、SQL クエリーで使用するために現在の SparkSession に UDTF を登録することができます。

spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)

次の例では、Python UDTF を SQL に登録します。

spark.udtf.register("simple_udtf", SimpleUDTF)

登録すると、次の例のように、 %sql マジック コマンドまたは spark.sql() 関数を使用して、SQL で UDTF を使用できます。

%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")

結果を出す

Python UDTF は、結果を返すために yield を使用して実装されます。 結果は常に、指定されたスキーマを持つ 0 行以上の行を含むテーブルとして返されます。

スカラー引数を渡す場合、 eval メソッドのロジックは、渡されたスカラー引数のセットで一度だけ実行されます。 テーブル引数の場合、 eval メソッドは入力テーブルの行ごとに 1 回実行されます。

ロジックは、入力ごとに 0、1、または多数の行を返すように記述できます。

次の UDTF は、コンマ区切りリストの項目を個別のエントリに区切ることによって、入力ごとに 0 行以上を返す方法を示しています。

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

テーブル引数を UDTF に渡す

SQL キーワード TABLE() を使用して、テーブル引数を UDTF に渡すことができます。 次の例のように、テーブル名またはクエリーを使用できます。

TABLE(table_name);
TABLE(SELECT * FROM table_name);

テーブル引数は、一度に 1 行ずつ処理されます。 標準の PySpark 列フィールド注釈を使用して、各行の列を操作できます。 次の例は、PySpark Row 型を明示的にインポートし、 id フィールドで渡されたテーブルをフィルター処理する方法を示しています。

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# |  6|
# |  7|
# |  8|
# |  9|
# +---+

スカラー引数を UDTF に渡す

次の値の任意の組み合わせを使用して、スカラー引数を UDTF に渡すことができます。

  • スカラー定数

  • スカラー関数

  • リレーションのフィールド

リレーションでフィールドを渡すには、UDTF を登録し、SQL LATERAL キーワードを使用する必要があります。

インライン テーブルの別名を使用して、列のあいまいさを解消できます。

次の例は、 LATERAL を使用してテーブルから UDTF にフィールドを渡す方法を示しています。

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

spark.udtf.register("itemize", Itemize)

spark.sql("""
    SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
    (2, 'spoons,'),
    (3, ''),
    (4, 'knives,cups') t(id, item_list),
    LATERAL itemize(id, item_list) b
""").show()

UDTF の デフォルト値を設定する

オプションで、 __init__ メソッドを実装して、Python ロジックで参照できるクラス変数のデフォルト値を設定できます。

__init__ メソッドは引数を受け付けず、SparkSession の変数または状態情報にアクセスできません。

UDTF で Apache Arrow を使用する

Databricks では、少量のデータを入力として受け取り、大きなテーブルを出力する UDTF には Apache Arrow を使用することをお勧めします。

次の例のように、UDTF を宣言するときに useArrow パラメーターを指定することで、Arrow を有効にすることができます。

from pyspark.sql.functions import udtf

@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1