Referência da linguagem Python do Delta Live Tables

Este artigo contém detalhes sobre a interface de programação Delta Live Tables Python .

Para obter informações sobre a SQL API, consulte a referência da linguagem SQL Delta Live Tables.

Para obter detalhes específicos sobre a configuração do Auto Loader, consulte O que é o Auto Loader?.

Antes de começar

A seguir, há considerações importantes quando o senhor implementa o pipeline com a interface Delta Live Tables Python :

  • Como as funções Python table() e view() são invocadas várias vezes durante o planejamento e a execução de uma atualização pipeline, não inclua código em uma dessas funções que possa ter efeitos colaterais (por exemplo, código que modifique dados ou envie um email). Para evitar um comportamento inesperado, suas funções Python que definem o conjunto de dados devem incluir apenas o código necessário para definir a tabela ou view.

  • Para realizar operações como o envio de e-mail ou a integração com um serviço de monitoramento externo, especialmente em funções que definem conjuntos de dados, use ganchos de eventos. A implementação dessas operações nas funções que definem seu conjunto de dados causará um comportamento inesperado.

  • O Python table e view as funções devem retornar um DataFrame. Algumas funções que operam em DataFrames não retornam DataFrames e não devem ser usadas. Essas operações incluem funções como collect()count(), toPandas(), save() e saveAsTable(). Como as transformações do DataFrame são executadas após a resolução do gráfico de fluxo de dados completo, o uso dessas operações pode ter efeitos colaterais indesejados.

Importar o módulo Python dlt

As funções do Python das Tabelas do Delta Live são definidas no módulo dlt. Seus pipelines implementados com a API do Python devem importar este módulo:

import dlt

Criar uma exibição materializada Delta Live Tables ou tabela de transmissão

Em Python, Delta Live Tables determina se deve atualizar uma dataset como uma tabela materializada view ou de transmissão com base na consulta de definição. O decorador @table pode ser usado para definir tanto a exibição materializada quanto as tabelas de transmissão.

Para definir um view materializado em Python, aplique @table a uma consulta que executa uma leitura estática em uma fonte de dados. Para definir uma tabela de transmissão, aplique @table a uma consulta que execute uma leitura de transmissão em uma fonte de dados ou use a função create_streaming_table(). Os dois tipos de dataset têm a mesma especificação de sintaxe, como segue:

Observação

Para usar o argumento cluster_by para habilitar o líquido clustering, seu pipeline deve estar configurado para usar o canal de visualização.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Criar uma exibição de Delta Live Tables

Para definir uma visualização em Python, aplique o decorador do @view. Como o decorador @table, você pode usar exibições em Delta Live Tables para datasets estáticos ou de transmissão. A seguir está a sintaxe para definir visualizações com Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Exemplo: definir tabelas e exibições

Para definir uma tabela ou visualização em Python, aplique o decorador @dlt.view ou @dlt.table a uma função. Você pode utilizar o nome da função ou o parâmetro name para atribuir a tabela ou visualizar o nome. O exemplo a seguir define dois datasets diferentes: uma visualização chamada taxi_raw que usa um arquivo JSON como fonte de entrada e uma tabela chamada filtered_data que usa a taxi_raw exibição como entrada:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Exemplo: acessar um dataset definido no mesmo pipeline

Além de ler a partir de fontes de dados externas, você pode acessar datasets definidos no mesmo pipeline com a função Delta Live Tables read(). O seguinte exemplo demonstra a criação de um dataset do customers_filtered utilizando a função read():

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

Você também pode utilizar a função spark.table() para acessar um dataset definido no mesmo pipeline. Ao utilizar a função spark.table() para acessar um dataset definido no pipeline, no argumento de função prepend a palavra-chave LIVE ao nome do dataset:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Exemplo: Leitura de uma tabela cadastrada em um metastore

Para ler dados de uma tabela registrada no site Hive metastore, no argumento da função, omita a palavra-chave LIVE e, opcionalmente, qualifique o nome da tabela com o nome do banco de dados:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Para obter um exemplo de leitura de uma tabela Unity Catalog , consulte Ingerir dados em um pipeline Unity Catalog .

Exemplo: acesse um dataset usando spark.sql

Você também pode retornar um dataset utilizando uma expressão do spark.sql em uma função de consulta. Para ler de um dataset interno, acrescente o nome do dataset LIVE.:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Criar uma tabela para ser usada como destino das operações de transmissão

Use a função create_streaming_table() para criar uma tabela de destino para os registros de saída das operações de transmissão, inclusive os registros de saída apply_changes(), apply_changes_from_snapshot() e @append_flow.

Observação

As funções create_target_table() e create_streaming_live_table() estão obsoletas. A Databricks recomenda atualizar o código existente para usar a create_streaming_table() função.

Observação

Para usar o argumento cluster_by para habilitar o líquido clustering, seu pipeline deve estar configurado para usar o canal de visualização.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)

Argumentos

name

Tipo: str

O nome da tabela.

Este parâmetro é necessário.

comment

Tipo: str

Uma descrição opcional para a tabela.

spark_conf

Tipo: dict

Uma lista opcional de configurações do Spark para a execução desta consulta.

table_properties

Tipo: dict

Uma lista opcional de propriedades da tabela da tabela.

partition_cols

Tipo: array

Uma lista opcional de uma ou mais colunas para usar no particionamento da tabela.

cluster_by

Tipo: array

Opcionalmente, ative o clustering líquido na tabela e defina as colunas a serem usadas como chave clustering.

Consulte Usar clusters líquidos para tabelas Delta.

path

Tipo: str

Um local de armazenamento opcional para os dados da tabela. Se não for definido, o sistema usará como padrão o local de armazenamento pipeline.

schema

Tipo: str ou StructType

Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como SQL DDL strings ou com Python StructType.

expect_all expect_all_or_drop expect_all_or_fail

Tipo: dict

Restrições opcionais de qualidade de dados para a tabela. Veja múltiplas expectativas.

row_filter (Pré-visualização pública)

Tipo: str

Uma cláusula opcional de filtro de linha para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna.

Controle como as tabelas são materializadas

As tabelas também oferecem controle adicional de sua materialização:

Observação

Para tabelas com menos de 1 TB de tamanho, a Databricks recomenda deixar que o Delta Live Tables controle a organização dos dados. O senhor não deve especificar colunas de partição, a menos que espere que a tabela cresça além de um terabyte.

Exemplo: especificar um esquema e colunas de partição

Opcionalmente, você pode especificar um esquema de tabela utilizando uma string do Python StructType ou SQL DDL. Quando especificada com uma cadeia de caracteres DDL, a definição pode incluir colunas geradas.

O exemplo a seguir cria uma tabela chamada sales com um esquema especificado usando Python StructType:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

O exemplo a seguir especifica o esquema para uma tabela usando uma string DDL, define uma coluna gerada e define uma coluna de partição:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Por padrão, Delta Live Tables infere o esquema da definição table se você não especificar um esquema.

Configurar uma tabela de transmissão para ignorar alterações em uma tabela de transmissão de origem

Observação

  • O sinalizador skipChangeCommits funciona apenas com spark.readStream usando a função option() . Você não pode usar este sinalizador em uma função dlt.read_stream() .

  • Você não pode usar o sinalizador skipChangeCommits quando a tabela de transmissão de origem estiver definida como destino de uma função apply_changes() .

Por default, as tabelas de transmissão exigem fontes somente anexadas. Quando uma tabela de transmissão usa outra tabela de transmissão como origem, e a tabela de transmissão de origem requer atualizações ou exclusões, por exemplo, processamento de “direito ao esquecimento” do GDPR, o sinalizador skipChangeCommits pode ser definido ao ler a tabela de transmissão de origem para ignorar essas mudanças. Para obter mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclusões.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Exemplo: Definir restrições de tabela

Visualização

As restrições de tabela estão na Pré-visualização pública.

Ao especificar um esquema, o senhor pode definir chaves primárias e estrangeiras. As restrições são informativas e não são aplicadas. Consulte a cláusula CONSTRAINT na referência da linguagem SQL.

O exemplo a seguir define uma tabela com uma restrição primária e estrangeira key:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Exemplo: Definir um filtro de linha e uma máscara de coluna

Visualização

Os filtros de linha e as máscaras de coluna estão na Pré-visualização pública.

Para criar uma tabela materializada view ou de transmissão com um filtro de linha e uma máscara de coluna, use a cláusula ROW FILTER e a cláusula MASK. O exemplo a seguir demonstra como definir uma tabela materializada view e uma tabela de transmissão com um filtro de linha e uma máscara de coluna:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

Para obter mais informações sobre filtros de linha e máscaras de coluna, consulte Publicar tabelas com filtros de linha e máscaras de coluna.

Propriedades do Python Delta Live Tables

