Referência da linguagem Python do Delta Live Tables
Este artigo contém detalhes sobre a interface de programação Delta Live Tables Python .
Para obter informações sobre a SQL API, consulte a referência da linguagem SQL Delta Live Tables.
Para obter detalhes específicos sobre a configuração do Auto Loader, consulte O que é o Auto Loader?.
Antes de começar
A seguir, há considerações importantes quando o senhor implementa o pipeline com a interface Delta Live Tables Python :
Como as funções Python
table()
eview()
são invocadas várias vezes durante o planejamento e a execução de uma atualização pipeline, não inclua código em uma dessas funções que possa ter efeitos colaterais (por exemplo, código que modifique dados ou envie um email). Para evitar um comportamento inesperado, suas funções Python que definem o conjunto de dados devem incluir apenas o código necessário para definir a tabela ou view.Para realizar operações como o envio de e-mail ou a integração com um serviço de monitoramento externo, especialmente em funções que definem conjuntos de dados, use ganchos de eventos. A implementação dessas operações nas funções que definem seu conjunto de dados causará um comportamento inesperado.
O Python
table
eview
as funções devem retornar um DataFrame. Algumas funções que operam em DataFrames não retornam DataFrames e não devem ser usadas. Essas operações incluem funções comocollect()
count()
,toPandas()
,save()
esaveAsTable()
. Como as transformações do DataFrame são executadas após a resolução do gráfico de fluxo de dados completo, o uso dessas operações pode ter efeitos colaterais indesejados.
Importar o módulo Python dlt
As funções do Python das Tabelas do Delta Live são definidas no módulo dlt
. Seus pipelines implementados com a API do Python devem importar este módulo:
import dlt
Criar uma exibição materializada Delta Live Tables ou tabela de transmissão
Em Python, Delta Live Tables determina se deve atualizar uma dataset como uma tabela materializada view ou de transmissão com base na consulta de definição. O decorador @table
pode ser usado para definir tanto a exibição materializada quanto as tabelas de transmissão.
Para definir um view materializado em Python, aplique @table
a uma consulta que executa uma leitura estática em uma fonte de dados. Para definir uma tabela de transmissão, aplique @table
a uma consulta que execute uma leitura de transmissão em uma fonte de dados ou use a função create_streaming_table(). Os dois tipos de dataset têm a mesma especificação de sintaxe, como segue:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Criar uma exibição de Delta Live Tables
Para definir uma visualização em Python, aplique o decorador do @view
. Como o decorador @table
, você pode usar exibições em Delta Live Tables para datasets estáticos ou de transmissão. A seguir está a sintaxe para definir visualizações com Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Exemplo: definir tabelas e exibições
Para definir uma tabela ou visualização em Python, aplique o decorador @dlt.view
ou @dlt.table
a uma função. Você pode utilizar o nome da função ou o parâmetro name
para atribuir a tabela ou visualizar o nome. O exemplo a seguir define dois datasets diferentes: uma visualização chamada taxi_raw
que usa um arquivo JSON como fonte de entrada e uma tabela chamada filtered_data
que usa a taxi_raw
exibição como entrada:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
Exemplo: acessar um dataset definido no mesmo pipeline
Observação
Embora as funções dlt.read()
e dlt.read_stream()
ainda estejam disponíveis e sejam totalmente compatíveis com a interface Python do Delta Live Tables, a Databricks recomenda sempre usar as funções spark.read.table()
e spark.readStream.table()
devido ao seguinte:
As funções do site
spark
suportam a leitura de conjuntos de dados internos e externos, inclusive conjuntos de dados em armazenamento externo ou definidos em outro pipeline. As funções do sitedlt
suportam apenas a leitura de conjuntos de dados internos.As funções
spark
suportam a especificação de opções, comoskipChangeCommits
, para operações de leitura. A especificação de opções não é suportada pelas funçõesdlt
.
Para acessar um dataset definido no mesmo pipeline, use as funções spark.read.table()
ou spark.readStream.table()
, acrescentando a palavra-chave LIVE
ao nome dataset:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("LIVE.customers_raw").where(...)
Exemplo: Leitura de uma tabela cadastrada em um metastore
Para ler dados de uma tabela registrada no site Hive metastore, no argumento da função, omita a palavra-chave LIVE
e, opcionalmente, qualifique o nome da tabela com o nome do banco de dados:
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Para obter um exemplo de leitura de uma tabela Unity Catalog , consulte Ingerir dados em um pipeline Unity Catalog .
Exemplo: acesse um dataset usando spark.sql
Você também pode retornar um dataset utilizando uma expressão do spark.sql
em uma função de consulta. Para ler de um dataset interno, acrescente o nome do dataset LIVE.
:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Criar uma tabela para ser usada como destino das operações de transmissão
Use a função create_streaming_table()
para criar uma tabela de destino para os registros de saída das operações de transmissão, inclusive os registros de saída apply_changes(), apply_changes_from_snapshot() e @append_flow.
Observação
As funções create_target_table()
e create_streaming_live_table()
estão obsoletas. A Databricks recomenda atualizar o código existente para usar a create_streaming_table()
função.
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Argumentos |
---|
Tipo: O nome da tabela. Este parâmetro é necessário. |
Tipo: Uma descrição opcional para a tabela. |
Tipo: Uma lista opcional de configurações do Spark para a execução desta consulta. |
Tipo: Uma lista opcional de propriedades da tabela da tabela. |
Tipo: Uma lista opcional de uma ou mais colunas para usar no particionamento da tabela. |
Tipo: Opcionalmente, ative o clustering líquido na tabela e defina as colunas a serem usadas como chave clustering. |
Tipo: Um local de armazenamento opcional para os dados da tabela. Se não for definido, o sistema usará como padrão o local de armazenamento pipeline. |
Tipo: Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como SQL DDL strings ou com Python |
Tipo: Restrições opcionais de qualidade de dados para a tabela. Veja múltiplas expectativas. |
Tipo: Uma cláusula opcional de filtro de linha para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna. |
Controle como as tabelas são materializadas
As tabelas também oferecem controle adicional de sua materialização:
Especifique como as tabelas são particionadas usando
partition_cols
. Você pode usar o particionamento para acelerar query.Você pode definir as propriedades da tabela ao definir uma view ou tabela. Consulte as propriedades da tabela Delta Live Tables.
Defina um local de armazenamento para os dados da tabela usando a configuração
path
. Por padrão, os dados da tabela são armazenados no local de armazenamento do pipeline sepath
não estiver definido.Você pode usar colunas geradas em sua definição de esquema. Consulte Exemplo: Especifique um esquema e colunas de partição.
Observação
Para tabelas com menos de 1 TB de tamanho, a Databricks recomenda deixar que o Delta Live Tables controle a organização dos dados. O senhor não deve especificar colunas de partição, a menos que espere que a tabela cresça além de um terabyte.
Exemplo: especificar um esquema e colunas de partição
Opcionalmente, você pode especificar um esquema de tabela utilizando uma string do Python StructType
ou SQL DDL. Quando especificada com uma cadeia de caracteres DDL, a definição pode incluir colunas geradas.
O exemplo a seguir cria uma tabela chamada sales
com um esquema especificado usando Python StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
O exemplo a seguir especifica o esquema para uma tabela usando uma string DDL, define uma coluna gerada e define uma coluna de partição:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Por padrão, Delta Live Tables infere o esquema da definição table
se você não especificar um esquema.
Configurar uma tabela de transmissão para ignorar alterações em uma tabela de transmissão de origem
Observação
O sinalizador
skipChangeCommits
funciona apenas comspark.readStream
usando a funçãooption()
. Você não pode usar este sinalizador em uma funçãodlt.read_stream()
.Você não pode usar o sinalizador
skipChangeCommits
quando a tabela de transmissão de origem estiver definida como destino de uma função apply_changes() .
Por default, as tabelas de transmissão exigem fontes somente anexadas. Quando uma tabela de transmissão usa outra tabela de transmissão como origem, e a tabela de transmissão de origem requer atualizações ou exclusões, por exemplo, processamento de “direito ao esquecimento” do GDPR, o sinalizador skipChangeCommits
pode ser definido ao ler a tabela de transmissão de origem para ignorar essas mudanças. Para obter mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclusões.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Exemplo: Definir restrições de tabela
Visualização
As restrições de tabela estão na Pré-visualização pública.
Ao especificar um esquema, o senhor pode definir chaves primárias e estrangeiras. As restrições são informativas e não são aplicadas. Consulte a cláusula CONSTRAINT na referência da linguagem SQL.
O exemplo a seguir define uma tabela com uma restrição primária e estrangeira key:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Exemplo: Definir um filtro de linha e uma máscara de coluna
Visualização
Os filtros de linha e as máscaras de coluna estão na Pré-visualização pública.
Para criar uma tabela materializada view ou de transmissão com um filtro de linha e uma máscara de coluna, use a cláusula ROW FILTER e a cláusula MASK. O exemplo a seguir demonstra como definir uma tabela materializada view e uma tabela de transmissão com um filtro de linha e uma máscara de coluna:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Para obter mais informações sobre filtros de linha e máscaras de coluna, consulte Publicar tabelas com filtros de linha e máscaras de coluna.
Propriedades do Python Delta Live Tables
As tabelas a seguir descrevem as opções e propriedades que você pode especificar ao definir tabelas e exibições com Delta Live Tables:
@tabela ou @visualizar |
---|
Tipo: Um nome opcional para a tabela ou visualização. Se não estiver definida, o nome da função será usado como a tabela ou o nome de visualização. |
Tipo: Uma descrição opcional para a tabela. |
Tipo: Uma lista opcional de configurações do Spark para a execução desta consulta. |
Tipo: Uma lista opcional de propriedades da tabela da tabela. |
Tipo: Um local de armazenamento opcional para os dados da tabela. Se não for definido, o sistema usará como padrão o local de armazenamento pipeline. |
Tipo: Uma coleção opcional, por exemplo, um |
Tipo: Opcionalmente, ative o clustering líquido na tabela e defina as colunas a serem usadas como chave clustering. |
Tipo: Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como SQL DDL strings ou com Python |
Tipo: Crie uma tabela, mas não publique metadados para a tabela. A palavra-chave O padrão é 'falso'. |
Tipo: Uma cláusula opcional de filtro de linha para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna. |
Definição de tabela ou exibição |
---|
Uma função do Python que define o dataset. Se o parâmetro |
Uma declaração Spark SQL que retorna um dataset Spark ou DataFrame Koalas. Use Quando o senhor usar a função
Você também pode utilizar a função
Use Para definir uma consulta em uma função Delta Live Tables |
Expectativas |
---|
Declarar uma restrição de qualidade de dados identificada por |
Declarar uma restrição de qualidade de dados identificada por |
Declarar uma restrição de qualidade de dados identificada por |
Declare uma ou mais restrições de qualidade de dados. |
Declare uma ou mais restrições de qualidade de dados. |
Declare uma ou mais restrições de qualidade de dados. |
captura de dados de alterações (CDC) de um feed de alterações com Python em Delta Live Tables
Use a função apply_changes()
no site Python API para usar a funcionalidade Delta Live Tables captura de dados de alterações (CDC) (CDC) para processar dados de origem de um feed de dados de alterações (CDF).
Importante
O senhor deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, o senhor pode especificar o esquema da tabela de destino. Ao especificar o esquema da tabela de destino apply_changes()
, o senhor deve incluir as colunas __START_AT
e __END_AT
com o mesmo tipo de dados dos campos sequence_by
.
Para criar a tabela de destino necessária, o senhor pode usar a função create_streaming_table() na interface Python do Delta Live Tables.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Observação
Para o processamento de APPLY CHANGES
, o comportamento do default para eventos INSERT
e UPDATE
é fazer upsert de eventos CDC da origem: atualizar todas as linhas da tabela de destino que correspondam ao(s) key(s) especificado(s) ou inserir uma nova linha quando não houver um registro correspondente na tabela de destino. O tratamento de eventos DELETE
pode ser especificado com a condição APPLY AS DELETE WHEN
.
Para saber mais sobre o processamento do CDC com um feed de alterações, consulte APLICAR ALTERAÇÕES APIs: Simplifique a captura de dados de alterações (CDC) com Delta Live Tables. Para ver um exemplo de uso da função apply_changes()
, consulte Exemplo: Processamento de SCD tipo 1 e SCD tipo 2 com dados de origem CDF.
Importante
O senhor deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, o senhor pode especificar o esquema da tabela de destino. Ao especificar o esquema da tabela de destino apply_changes
, o senhor deve incluir as colunas __START_AT
e __END_AT
com o mesmo tipo de dados que o campo sequence_by
.
Consulte a seção APPLY CHANGES APIs: Simplificar a captura de dados de alterações (CDC) com Delta Live Tables.
Argumentos |
---|
Tipo: O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a função Este parâmetro é necessário. |
Tipo: A fonte de dados que contém os registros do CDC. Este parâmetro é necessário. |
Tipo: A coluna ou combinação de colunas que identificam exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos de CDC se aplicam a registros específicos na tabela de destino. Você pode especificar:
Argumentos para funções Este parâmetro é necessário. |
Tipo: O nome da coluna que especifica a ordem lógica dos eventos de CDC nos dados de origem. O Delta Live Tables usa esse sequenciamento para manipular eventos de alteração que chegam fora de ordem. Você pode especificar:
Argumentos para funções A coluna especificada deve ser um tipo de dados classificável. Este parâmetro é necessário. |
Tipo: Permitir a ingestão de atualizações que contenham um subconjunto da coluna de destino. Quando um evento CDC corresponde a uma linha existente e Este parâmetro é opcional. O padrão é |
Tipo: Especifica quando um evento CDC deve ser tratado como Você pode especificar:
Este parâmetro é opcional. |
Tipo: Especifica quando um evento de CDC deve ser tratado como uma tabela completa O parâmetro Você pode especificar:
Este parâmetro é opcional. |
Tipo: Um subconjunto de colunas a incluir na tabela de destino. Use
Argumentos para funções Este parâmetro é opcional. O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento |
Tipo: Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2. Defina como Esta cláusula é opcional. O padrão é SCD tipo 1. |
Tipo: Um subconjunto de colunas de saída a serem rastreadas para história na tabela de destino. Use Argumentos para funções Este parâmetro é opcional. O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento |
captura de dados de alterações (CDC) do banco de dados Snapshot com Python em Delta Live Tables
Visualização
A API APPLY CHANGES FROM SNAPSHOT
está em pré-visualização pública.
Use a função apply_changes_from_snapshot()
no site Python API para usar a funcionalidade Delta Live Tables captura de dados de alterações (CDC) (CDC) para processar os dados de origem do banco de dados Snapshot.
Importante
Você deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino do apply_changes_from_snapshot()
, você também deve incluir as colunas __START_AT
e __END_AT
com o mesmo tipo de dados que o campo sequence_by
.
Para criar a tabela de destino necessária, o senhor pode usar a função create_streaming_table() na interface Python do Delta Live Tables.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Observação
Para o processamento APPLY CHANGES FROM SNAPSHOT
, o comportamento do default é inserir uma nova linha quando não houver um registro correspondente com o mesmo key(s) no destino. Se houver um registro correspondente, ele será atualizado somente se algum dos valores da linha tiver sido alterado. As linhas com chave presentes no destino, mas não mais presentes na origem, são excluídas.
Para saber mais sobre o processamento do CDC com o Snapshot, consulte APLICAR ALTERAÇÕES APIs: Simplifique a captura de dados de alterações (CDC) com Delta Live Tables. Para obter exemplos de uso da função apply_changes_from_snapshot()
, consulte os exemplos de ingestão periódica em Snapshot e ingestão histórica em Snapshot .
Argumentos |
---|
Tipo: O nome da tabela a ser atualizada. O senhor pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a função Este parâmetro é necessário. |
Tipo: O nome de uma tabela ou view para Snapshot periodicamente ou uma função lambda Python que retorna o Snapshot DataFrame a ser processado e a versão Snapshot. Consulte Implementar o argumento de origem. Este parâmetro é necessário. |
Tipo: A coluna ou combinação de colunas que identificam exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos de CDC se aplicam a registros específicos na tabela de destino. Você pode especificar:
Argumentos para funções Este parâmetro é necessário. |
Tipo: Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2. Defina como Esta cláusula é opcional. O padrão é SCD tipo 1. |
Tipo: Um subconjunto de colunas de saída a serem rastreadas para história na tabela de destino. Use Argumentos para funções Este parâmetro é opcional. O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento |
Implementar o argumento source
A função apply_changes_from_snapshot()
inclui o argumento source
. Para processar o Snapshot histórico, espera-se que o argumento source
seja uma função lambda Python que retorne dois valores para a função apply_changes_from_snapshot()
: um Python DataFrame contendo os dados Snapshot a serem processados e uma versão Snapshot.
A seguir, a assinatura da função lambda:
lambda Any => Optional[(DataFrame, Any)]
O argumento para a função lambda é a versão mais recentemente processada do site Snapshot.
O valor de retorno da função lambda é
None
ou uma tupla de dois valores: O primeiro valor da tupla é um DataFrame que contém o Snapshot a ser processado. O segundo valor da tupla é a versão Snapshot que representa a ordem lógica do Snapshot.
Um exemplo que implementa e chama a função lambda:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
O tempo de execução do Delta Live Tables executa os seguintes passos sempre que o pipeline que contém a função apply_changes_from_snapshot()
é acionado:
Execute a função
next_snapshot_and_version
para carregar o próximo Snapshot DataFrame e a versão correspondente do Snapshot.Se nenhum DataFrame retornar, a execução será encerrada e a atualização do pipeline será marcada como concluída.
Detecta as alterações no novo Snapshot e as aplica de forma incremental à tabela de destino.
Retorna ao passo #1 para carregar o próximo Snapshot e sua versão.