Referência da linguagem Python do Delta Live Tables

Este artigo fornece detalhes para a interface de programação Python do Delta Live Tables.

Para obter informações sobre a SQL API, consulte a referência da linguagem SQL Delta Live Tables.

Para obter detalhes específicos sobre a configuração do Auto Loader, consulte O que é o Auto Loader?.

Limitações

A interface do Python de Tabelas Delta Live tem as seguintes limitações:

  • O Python table e view as funções devem retornar um DataFrame. Algumas funções que operam em DataFrames não retornam DataFrames e não devem ser usadas. Como as transformações do DataFrame são executadas após a resolução do gráfico de fluxo de dados completo, o uso dessas operações pode ter efeitos colaterais indesejados. Essas operações incluem funções como collect()count(), toPandas(), save() e saveAsTable(). No entanto, você pode incluir essas funções fora de table nossas definições de view função porque esse código é executado uma vez durante a fase de inicialização do gráfico.

  • A função pivot() não é compatível. A pivot operação no Spark exige um carregamento rápido dos dados de entrada para calcular o esquema da saída. Esse recurso não é compatível com Delta Live Tables.

Importar o módulo Python dlt

As funções do Python das Tabelas do Delta Live são definidas no módulo dlt. Seus pipelines implementados com a API do Python devem importar este módulo:

import dlt

Criar uma exibição materializada Delta Live Tables ou tabela de transmissão

No Python, o Delta Live Tables determina se um dataset deve ser atualizado como uma exibição materializada ou uma tabela de transmissão com base na consulta definidora. O decorador @table é usado para definir exibições materializadas e tabelas de transmissão.

Para definir uma visualização materializada em Python, aplique @table a uma consulta que executa uma leitura estática em uma fonte de dados. Para definir uma tabela de streaming, aplique @table a uma consulta que executa uma leitura de transmissão em uma fonte de dados. Os dois tipos de datasets têm a mesma especificação de sintaxe, como segue:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Criar uma exibição de Delta Live Tables

Para definir uma visualização em Python, aplique o decorador do @view. Como o decorador @table, você pode usar exibições em Delta Live Tables para datasets estáticos ou de transmissão. A seguir está a sintaxe para definir visualizações com Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Exemplo: definir tabelas e exibições

Para definir uma tabela ou visualização em Python, aplique o decorador @dlt.view ou @dlt.table a uma função. Você pode utilizar o nome da função ou o parâmetro name para atribuir a tabela ou visualizar o nome. O exemplo a seguir define dois datasets diferentes: uma visualização chamada taxi_raw que usa um arquivo JSON como fonte de entrada e uma tabela chamada filtered_data que usa a taxi_raw exibição como entrada:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Exemplo: acessar um dataset definido no mesmo pipeline

Além de ler a partir de fontes de dados externas, você pode acessar datasets definidos no mesmo pipeline com a função Delta Live Tables read(). O seguinte exemplo demonstra a criação de um dataset do customers_filtered utilizando a função read():

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

Você também pode utilizar a função spark.table() para acessar um dataset definido no mesmo pipeline. Ao utilizar a função spark.table() para acessar um dataset definido no pipeline, no argumento de função prepend a palavra-chave LIVE ao nome do dataset:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Exemplo: Leitura de uma tabela cadastrada em um metastore

Para ler dados de uma tabela registrada no Hive metastore , omita a palavra-chave LIVE no argumento da função e, opcionalmente, qualifique o nome da tabela com o nome do banco de dados:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Para obter um exemplo de leitura de uma tabela Unity Catalog , consulte Ingerir dados em um pipeline Unity Catalog .

Exemplo: acesse um dataset usando spark.sql

Você também pode retornar um dataset utilizando uma expressão do spark.sql em uma função de consulta. Para ler de um dataset interno, acrescente o nome do dataset LIVE.:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Escreva em uma tabela de transmissão de transmissão de origem múltipla

Visualização

O suporte do Delta Live Tables para @append_flow está em visualização pública.

