Referência da linguagem Python do Delta Live Tables
Este artigo fornece detalhes para a interface de programação Python do Delta Live Tables.
Para obter informações sobre a SQL API, consulte a referência da linguagem SQL Delta Live Tables.
Para obter detalhes específicos sobre a configuração do Auto Loader, consulte O que é o Auto Loader?.
Limitações
A interface do Python de Tabelas Delta Live tem as seguintes limitações:
O Python
table
eview
as funções devem retornar um DataFrame. Algumas funções que operam em DataFrames não retornam DataFrames e não devem ser usadas. Como as transformações do DataFrame são executadas após a resolução do gráfico de fluxo de dados completo, o uso dessas operações pode ter efeitos colaterais indesejados. Essas operações incluem funções comocollect()
count()
,toPandas()
,save()
esaveAsTable()
. No entanto, você pode incluir essas funções fora detable
nossas definições deview
função porque esse código é executado uma vez durante a fase de inicialização do gráfico.A função
pivot()
não é compatível. Apivot
operação no Spark exige um carregamento rápido dos dados de entrada para calcular o esquema da saída. Esse recurso não é compatível com Delta Live Tables.
Importar o módulo Python dlt
As funções do Python das Tabelas do Delta Live são definidas no módulo dlt
. Seus pipelines implementados com a API do Python devem importar este módulo:
import dlt
Criar uma exibição materializada Delta Live Tables ou tabela de transmissão
No Python, o Delta Live Tables determina se um dataset deve ser atualizado como uma exibição materializada ou uma tabela de transmissão com base na consulta definidora. O decorador @table
é usado para definir exibições materializadas e tabelas de transmissão.
Para definir uma visualização materializada em Python, aplique @table
a uma consulta que executa uma leitura estática em uma fonte de dados. Para definir uma tabela de streaming, aplique @table
a uma consulta que executa uma leitura de transmissão em uma fonte de dados. Os dois tipos de datasets têm a mesma especificação de sintaxe, como segue:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
schema="schema-definition",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Criar uma exibição de Delta Live Tables
Para definir uma visualização em Python, aplique o decorador do @view
. Como o decorador @table
, você pode usar exibições em Delta Live Tables para datasets estáticos ou de transmissão. A seguir está a sintaxe para definir visualizações com Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Exemplo: definir tabelas e exibições
Para definir uma tabela ou visualização em Python, aplique o decorador @dlt.view
ou @dlt.table
a uma função. Você pode utilizar o nome da função ou o parâmetro name
para atribuir a tabela ou visualizar o nome. O exemplo a seguir define dois datasets diferentes: uma visualização chamada taxi_raw
que usa um arquivo JSON como fonte de entrada e uma tabela chamada filtered_data
que usa a taxi_raw
exibição como entrada:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return dlt.read("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return dlt.read("taxi_raw").where(...)
Exemplo: acessar um dataset definido no mesmo pipeline
Além de ler a partir de fontes de dados externas, você pode acessar datasets definidos no mesmo pipeline com a função Delta Live Tables read()
. O seguinte exemplo demonstra a criação de um dataset do customers_filtered
utilizando a função read()
:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return dlt.read("customers_raw").where(...)
Você também pode utilizar a função spark.table()
para acessar um dataset definido no mesmo pipeline. Ao utilizar a função spark.table()
para acessar um dataset definido no pipeline, no argumento de função prepend a palavra-chave LIVE
ao nome do dataset:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredB():
return spark.table("LIVE.customers_raw").where(...)
Exemplo: Leitura de uma tabela cadastrada em um metastore
Para ler dados de uma tabela registrada no Hive metastore , omita a palavra-chave LIVE
no argumento da função e, opcionalmente, qualifique o nome da tabela com o nome do banco de dados:
@dlt.table
def customers():
return spark.table("sales.customers").where(...)
Para obter um exemplo de leitura de uma tabela Unity Catalog , consulte Ingerir dados em um pipeline Unity Catalog .
Exemplo: acesse um dataset usando spark.sql
Você também pode retornar um dataset utilizando uma expressão do spark.sql
em uma função de consulta. Para ler de um dataset interno, acrescente o nome do dataset LIVE.
:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Escreva em uma tabela de transmissão de transmissão de origem múltipla
Visualização
O suporte do Delta Live Tables para @append_flow
está em visualização pública.
Você pode usar o decorador @append_flow
para gravar em uma tabela de transmissão de diversas fontes de transmissão para fazer o seguinte:
Adicione e remova fontes de transmissão que acrescentam dados a uma tabela de transmissão existente sem exigir uma refresh completa. Por exemplo, você pode ter uma tabela que combina dados regionais de cada região em que você opera. À medida que novas regiões são implementadas, você pode adicionar os dados da nova região à tabela sem realizar uma refresh completa.
Atualize uma tabela de transmissão anexando dados históricos faltantes (preenchimento). Por exemplo, você tem uma tabela de transmissão existente que é gravada por um tópico do Apache Kafka. Você também tem dados históricos armazenados em uma tabela que precisa ser inserida exatamente uma vez na tabela de transmissão e não pode transmitir os dados porque precisa realizar uma agregação complexa antes de inserir os dados.
Para criar uma tabela de destino para os registros gerados pelo processamento @append_flow
, use a função create_streaming_table().
Observação
Se o senhor precisar definir restrições de qualidade de dados com expectativas, defina as expectativas na tabela de destino como parte da função create_streaming_table()
. O senhor não pode definir expectativas na definição @append_flow
.
A seguir está a sintaxe para @append_flow
:
import dlt
dlt.create_streaming_table("<target-table-name>")
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment") # optional
def <function-name>():
return (<streaming query>)
Exemplo: Escreva em uma tabela de transmissão de vários tópicos do Kafka
O exemplo a seguir cria uma tabela de transmissão chamada kafka_target
e grava nessa tabela de transmissão a partir de dois tópicos do Kafka:
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
Exemplo: execução de um preenchimento único de dados
O exemplo a seguir executa uma query para anexar dados históricos a uma tabela de transmissão:
Observação
Para garantir um preenchimento único e verdadeiro quando a query de preenchimento faz parte de um pipeline cuja execução é programada ou contínua, remova a query depois de executar o pipeline uma vez. Para anexar novos dados se eles chegarem ao diretório de preenchimento, deixe a query no lugar.
import dlt
@dlt.table()
def csv_target():
return spark.readStream.format("csv").load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream.format("csv").load("path/to/backfill/data/dir")
Criar uma tabela para ser usada como destino das operações de transmissão
Use a função create_streaming_table()
para criar uma tabela de destino para os registros de saída das operações de transmissão, inclusive os registros de saída apply_changes() e @append_flow.
Observação
As funções create_target_table()
e create_streaming_live_table()
estão obsoletas. A Databricks recomenda atualizar o código existente para usar a create_streaming_table()
função.
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
Argumentos |
---|
Tipo: O nome da tabela. Este parâmetro é necessário. |
Tipo: Uma descrição opcional para a tabela. |
Tipo: Uma lista opcional de configurações do Spark para a execução desta consulta. |
Tipo: Uma lista opcional de propriedades da tabela da tabela. |
Tipo: Uma lista opcional de uma ou mais colunas para usar no particionamento da tabela. |
Tipo: Um local de armazenamento opcional para dados da tabela. Se não estiver definido, o sistema adotará como padrão o local de armazenamento do pipeline. |
Tipo: Uma definição de esquema opcional para a tabela. Esquemas podem ser definidos como uma string SQL DDL, ou com um Python |
Tipo: Restrições opcionais de qualidade de dados para a tabela. Veja múltiplas expectativas. |
Controle como as tabelas são materializadas
As tabelas também oferecem controle adicional de sua materialização:
Especifique como as tabelas são particionadas usando
partition_cols
. Você pode usar o particionamento para acelerar query.Você pode definir as propriedades da tabela ao definir uma view ou tabela. Consulte as propriedades da tabela Delta Live Tables.
Defina um local de armazenamento para os dados da tabela usando a configuração
path
. Por padrão, os dados da tabela são armazenados no local de armazenamento do pipeline sepath
não estiver definido.Você pode usar colunas geradas em sua definição de esquema. Consulte Exemplo: Especifique um esquema e colunas de partição.
Observação
Para tabelas com menos de 1 TB, a Databricks recomenda deixar que o Delta Live Tables controle a organização de dados. A menos que você espere que sua tabela cresça além de um terabyte, geralmente não é recomendável especificar colunas de partição.
Exemplo: especificar um esquema e colunas de partição
Opcionalmente, você pode especificar um esquema de tabela utilizando uma string do Python StructType
ou SQL DDL. Quando especificada com uma cadeia de caracteres DDL, a definição pode incluir colunas geradas.
O exemplo a seguir cria uma tabela chamada sales
com um esquema especificado usando Python StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
O exemplo a seguir especifica o esquema para uma tabela usando uma string DDL, define uma coluna gerada e define uma coluna de partição:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Por padrão, Delta Live Tables infere o esquema da definição table
se você não especificar um esquema.
Configurar uma tabela de transmissão para ignorar alterações em uma tabela de transmissão de origem
Observação
O sinalizador
skipChangeCommits
funciona apenas comspark.readStream
usando a funçãooption()
. Você não pode usar este sinalizador em uma funçãodlt.read_stream()
.Você não pode usar o sinalizador
skipChangeCommits
quando a tabela de transmissão de origem estiver definida como destino de uma função apply_changes() .
Por default, as tabelas de transmissão exigem fontes somente anexadas. Quando uma tabela de transmissão usa outra tabela de transmissão como origem, e a tabela de transmissão de origem requer atualizações ou exclusões, por exemplo, processamento de “direito ao esquecimento” do GDPR, o sinalizador skipChangeCommits
pode ser definido ao ler a tabela de transmissão de origem para ignorar essas mudanças. Para obter mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclusões.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Propriedades do Python Delta Live Tables
As tabelas a seguir descrevem as opções e propriedades que você pode especificar ao definir tabelas e exibições com Delta Live Tables:
@tabela ou @visualizar |
---|
Tipo: Um nome opcional para a tabela ou visualização. Se não estiver definida, o nome da função será usado como a tabela ou o nome de visualização. |
Tipo: Uma descrição opcional para a tabela. |
Tipo: Uma lista opcional de configurações do Spark para a execução desta consulta. |
Tipo: Uma lista opcional de propriedades da tabela da tabela. |
Tipo: Um local de armazenamento opcional para dados da tabela. Se não estiver definido, o sistema adotará como padrão o local de armazenamento do pipeline. |
Tipo: Uma coleção opcional, por exemplo, a |
Tipo: Uma definição de esquema opcional para a tabela. Esquemas podem ser definidos como uma string SQL DDL, ou com um Python |
Tipo: Criar uma tabela, mas não publicar os metadados da tabela. A palavra-chave O padrão é 'falso'. |
Definição de tabela ou exibição |
---|
Uma função do Python que define o dataset. Se o parâmetro |
Uma declaração Spark SQL que retorna um dataset Spark ou DataFrame Koalas. Use
Você também pode utilizar a função
Use Utilize a função Use a sintaxe do PySpark para definir consultas Delta Live Tables com Python. |
Expectativas |
---|
Declarar uma restrição de qualidade de dados identificada por |
Declarar uma restrição de qualidade de dados identificada por |
Declarar uma restrição de qualidade de dados identificada por |
Declare uma ou mais restrições de qualidade de dados. |
Declare uma ou mais restrições de qualidade de dados. |
Declare uma ou mais restrições de qualidade de dados. |
Altere a captura de dados com Python em Delta Live Tables
Use a função apply_changes()
na API Python para usar a funcionalidade CDC do Delta Live Tables. A interface Python do Delta Live Tables também fornece a função create_streaming_table(). O senhor pode usar essa função para criar a tabela de destino exigida pela função apply_changes()
.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Observação
O comportamento padrão para eventos de INSERT
e UPDATE
é realizar upsert de eventos do CDC a partir da fonte: atualizar quaisquer linhas na tabela de destino que correspondam à(s) chave(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino.O tratamento de DELETE
eventos pode ser especificado com a APPLY AS DELETE WHEN
condição.
Importante
Você deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino do apply_changes
, você também deve incluir as colunas __START_AT
e __END_AT
com o mesmo tipo de dados que o campo sequence_by
.
Consulte Captura simplificada de dados de alterações (CDC) com a API APPLY CHANGES em Delta Live Tables.
Argumentos |
---|
Tipo: O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a função Este parâmetro é necessário. |
Tipo: A fonte de dados que contém os registros do CDC. Este parâmetro é necessário. |
Tipo: A coluna ou combinação de colunas que identificam exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos de CDC se aplicam a registros específicos na tabela de destino. Você pode especificar:
Argumentos para funções Este parâmetro é necessário. |
Tipo: O nome da coluna que especifica a ordem lógica dos eventos de CDC nos dados de origem. O Delta Live Tables usa esse sequenciamento para manipular eventos de alteração que chegam fora de ordem. Você pode especificar:
Argumentos para funções Este parâmetro é necessário. |
Tipo: Permitir a ingestão de atualizações que contenham um subconjunto das colunas de destino. Quando um evento CDC corresponder a uma linha existente e Este parâmetro é opcional. O padrão é |
Tipo: Especifica quando um evento CDC deve ser tratado como um Você pode especificar:
Este parâmetro é opcional. |
Tipo: Especifica quando um evento de CDC deve ser tratado como uma tabela completa O parâmetro Você pode especificar:
Este parâmetro é opcional. |
Tipo: Um subconjunto de colunas a incluir na tabela de destino. Use
Argumentos para funções Este parâmetro é opcional. O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento |
Tipo: Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2. Defina como Esta cláusula é opcional. O padrão é SCD tipo 1. |
Tipo: Um subconjunto de colunas de saída a serem rastreadas para história na tabela de destino. Use Argumentos para funções Este parâmetro é opcional. O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento |