registros de transmissão para serviço externo com Delta Live Tables sinks
Prévia
A API Delta Live Tables sink
está em visualização pública.
Este artigo descreve o Delta Live Tables sink
API e como usá-lo com fluxos DLT para gravar registros transformados por um pipeline em um coletor de dados externo, como Unity Catalog gerenciar e tabelas externas, Hive metastore tabelas e serviços de transmissão de eventos, como Apache Kafka ou Azure Event Hubs.
O que são as pias Delta Live Tables?
Delta Live Tables Os sinks permitem que o senhor grave dados transformados em destinos como o serviço de transmissão de eventos, como Apache Kafka ou Azure Event Hubs, e tabelas externas gerenciadas por Unity Catalog ou Hive metastore. Anteriormente, as tabelas de transmissão e a visualização materializada criadas em um Delta Live Tables pipeline podiam ser persistidas apenas para Databricks gerenciar Delta tabelas. Usando sinks, o senhor agora tem mais opções para manter a saída do pipeline Delta Live Tables.
Quando devo usar as pias Delta Live Tables?
A Databricks recomenda o uso de Delta Live Tables sinks se o senhor precisar:
Desenvolva um caso de uso operacional, como detecção de fraude, análise de tempo real e recomendações de clientes. Os casos de uso operacional normalmente leem dados de um barramento de mensagens, como um tópico do Apache Kafka, e depois processam os dados com baixa latência e gravam os registros processados de volta em um barramento de mensagens. Essa abordagem permite que você obtenha menor latência ao não escrever ou ler no armazenamento em nuvem.
Grave dados transformados de seus fluxos Delta Live Tables em tabelas gerenciadas por uma instância externa Delta, incluindo Unity Catalog gerenciar e tabelas externas e Hive metastore tabelas.
Execute o processo reverso extrair, transformar, carregar (ETL) em coletores externos a Databricks, como os tópicos Apache Kafka . Essa abordagem permite que o senhor ofereça suporte eficaz a casos de uso em que os dados precisam ser lidos ou usados fora das tabelas Unity Catalog ou de outro armazenamento Databricks-gerenciar.
Como posso usar as pias Delta Live Tables?
Observação
Somente as consultas de transmissão que usam
spark.readStream
edlt.read_stream
são compatíveis. não há suporte para consultas de lotes.Somente
append_flow
pode ser usado para gravar em coletores. Outros fluxos, comoapply_changes
, não são suportados.A execução de uma atualização completa do site refresh não limpa os dados de resultados de computação anteriores nos sinks. Isso significa que todos os dados reprocessados serão anexados ao coletor e os dados existentes não serão alterados.
À medida que os dados do evento são ingeridos de uma fonte de transmissão para o seu site Delta Live Tables pipeline, o senhor processa e refina essa funcionalidade de uso de dados Delta Live Tables e, em seguida, usa o processamento de fluxo de acréscimo para transmitir os registros de dados transformados para um sink Delta Live Tables. Você cria esse coletor usando a função create_sink()
. Para obter mais detalhes sobre o uso da função create_sink
, consulte a referência da API do sink.
Para implementar um sumidouro Delta Live Tables, siga as etapas abaixo:
Configure um Delta Live Tables pipeline para processar os dados do evento de transmissão e preparar os registros de dados para gravação em um sink Delta Live Tables.
Configure e crie o coletor do Delta Live Tables para usar o formato de coletor de destino preferencial.
Use um fluxo de acréscimo para gravar os registros preparados no coletor.
Essas etapas são abordadas no restante do tópico.
Configure um pipeline Delta Live Tables para preparar registros para gravação em um sink
A primeira etapa é configurar um Delta Live Tables pipeline para transformar os dados brutos de transmissão de eventos nos dados preparados que o senhor gravará no coletor.
Para entender melhor esse processo, o senhor pode seguir este exemplo de um pipeline do Delta Live Tables que processa dados de eventos de fluxo de cliques dos dados de amostra wikipedia-datasets
no Databricks. Este pipeline analisa o dataset bruto para identificar as páginas da Wikipédia que têm um link para uma página de documentação Apache Spark e refina progressivamente esses dados para apenas as linhas da tabela em que o link de referência contém Apache_Spark.
Neste exemplo, o pipeline do Delta Live Tables é estruturado usando a arquitetura de medalhão, que organiza os dados em diferentes camadas para melhorar a qualidade e a eficiência do processamento.
Para começar, carregue os registros brutos do JSON do dataset em sua camada de bronze usando Auto Loader. Este código Python demonstra como criar uma tabela de transmissão chamada clickstream_raw
, que contém os dados brutos e não processados da fonte:
import dlt
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
@dlt.table(
comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)
Após a execução desse código, os dados estão agora no nível "bronze" (ou "dados brutos") da arquitetura do Medallion e devem ser limpos. A próxima etapa refina os dados até o nível "prata", o que envolve a limpeza dos tipos de dados e nomes de colunas e o uso do site Delta Live Tables expectations para garantir a integridade dos dados.
O código a seguir demonstra como fazer isso limpando e validando os dados da camada de bronze na tabela prata clickstream_clean
:
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
spark.readStream.table("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)
Para desenvolver a camada "ouro" de sua estrutura de pipeline, o senhor filtra os dados limpos do fluxo de cliques para isolar as entradas em que a página de referência é Apache_Spark
. Neste último exemplo de código, você seleciona somente as colunas necessárias para gravar na tabela de coletores de destino.
O código a seguir ilustra como criar uma tabela chamada spark_referrers
que representa a camada de ouro:
@dlt.table(
comment="A table of the most common pages that link to the Apache Spark page.",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
spark.readStream.table("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)
Depois que esse processo de preparação de dados for concluído, você deverá configurar os coletores de destino nos quais os registros limpos serão gravados.
Configurar um sink do Delta Live Tables
Databricks suporta três tipos de sumidouros de destino nos quais o usuário grava os registros processados a partir dos dados de transmissão:
Pias de mesa Delta
Sumidouros do Apache Kafka
Sumidouros dos Hubs de Eventos do Azure
abaixo são exemplos de configurações para Delta, Kafka e Azure Hubs de eventos:
Para criar um Delta sink por caminho de arquivo:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Para criar um Delta sink por nome de tabela usando um catálogo totalmente qualificado e um caminho de esquema:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "my_catalog.my_schema.my_table" }
)
Esse código funciona para os sinks do Apache Kafka e do Azure Event Hubs.
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"subscribe": "dlt-sink",
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
Agora que o sink está configurado e o Delta Live Tables pipeline está preparado, o senhor pode começar a transmitir os registros processados para o sink.
Gravar em um coletor do Delta Live Tables com um fluxo de acréscimo
Com seu coletor configurado, a próxima etapa é gravar registros processados nele especificando-o como o destino para a saída de registros por um fluxo de acréscimo. Você faz isso especificando seu coletor como o valor target
no decorador append_flow
.
Para Unity Catalog gerenciar e tabelas externas, use o formato
delta
e especifique o caminho ou o nome da tabela nas opções. Seu pipeline Delta Live Tables deve ser configurado para usar o Unity Catalog.Para os tópicos do Apache Kafka , use o formato
kafka
e especifique o nome do tópico, as informações de conexão e as informações de autenticação nas opções. Essas são as mesmas opções que a Spark transmissão estructurada Kafka suporta para a pia. Consulte Configurar o gravador Kafka transmissão estructurada.Para Azure Event Hubs, use o formato
kafka
e especifique o nome do Event Hubs, as informações de conexão e as informações de autenticação nas opções. Essas são as mesmas opções suportadas em um sink de Hubs de Eventos de transmissão estruturada Spark que usa a interface Kafka. Veja a autenticação de entidade de serviço com Microsoft Entra ID e Azure Event Hubs.Para as tabelas do site Hive metastore, use o formato
delta
e especifique o caminho ou o nome da tabela nas opções. Seu pipeline Delta Live Tables deve ser configurado para usar o Hive metastore.
Abaixo estão exemplos de como configurar fluxos para gravar em Delta, Kafka e Azure Event Hubs sinks com registros processados pelo seu Delta Live Tables pipeline.
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
O parâmetro value
é obrigatório para um coletor do Azure Event Hubs. Parâmetros adicionais, como key
, partition
, headers
e topic
, são opcionais.
Para obter mais detalhes sobre o decorador append_flow
, consulte Usar o fluxo append para gravar em uma tabela de transmissão a partir de várias fontes de transmissão.
Limitações
Somente a API Python é compatível. Não há suporte para SQL.
Somente as consultas de transmissão que usam
spark.readStream
edlt.read_stream
são compatíveis. não há suporte para consultas de lotes.Somente
append_flow
pode ser usado para gravar em coletores. Não há suporte para outros fluxos, comoapply_changes
, e o senhor não pode usar um sink em uma definição de Delta Live Tables dataset . Por exemplo, o seguinte não é suportado:@table("from_sink_table") def fromSink(): return read_stream("my_sink")
Para Delta sinks, o nome da tabela deve ser totalmente qualificado. Especificamente, para Unity Catalog gerenciar tabelas externas, o nome da tabela deve ter o formato
<catalog>.<schema>.<table>
. Para o Hive metastore, ele deve estar no formato<schema>.<table>
.A execução de
FullRefresh
não limpará os dados de resultados de computação anteriores nos coletores. Isso significa que todos os dados reprocessados serão anexados ao coletor e os dados existentes não serão alterados.Não há suporte para as expectativas do Delta Live Tables.