Você pode usar o decorador @append_flow para gravar em uma tabela de transmissão de diversas fontes de transmissão para fazer o seguinte:

  • Adicione e remova fontes de transmissão que acrescentam dados a uma tabela de transmissão existente sem exigir uma refresh completa. Por exemplo, você pode ter uma tabela que combina dados regionais de cada região em que você opera. À medida que novas regiões são implementadas, você pode adicionar os dados da nova região à tabela sem realizar uma refresh completa.

  • Atualize uma tabela de transmissão anexando dados históricos faltantes (preenchimento). Por exemplo, você tem uma tabela de transmissão existente que é gravada por um tópico do Apache Kafka. Você também tem dados históricos armazenados em uma tabela que precisa ser inserida exatamente uma vez na tabela de transmissão e não pode transmitir os dados porque precisa realizar uma agregação complexa antes de inserir os dados.

Para criar uma tabela de destino para os registros gerados pelo processamento @append_flow, use a função create_streaming_table().

Observação

Se o senhor precisar definir restrições de qualidade de dados com expectativas, defina as expectativas na tabela de destino como parte da função create_streaming_table(). O senhor não pode definir expectativas na definição @append_flow.

A seguir está a sintaxe para @append_flow:

import dlt

dlt.create_streaming_table("<target-table-name>")

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment") # optional
def <function-name>():
  return (<streaming query>)

Exemplo: Escreva em uma tabela de transmissão de vários tópicos do Kafka

O exemplo a seguir cria uma tabela de transmissão chamada kafka_target e grava nessa tabela de transmissão a partir de dois tópicos do Kafka:

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

Exemplo: execução de um preenchimento único de dados

O exemplo a seguir executa uma query para anexar dados históricos a uma tabela de transmissão:

Observação

Para garantir um preenchimento único e verdadeiro quando a query de preenchimento faz parte de um pipeline cuja execução é programada ou contínua, remova a query depois de executar o pipeline uma vez. Para anexar novos dados se eles chegarem ao diretório de preenchimento, deixe a query no lugar.

import dlt

@dlt.table()
def csv_target():
  return spark.readStream.format("csv").load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream.format("csv").load("path/to/backfill/data/dir")

Criar uma tabela para ser usada como destino das operações de transmissão

Use a função create_streaming_table() para criar uma tabela de destino para os registros de saída das operações de transmissão, inclusive os registros de saída apply_changes() e @append_flow.

Observação

As funções create_target_table() e create_streaming_live_table() estão obsoletas. A Databricks recomenda atualizar o código existente para usar a create_streaming_table() função.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)

Argumentos

name

Tipo: str

O nome da tabela.

Este parâmetro é necessário.

comment

Tipo: str

Uma descrição opcional para a tabela.

spark_conf

Tipo: dict

Uma lista opcional de configurações do Spark para a execução desta consulta.

table_properties

Tipo: dict

Uma lista opcional de propriedades da tabela da tabela.

partition_cols

Tipo: array

Uma lista opcional de uma ou mais colunas para usar no particionamento da tabela.

path

Tipo: str

Um local de armazenamento opcional para dados da tabela. Se não estiver definido, o sistema adotará como padrão o local de armazenamento do pipeline.

schema

Tipo: str ou StructType

Uma definição de esquema opcional para a tabela. Esquemas podem ser definidos como uma string SQL DDL, ou com um Python StructType.

expect_all expect_all_or_drop expect_all_or_fail

Tipo: dict

Restrições opcionais de qualidade de dados para a tabela. Veja múltiplas expectativas.

Controle como as tabelas são materializadas

As tabelas também oferecem controle adicional de sua materialização:

Observação

Para tabelas com menos de 1 TB, a Databricks recomenda deixar que o Delta Live Tables controle a organização de dados. A menos que você espere que sua tabela cresça além de um terabyte, geralmente não é recomendável especificar colunas de partição.

Exemplo: especificar um esquema e colunas de partição

Opcionalmente, você pode especificar um esquema de tabela utilizando uma string do Python StructType ou SQL DDL. Quando especificada com uma cadeia de caracteres DDL, a definição pode incluir colunas geradas.

O exemplo a seguir cria uma tabela chamada sales com um esquema especificado usando Python StructType:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

O exemplo a seguir especifica o esquema para uma tabela usando uma string DDL, define uma coluna gerada e define uma coluna de partição:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Por padrão, Delta Live Tables infere o esquema da definição table se você não especificar um esquema.

Configurar uma tabela de transmissão para ignorar alterações em uma tabela de transmissão de origem