As tabelas a seguir descrevem as opções e propriedades que você pode especificar ao definir tabelas e exibições com Delta Live Tables:

Observação

Para usar o argumento cluster_by para habilitar o líquido clustering, seu pipeline deve estar configurado para usar o canal de visualização.

@tabela ou @visualizar

name

Tipo: str

Um nome opcional para a tabela ou visualização. Se não estiver definida, o nome da função será usado como a tabela ou o nome de visualização.

comment

Tipo: str

Uma descrição opcional para a tabela.

spark_conf

Tipo: dict

Uma lista opcional de configurações do Spark para a execução desta consulta.

table_properties

Tipo: dict

Uma lista opcional de propriedades da tabela da tabela.

path

Tipo: str

Um local de armazenamento opcional para os dados da tabela. Se não for definido, o sistema usará como padrão o local de armazenamento pipeline.

partition_cols

Tipo: a collection of str

Uma coleção opcional, por exemplo, um list de uma ou mais colunas a serem usadas para particionar a tabela.

cluster_by

Tipo: array

Opcionalmente, ative o clustering líquido na tabela e defina as colunas a serem usadas como chave clustering.

Consulte Usar clusters líquidos para tabelas Delta.

schema

Tipo: str ou StructType

Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como SQL DDL strings ou com Python StructType.

temporary

Tipo: bool

Crie uma tabela, mas não publique metadados para a tabela. A palavra-chave temporary instrui o Delta Live Tables a criar uma tabela que esteja disponível para o pipeline, mas que não deva ser acessada fora do pipeline. Para reduzir o tempo de processamento, uma tabela temporária persiste durante o tempo de vida do pipeline que a cria, e não apenas em uma única atualização.

O padrão é 'falso'.

row_filter (Pré-visualização pública)

Tipo: str

Uma cláusula opcional de filtro de linha para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna.

Definição de tabela ou exibição

def <function-name>()

Uma função do Python que define o dataset. Se o parâmetro name não estiver configurado, então <function-name> será utilizado como o nome do dataset de destino.

query

Uma declaração Spark SQL que retorna um dataset Spark ou DataFrame Koalas.

Use dlt.read() ou spark.table() para realizar uma leitura completa de um dataset definido no mesmo pipeline. Ao usar a spark.table() função para ler um dataset definido no mesmo pipeline, acrescente a LIVE palavra-chave ao nome do dataset no argumento da função. Por exemplo, para ler um dataset chamado customers:

spark.table("LIVE.customers")

Você também pode utilizar a função spark.table() para ler de uma tabela registrada no metastore omitindo a palavra-chave LIVE e, opcionalmente, qualificando o nome da tabela com o nome do banco de dados:

spark.table("sales.customers")

Use dlt.read_stream() para realizar uma leitura de streaming de um dataset definido no mesmo pipeline.

Utilize a função spark.sql para definir uma consulta SQL para criar o dataset de retorno.

Use a sintaxe do PySpark para definir consultas Delta Live Tables com Python.

Expectativas

