pandas funções definidas pelo usuário

Uma função definida pelo usuário (UDF) do pandas - também conhecida como UDF vetorizada - é uma função definida pelo usuário que usa o Apache Arrow para transferir dados e pandas para trabalhar com os dados. Pandas UDFs permitem operações vetorizadas que podem aumentar o desempenho em até 100x em comparação com UDFs Python linha por vez.

Para obter informações básicas, consulte a postagem nos blogs New Pandas UDFs e Python Type Hints no próximo lançamento do Apache Spark 3.0.

Você define um UDF pandas usando a palavra-chave pandas_udf como decorador e envolve a função com uma dica de tipo Python. Este artigo descreve os diferentes tipos de UDFs de pandas e mostra como usar UDFs de pandas com dicas de tipo.

Série para série UDF

Você usa uma UDF de série para série de pandas para vetorizar operações escalares. Você pode usá-los com APIs como select e withColumn.

A função Python deve receber uma série de pandas como entrada e retornar uma série de pandas do mesmo comprimento, e você deve especificá-los nas dicas de tipo do Python. Inicie a execução de um UDF do pandas dividindo as colunas em lotes, chamando a função para cada lote como um subconjunto dos dados e, em seguida, concatenando os resultados.

O exemplo a seguir mostra como criar um UDF pandas que compute o produto de 2 colunas.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

Iterador de série para iterador de série UDF

Um UDF iterador é o mesmo que um UDF escalar pandas, exceto:

  • A função Python

    • Toma um iterador de lotes em vez de um único lote de entrada como entrada.

    • Retorna um iterador de lotes de saída em vez de um único lote de saída.

  • O comprimento de toda a saída no iterador deve ser igual ao comprimento de toda a entrada.

  • O UDF pandas agrupado usa uma única coluna Spark como entrada.

Você deve especificar a dica de tipo do Python como Iterator[pandas.Series] -> Iterator[pandas.Series].

Essa UDF do pandas é útil quando a execução da UDF requer a inicialização de algum estado, por exemplo, carregar um arquivo modelo do machine learning para aplicar inferência a cada lote de entrada.

O exemplo a seguir mostra como criar um UDF pandas com suporte a iterador.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

Iterador de várias séries para Iterador de série UDF

Um Iterador de série múltipla para Iterador de UDF de série tem características e restrições semelhantes a Iterador de série para Iterador de UDF de série. A função especificada usa um iterador de lotes e gera um iterador de lotes. Também é útil quando a execução do UDF requer a inicialização de algum estado.

As diferenças são:

  • A função Python subjacente usa um iterador de uma tupla de pandas Series.

  • O UDF pandas agrupado usa várias colunas do Spark como entrada.

Você especifica as dicas de tipo como Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

Série para UDF escalar

As UDFs de pandas de série para escalar são semelhantes às funções de agregação do Spark. Uma série para pandas escalares UDF define uma agregação de uma ou mais séries de pandas para um valor escalar, onde cada série de pandas representa uma coluna Spark. Você usa uma série para escalar UDF de pandas com APIs como select, withColumn, groupBy.agg e PySpark.sql.Window.

Você expressa a dica de tipo como pandas.Series, ... -> Any. O tipo de retorno deve ser um tipo de dados primitivo e o escalar retornado pode ser um tipo primitivo Python, por exemplo, int ou float ou um tipo de dados NumPy, como numpy.int64 ou numpy.float64. Any idealmente deve ser um tipo escalar específico.

Este tipo de UDF não suporta agregação parcial e todos os dados para cada grupo são carregados na memória.

O exemplo a seguir mostra como usar esse tipo de UDF para compute a média com as operações select, groupBy e window:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Para uso detalhado, consulte PySpark.sql.functions.pandas_udf.

Uso

Definindo o tamanho dos lotes da seta

Observação

Esta configuração não tem impacto sobre compute configurado com o modo de acesso compartilhado e Databricks Runtime 13.3 LTS até 14.2.

As partições de dados no Spark são convertidas em lotes de registro Arrow, o que pode levar temporariamente a um alto uso de memória na JVM. Para evitar possíveis exceções de falta de memória, você pode ajustar o tamanho dos lotes de registros Arrow definindo a configuração spark.sql.execution.arrow.maxRecordsPerBatch para um número inteiro que determina o número máximo de linhas para cada lote. O valor default é de 10.000 registros por lote. Se o número de colunas for grande, o valor deve ser ajustado de acordo. Usando esse limite, cada partição de dados é dividida em 1 ou mais lotes de registro para processamento.

Timestamp com semântica de fuso horário

O Spark armazena internamente carimbos de data/hora como valores UTC, e os dados de carimbo de data/hora trazidos sem um fuso horário especificado são convertidos como hora local em UTC com resolução de microssegundos.

Quando os dados de carimbo de data/hora são exportados ou exibidos no Spark, o fuso horário da sessão é usado para localizar os valores de carimbo de data/hora. O fuso horário da sessão é definido com a configuração spark.sql.session.timeZone e default para o fuso horário local do sistema JVM. pandas usa um tipo datetime64 com resolução de nanossegundos, datetime64[ns], com fuso horário opcional por coluna.

Quando os dados do carimbo de data/hora são transferidos do Spark para os pandas, eles são convertidos em nanossegundos e cada coluna é convertida no fuso horário da sessão do Spark e localizada nesse fuso horário, o que remove o fuso horário e exibe os valores como hora local. Isso ocorre ao chamar toPandas() ou pandas_udf com colunas de carimbo de data/hora.

Quando os dados de timestamp são transferidos de pandas para Spark, eles são convertidos em microssegundos UTC. Isso ocorre ao chamar createDataFrame com um DataFrame do pandas ou ao retornar um carimbo de data/hora de um UDF do pandas. Essas conversões são feitas automaticamente para garantir que o Spark tenha dados no formato esperado, portanto, não é necessário fazer nenhuma dessas conversões por conta própria. Quaisquer valores de nanossegundos são truncados.

Um UDF padrão carrega dados de carimbo de data/hora como objetos de data e hora do Python, o que é diferente de um carimbo de data/hora do pandas. Para obter o melhor desempenho, recomendamos que você use a funcionalidade de série temporal do pandas ao trabalhar com carimbos de data/hora em um UDF do pandas. Para obter detalhes, consulte Funcionalidade de séries temporais/datas.

Notebook de exemplo

O Notebook a seguir ilustra as melhorias de desempenho que você pode obter com pandas UDFs:

Notebook de benchmark de pandas UDFs

Abra o bloco de anotações em outra guia