Observação

  • O sinalizador skipChangeCommits funciona apenas com spark.readStream usando a função option() . Você não pode usar este sinalizador em uma função dlt.read_stream() .

  • Você não pode usar o sinalizador skipChangeCommits quando a tabela de transmissão de origem estiver definida como destino de uma função apply_changes() .

Por default, as tabelas de transmissão exigem fontes somente anexadas. Quando uma tabela de transmissão usa outra tabela de transmissão como origem, e a tabela de transmissão de origem requer atualizações ou exclusões, por exemplo, processamento de “direito ao esquecimento” do GDPR, o sinalizador skipChangeCommits pode ser definido ao ler a tabela de transmissão de origem para ignorar essas mudanças. Para obter mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclusões.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Propriedades do Python Delta Live Tables

As tabelas a seguir descrevem as opções e propriedades que você pode especificar ao definir tabelas e exibições com Delta Live Tables:

@tabela ou @visualizar

name

Tipo: str

Um nome opcional para a tabela ou visualização. Se não estiver definida, o nome da função será usado como a tabela ou o nome de visualização.

comment

Tipo: str

Uma descrição opcional para a tabela.

spark_conf

Tipo: dict

Uma lista opcional de configurações do Spark para a execução desta consulta.

table_properties

Tipo: dict

Uma lista opcional de propriedades da tabela da tabela.

path

Tipo: str

Um local de armazenamento opcional para dados da tabela. Se não estiver definido, o sistema adotará como padrão o local de armazenamento do pipeline.

partition_cols

Tipo: a collection of str

Uma coleção opcional, por exemplo, a list, de uma ou mais colunas para usar para particionar a tabela.

schema

Tipo: str ou StructType

Uma definição de esquema opcional para a tabela. Esquemas podem ser definidos como uma string SQL DDL, ou com um Python StructType.

temporary

Tipo: bool

Criar uma tabela, mas não publicar os metadados da tabela. A palavra-chave temporary instrui o Delta Live Tables a criar uma tabela que esteja disponível para o pipeline, mas que não deva ser acessada fora do pipeline. Para reduzir o tempo de processamento, uma tabela temporária persiste durante o tempo de vida do pipeline que a cria, e não apenas em uma única atualização.

O padrão é 'falso'.

Definição de tabela ou exibição

def <function-name>()

Uma função do Python que define o dataset. Se o parâmetro name não estiver configurado, então <function-name> será utilizado como o nome do dataset de destino.

query

Uma declaração Spark SQL que retorna um dataset Spark ou DataFrame Koalas.

Use dlt.read() ou spark.table() para realizar uma leitura completa de um dataset definido no mesmo pipeline. Ao usar a spark.table() função para ler um dataset definido no mesmo pipeline, acrescente a LIVE palavra-chave ao nome do dataset no argumento da função. Por exemplo, para ler um dataset chamado customers:

spark.table("LIVE.customers")

Você também pode utilizar a função spark.table() para ler de uma tabela registrada no metastore omitindo a palavra-chave LIVE e, opcionalmente, qualificando o nome da tabela com o nome do banco de dados:

spark.table("sales.customers")

Use dlt.read_stream() para realizar uma leitura de streaming de um dataset definido no mesmo pipeline.

Utilize a função spark.sql para definir uma consulta SQL para criar o dataset de retorno.

Use a sintaxe do PySpark para definir consultas Delta Live Tables com Python.

Expectativas

