O que são funções de tabela definidas pelo usuário em Python?

Visualização

Esse recurso está na Prévia Pública.

Uma função de tabela definida pelo usuário (UDTF) permite registrar funções que retornam tabelas em vez de valores escalares. Os UDTFs funcionam de forma semelhante às expressões de tabela comuns (CTEs) quando referenciados em query SQL. Você faz referência a UDTFs na cláusula FROM de uma instrução SQL e pode encadear operadores Spark SQL adicionais aos resultados.

Os UDTFs são registrados no SparkSession local e são isolados no nível do Notebook ou Job .

Os UDTFs são suportados em compute configurada com modos de acesso compartilhado atribuídos ou sem isolamento. Você não pode usar UDTFs no modo de acesso compartilhado.

Você não pode registrar UDTFs como objetos no Unity Catalog e UDTFs não podem ser usados com SQL warehouse.

Qual é a sintaxe básica de um UDTF?

O Apache Spark implementa UDTFs Python como classes Python com um método eval obrigatório.

Você emite resultados como linhas usando yield.

Para que o Apache Spark use sua classe como UDTF, você deve importar a função PySpark udtf .

A Databricks recomenda usar essa função como decorador e sempre especificar explicitamente nomes e tipos de campos usando a opção returnType .

O exemplo a seguir cria uma tabela simples a partir de entradas escalares usando um UDTF:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, x: int, y: int):
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

Você pode usar a sintaxe *args do Python e implementar lógica para lidar com um número não especificado de valores de entrada. O exemplo a seguir retorna o mesmo resultado enquanto verifica explicitamente o comprimento e os tipos de entrada dos argumentos:

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

registrar-se em UDTF

Você pode registrar um UDTF no SparkSession atual para uso na query SQL usando a seguinte sintaxe:

spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)

O exemplo a seguir registra um UDTF do Python para SQL:

spark.udtf.register("simple_udtf", SimpleUDTF)

Depois de registrado, você poderá usar o UDTF em SQL usando o comando mágico %sql ou a função spark.sql() , como nos exemplos a seguir:

%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")

Gerando resultados

UDTFs Python são implementados com yield para retornar resultados. Os resultados são sempre retornados como uma tabela contendo 0 ou mais linhas com o esquema especificado.

Ao passar argumentos escalares, a lógica na execução do método eval exatamente uma vez com o conjunto de argumentos escalares passados. Para argumentos de tabela, o método eval é executado uma vez para cada linha da tabela de entrada.

A lógica pode ser escrita para retornar 0, 1 ou muitas linhas por entrada.

O UDTF a seguir demonstra o retorno de 0 ou mais linhas para cada entrada separando itens de uma lista separada por vírgula em entradas separadas:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

Passar um argumento de tabela para um UDTF

Você pode usar a palavra-chave SQL TABLE() para transmitir um argumento de tabela para um UDTF. Você pode usar um nome de tabela ou uma query, como nos exemplos a seguir:

TABLE(table_name);
TABLE(SELECT * FROM table_name);

Os argumentos da tabela são processados uma linha por vez. Você pode usar anotação de campo de coluna padrão do PySpark para interagir com colunas em cada linha. O exemplo a seguir demonstra a importação explícita do tipo Row do PySpark e a filtragem da tabela transmitida no campo id :

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# |  6|
# |  7|
# |  8|
# |  9|
# +---+

Passar argumentos escalares para um UDTF

É possível transmitir argumentos escalares para um UDTF utilizando qualquer combinação dos seguintes valores:

  • Constantes escalares

  • Funções escalares

  • Campos em uma relação

Para passar campos em uma relação, é necessário registrar o UDTF e utilizar a palavra-chave SQL LATERAL .

Observação

Você pode usar aliases de tabela em linha para desambiguar colunas.

O exemplo a seguir demonstra o uso LATERAL para transmitir campos de uma tabela para um UDTF:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

spark.udtf.register("itemize", Itemize)

spark.sql("""
    SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
    (2, 'spoons,'),
    (3, ''),
    (4, 'knives,cups') t(id, item_list),
    LATERAL itemize(id, item_list) b
""").show()

Definir valores padrão para UDTFs

Opcionalmente, você pode implementar um método __init__ para definir valores default para variáveis de classe que podem ser referenciadas na lógica do Python.

O método __init__ não aceita argumentos e não tem acesso a variáveis ou informações de estado no SparkSession.

Use Apache Arrow com UDTFs

A Databricks recomenda a utilização do Apache Arrow para UDTFs que recebem uma pequena quantidade de dados como entrada, mas produzem uma tabela grande.

Você pode ativar a seta especificando o parâmetro useArrow ao declarar o UDTF, como no exemplo a seguir:

from pyspark.sql.functions import udtf

@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1