@expect("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por description. Se uma linha violar a expectativa, inclua a linha no dataset de destino.

@expect_or_drop("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por description. Se uma linha violar a expectativa, solte a linha do dataset de destino.

@expect_or_fail("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por description. Se uma linha violar a expectativa, interrompa imediatamente a execução.

@expect_all(expectations)

Declare uma ou mais restrições de qualidade de dados. expectations é um dicionário Python, em que a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, inclua a linha no dataset de destino.

@expect_all_or_drop(expectations)

Declare uma ou mais restrições de qualidade de dados. expectations é um dicionário Python, em que a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, solte a linha do dataset de destino.

@expect_all_or_fail(expectations)

Declare uma ou mais restrições de qualidade de dados. expectations é um dicionário Python, em que a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, interrompa imediatamente a execução.

captura de dados de alterações (CDC) de um feed de alterações com Python em Delta Live Tables

Use a função apply_changes() no site Python API para usar a funcionalidade Delta Live Tables captura de dados de alterações (CDC) (CDC) para processar dados de origem de um feed de dados de alterações (CDF).

Importante

O senhor deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, o senhor pode especificar o esquema da tabela de destino. Ao especificar o esquema da tabela de destino apply_changes(), o senhor deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados dos campos sequence_by.

Para criar a tabela de destino necessária, o senhor pode usar a função create_streaming_table() na interface Python do Delta Live Tables.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Observação

Para o processamento de APPLY CHANGES, o comportamento do default para eventos INSERT e UPDATE é fazer upsert de eventos CDC da origem: atualizar todas as linhas da tabela de destino que correspondam ao(s) key(s) especificado(s) ou inserir uma nova linha quando não houver um registro correspondente na tabela de destino. O tratamento de eventos DELETE pode ser especificado com a condição APPLY AS DELETE WHEN.

Para saber mais sobre o processamento do CDC com um feed de alterações, consulte APLICAR ALTERAÇÕES APIs: Simplifique a captura de dados de alterações (CDC) com Delta Live Tables. Para ver um exemplo de uso da função apply_changes(), consulte Exemplo: Processamento de SCD tipo 1 e SCD tipo 2 com dados de origem CDF.

Importante

O senhor deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, o senhor pode especificar o esquema da tabela de destino. Ao especificar o esquema da tabela de destino apply_changes, o senhor deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados que o campo sequence_by.

Consulte a seção APPLY CHANGES APIs: Simplificar a captura de dados de alterações (CDC) com Delta Live Tables.

Argumentos

target

Tipo: str

O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a função apply_changes() .

Este parâmetro é necessário.

source

Tipo: str

A fonte de dados que contém os registros do CDC.

Este parâmetro é necessário.

keys

Tipo: list

A coluna ou combinação de colunas que identificam exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos de CDC se aplicam a registros específicos na tabela de destino.

Você pode especificar:

  • Uma lista de strings: ["userId", "orderId"]

  • Uma lista de funções do Spark SQL col(): [col("userId"), col("orderId"]

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é necessário.

sequence_by

Tipo: str ou col()

O nome da coluna que especifica a ordem lógica dos eventos de CDC nos dados de origem. O Delta Live Tables usa esse sequenciamento para manipular eventos de alteração que chegam fora de ordem.

Você pode especificar:

  • Uma string: "sequenceNum"

  • Uma função Spark SQL col(): col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

A coluna especificada deve ser um tipo de dados classificável.

Este parâmetro é necessário.

ignore_null_updates

Tipo: bool

Permitir a ingestão de atualizações que contenham um subconjunto da coluna de destino. Quando um evento CDC corresponde a uma linha existente e ignore_null_updates é True, as colunas com null mantêm seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null. Quando ignore_null_updates é False, os valores existentes são substituídos por valores null.

Este parâmetro é opcional.

O padrão é False.

apply_as_deletes

Tipo: str ou expr()

Especifica quando um evento CDC deve ser tratado como DELETE em vez de upsert. Para lidar com dados fora de ordem, a linha excluída é temporariamente retida como uma lápide na tabela subjacente Delta e é criado um view no metastore que filtra essas lápides. O intervalo de retenção pode ser configurado com a propriedade de tabela pipelines.cdc.tombstoneGCThresholdInSeconds.

Você pode especificar:

  • Uma string: "Operation = 'DELETE'"

  • Uma função Spark SQL expr(): expr("Operation = 'DELETE'")

Este parâmetro é opcional.

apply_as_truncates

Tipo: str ou expr()

Especifica quando um evento de CDC deve ser tratado como uma tabela completa TRUNCATE. Como esta cláusula aciona uma operação completa de truncamento da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade.

O parâmetro apply_as_truncates é compatível apenas com o SCD tipo 1. O SCD tipo 2 não oferece suporte a operações de truncamento.

Você pode especificar:

  • Uma string: "Operation = 'TRUNCATE'"

  • Uma função Spark SQL expr(): expr("Operation = 'TRUNCATE'")

Este parâmetro é opcional.

column_list

except_column_list

Tipo: list

Um subconjunto de colunas a incluir na tabela de destino. Use column_list para especificar a lista completa de colunas a incluir. Use except_column_list para especificar as colunas a serem excluídas. Você pode declarar o valor como uma lista de strings ou como funções Spark SQL col():

  • column_list = ["userId", "name", "city"].

  • column_list = [col("userId"), col("name"), col("city")]

  • except_column_list = ["operation", "sequenceNum"]

  • except_column_list = [col("operation"), col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento column_list ou except_column_list é passado para a função.

stored_as_scd_type

Tipo: str ou int

Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2.

Defina como 1 para SCD tipo 1 ou 2 para SCD tipo 2.

Esta cláusula é opcional.

O padrão é SCD tipo 1.

track_history_column_list

track_history_except_column_list

Tipo: list

Um subconjunto de colunas de saída a serem rastreadas para história na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Utilize track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. Você pode declarar qualquer valor como uma lista de strings ou como funções Spark SQL col() : - track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento track_history_column_list ou track_history_except_column_list é passado para a função.

captura de dados de alterações (CDC) do banco de dados Snapshot com Python em Delta Live Tables

Visualização

A API APPLY CHANGES FROM SNAPSHOT está em pré-visualização pública.

Use a função apply_changes_from_snapshot() no site Python API para usar a funcionalidade Delta Live Tables captura de dados de alterações (CDC) (CDC) para processar os dados de origem do banco de dados Snapshot.

Importante

Você deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino do apply_changes_from_snapshot(), você também deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados que o campo sequence_by .

Para criar a tabela de destino necessária, o senhor pode usar a função create_streaming_table() na interface Python do Delta Live Tables.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Observação

Para o processamento APPLY CHANGES FROM SNAPSHOT, o comportamento do default é inserir uma nova linha quando não houver um registro correspondente com o mesmo key(s) no destino. Se houver um registro correspondente, ele será atualizado somente se algum dos valores da linha tiver sido alterado. As linhas com chave presentes no destino, mas não mais presentes na origem, são excluídas.

Para saber mais sobre o processamento do CDC com o Snapshot, consulte APLICAR ALTERAÇÕES APIs: Simplifique a captura de dados de alterações (CDC) com Delta Live Tables. Para obter exemplos de uso da função apply_changes_from_snapshot(), consulte os exemplos de ingestão periódica em Snapshot e ingestão histórica em Snapshot .

Argumentos

target

Tipo: str

O nome da tabela a ser atualizada. O senhor pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a função apply_changes().

Este parâmetro é necessário.

source

Tipo: str ou lambda function

O nome de uma tabela ou view para Snapshot periodicamente ou uma função lambda Python que retorna o Snapshot DataFrame a ser processado e a versão Snapshot. Consulte Implementar o argumento de origem.

Este parâmetro é necessário.

keys

Tipo: list

A coluna ou combinação de colunas que identificam exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos de CDC se aplicam a registros específicos na tabela de destino.

Você pode especificar:

  • Uma lista de strings: ["userId", "orderId"]

  • Uma lista de funções do Spark SQL col(): [col("userId"), col("orderId"]

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é necessário.

stored_as_scd_type

Tipo: str ou int

Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2.

Defina como 1 para SCD tipo 1 ou 2 para SCD tipo 2.

Esta cláusula é opcional.

O padrão é SCD tipo 1.

track_history_column_list

track_history_except_column_list

Tipo: list

Um subconjunto de colunas de saída a serem rastreadas para história na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Utilize track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. Você pode declarar qualquer valor como uma lista de strings ou como funções Spark SQL col() : - track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento track_history_column_list ou track_history_except_column_list é passado para a função.

Implementar o argumento source

A função apply_changes_from_snapshot() inclui o argumento source. Para processar o Snapshot histórico, espera-se que o argumento source seja uma função lambda Python que retorne dois valores para a função apply_changes_from_snapshot(): um Python DataFrame contendo os dados Snapshot a serem processados e uma versão Snapshot.

A seguir, a assinatura da função lambda:

lambda Any => Optional[(DataFrame, Any)]
  • O argumento para a função lambda é a versão mais recentemente processada do site Snapshot.

  • O valor de retorno da função lambda é None ou uma tupla de dois valores: O primeiro valor da tupla é um DataFrame que contém o Snapshot a ser processado. O segundo valor da tupla é a versão Snapshot que representa a ordem lógica do Snapshot.

Um exemplo que implementa e chama a função lambda:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

O tempo de execução do Delta Live Tables executa os seguintes passos sempre que o pipeline que contém a função apply_changes_from_snapshot() é acionado:

  1. Execute a função next_snapshot_and_version para carregar o próximo Snapshot DataFrame e a versão correspondente do Snapshot.

  2. Se nenhum DataFrame retornar, a execução será encerrada e a atualização do pipeline será marcada como concluída.

  3. Detecta as alterações no novo Snapshot e as aplica de forma incremental à tabela de destino.

  4. Retorna ao passo #1 para carregar o próximo Snapshot e sua versão.

Limitações

A interface Python do Delta Live Tables tem a seguinte limitação:

A função pivot() não é compatível. A pivot operação no Spark exige um carregamento rápido dos dados de entrada para calcular o esquema da saída. Esse recurso não é compatível com Delta Live Tables.