@expect("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por description. Se uma linha violar a expectativa, inclua a linha no dataset de destino.

@expect_or_drop("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por description. Se uma linha violar a expectativa, solte a linha do dataset de destino.

@expect_or_fail("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por description. Se uma linha violar a expectativa, interrompa imediatamente a execução.

@expect_all(expectations)

Declare uma ou mais restrições de qualidade de dados. expectations é um dicionário Python, em que a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, inclua a linha no dataset de destino.

@expect_all_or_drop(expectations)

Declare uma ou mais restrições de qualidade de dados. expectations é um dicionário Python, em que a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, solte a linha do dataset de destino.

@expect_all_or_fail(expectations)

Declare uma ou mais restrições de qualidade de dados. expectations é um dicionário Python, em que a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, interrompa imediatamente a execução.

Altere a captura de dados com Python em Delta Live Tables

Use a função apply_changes() na API Python para usar a funcionalidade CDC do Delta Live Tables. A interface Python do Delta Live Tables também fornece a função create_streaming_table(). O senhor pode usar essa função para criar a tabela de destino exigida pela função apply_changes().

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Observação

O comportamento padrão para eventos de INSERT e UPDATE é realizar upsert de eventos do CDC a partir da fonte: atualizar quaisquer linhas na tabela de destino que correspondam à(s) chave(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino.O tratamento de DELETE eventos pode ser especificado com a APPLY AS DELETE WHEN condição.

Importante

Você deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino do apply_changes, você também deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados que o campo sequence_by .

Consulte Captura simplificada de dados de alterações (CDC) com a API APPLY CHANGES em Delta Live Tables.

Argumentos

target

Tipo: str

O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a função apply_changes() .

Este parâmetro é necessário.

source

Tipo: str

A fonte de dados que contém os registros do CDC.

Este parâmetro é necessário.

keys

Tipo: list

A coluna ou combinação de colunas que identificam exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos de CDC se aplicam a registros específicos na tabela de destino.

Você pode especificar:

  • Uma lista de strings: ["userId", "orderId"]

  • Uma lista de funções do Spark SQL col(): [col("userId"), col("orderId"]

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é necessário.

sequence_by

Tipo: str ou col()

O nome da coluna que especifica a ordem lógica dos eventos de CDC nos dados de origem. O Delta Live Tables usa esse sequenciamento para manipular eventos de alteração que chegam fora de ordem.

Você pode especificar:

  • Uma string: "sequenceNum"

  • Uma função Spark SQL col(): col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é necessário.

ignore_null_updates

Tipo: bool

Permitir a ingestão de atualizações que contenham um subconjunto das colunas de destino. Quando um evento CDC corresponder a uma linha existente e ignore_null_updates para True, as colunas com null manterão seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null. Quando ignore_null_updates for False, os valores existentes serão substituídos por valores null.

Este parâmetro é opcional.

O padrão é False.

apply_as_deletes

Tipo: str ou expr()

Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um upsert. Para lidar com dados fora de ordem, a linha excluída é retida temporariamente como uma marca para exclusão na tabela Delta subjacente e uma view é criada no metastore que filtra essas marcas para exclusão. O intervalo de retenção pode ser configurado com a propriedade da tabela pipelines.cdc.tombstoneGCThresholdInSeconds .

Você pode especificar:

  • Uma string: "Operation = 'DELETE'"

  • Uma função Spark SQL expr(): expr("Operation = 'DELETE'")

Este parâmetro é opcional.

apply_as_truncates

Tipo: str ou expr()

Especifica quando um evento de CDC deve ser tratado como uma tabela completa TRUNCATE. Como esta cláusula aciona uma operação completa de truncamento da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade.

O parâmetro apply_as_truncates é compatível apenas com o SCD tipo 1. O SCD tipo 2 não suporta truncamento.

Você pode especificar:

  • Uma string: "Operation = 'TRUNCATE'"

  • Uma função Spark SQL expr(): expr("Operation = 'TRUNCATE'")

Este parâmetro é opcional.

column_list

except_column_list

Tipo: list

Um subconjunto de colunas a incluir na tabela de destino. Use column_list para especificar a lista completa de colunas a incluir. Use except_column_list para especificar as colunas a serem excluídas. Você pode declarar o valor como uma lista de strings ou como funções Spark SQL col():

  • column_list = ["userId", "name", "city"].

  • column_list = [col("userId"), col("name"), col("city")]

  • except_column_list = ["operation", "sequenceNum"]

  • except_column_list = [col("operation"), col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento column_list ou except_column_list é passado para a função.

stored_as_scd_type

Tipo: str ou int

Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2.

Defina como 1 para SCD tipo 1 ou 2 para SCD tipo 2.

Esta cláusula é opcional.

O padrão é SCD tipo 1.

track_history_column_list

track_history_except_column_list

Tipo: list

Um subconjunto de colunas de saída a serem rastreadas para história na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Utilize track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. Você pode declarar qualquer valor como uma lista de strings ou como funções Spark SQL col() : - track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento track_history_column_list ou track_history_except_column_list é passado para a função.