Execute sua primeira carga de trabalho de Transmissão estruturada

Este artigo apresenta exemplos de código e explicação dos conceitos básicos necessários para executar suas primeiras consultas de Transmissão estruturada no Databricks. Você pode usar a Transmissão Estruturada para cargas de trabalho de processamento quase em tempo real e incrementais.

A Transmissão Estruturada é uma das várias tecnologias que alimentam as tabelas de transmissão no Delta Live Tables. A Databricks recomenda o uso de Delta Live Tables para todas as novas cargas de trabalho de ETL, ingestão e Transmissão estruturada. Consulte O que é Delta Live Tables?.

Observação

Embora as Delta Live Tables apresentem sintaxe ligeiramente modificada para declarar tabelas de transmissão, a sintaxe geral para configurar leituras e transformações de transmissão se aplica a todos os casos de uso de transmissão em Databricks. Delta Live Tables também simplifica a transmissão gerenciando informações de estado, metadados e inúmeras configurações.

Ler de um fluxo de dados

Você pode usar a Transmissão Estruturada para ingerir dados de fontes de dados compatíveis de forma incremental. Algumas das fontes de dados mais comuns usadas em cargas de trabalho de Transmissão Estruturada do Databricks incluem o seguinte:

  • Arquivos de dados no armazenamento de objetos na nuvem

  • Barramentos e filas de mensagens

  • Delta Lake

A Databricks recomenda o uso do Auto Loader para ingestão de transmissão a partir do armazenamento de objetos na nuvem. O Auto Loader oferece compatibilidade com a maioria dos formatos de arquivo compatíveis com a Transmissão Estruturada. Consulte O que é o Auto Loader? .

Cada fonte de dados oferece uma série de opções para especificar como carregar lotes de dados. Durante a configuração do leitor, as principais opções que você pode precisar definir se enquadram nas seguintes categorias:

  • Opções que especificam a fonte de dados ou o formato (por exemplo, tipo de arquivo, delimitadores e esquema).

  • Opções que configuram o acesso aos sistemas de origem (por exemplo, configurações de porta e credenciais).

  • Opções que especificam por onde começar em uma transmissão (por exemplo, deslocamentos de Kafka ou leitura de todos os arquivos existentes).

  • Opções que controlam a quantidade de dados processados em cada lote (por exemplo, offsets, arquivos ou bytes máximos por lote).

Use o Auto Loader para ler dados de transmissão do armazenamento de objetos

O exemplo a seguir demonstra o carregamento de dados JSON com o Auto Loader, que utiliza cloudFiles para denotar formato e opções. A opção schemaLocation habilita a inferência e a evolução do esquema. Cole o seguinte código em uma célula do notebook do Databricks e execute a célula para criar um DataFrame de stream denominado raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Tal como outras operações de leitura em Databricks, a configuração de uma leitura de transmissão, na verdade, não carrega dados. Você deve acionar uma ação nos dados antes do início da transmissão.

Observação

Chamando display() em uma transmissão DataFrame começar a transmissão Job. Para a maioria dos casos de uso estruturados de transmissão, a ação que aciona uma transmissão deve ser a gravação de dados em um coletor. Consulte Preparando seu código estruturado de transmissão para produção.

Executar uma transformação de transmissão

A Transmissão estruturada é compatível com a maioria das transformações disponíveis no Databricks e no Spark SQL. Você pode até mesmo carregar modelos MLflow como UDFs e fazer previsões de transmissão como uma transformação.

O exemplo de código a seguir completa uma transformação simples para enriquecer os dados JSON ingeridos com informações adicionais usando as funções do Spark SQL:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

O transformed_df resultante contém instruções de consulta para carregar e transformar cada registro à medida que ele chega à fonte de dados.

Observação

A Transmissão Estruturada trata as fontes de dados como conjuntos de dados ilimitados ou infinitos. Dessa forma, algumas transformações não são compatíveis com as cargas de trabalho de Transmissão Estruturada porque exigiriam a classificação de um número infinito de itens.

A maioria das agregações e muitas junções exigem o gerenciamento de informações de estado com marcas d'água, janelas e modo de saída. Consulte Aplicar marcas d'água para controlar os limites de processamento de dados.

Gravar em um coletor de dados

Um coletor de dados é o alvo de uma operação de gravação em transmissão. Os coletores comuns usados nas cargas de trabalho de transmissão do Databricks incluem o seguinte:

  • Delta Lake

  • Barramentos e filas de mensagens

  • Bancos de dados de valor chave

Assim como acontece com as fontes de dados, a maioria dos coletores de dados oferece várias opções para controlar como os dados são gravados no sistema de destino. Durante a configuração do gravador, as principais opções que você pode precisar definir se enquadram nas seguintes categorias:

  • Modo de saída (anexar por padrão).

  • Um local de ponto de verificação (obrigatório para cada gravador).

  • Intervalos de trigger; consulte Configurar intervalos de trigger de Transmissão Estruturada.

  • Opções que especificam o coletor de dados ou o formato (por exemplo, tipo de arquivo, delimitadores e esquema).

  • Opções que configuram o acesso aos sistemas de destino (por exemplo, configurações de porta e credenciais).

Execute uma gravação em lote incremental no Delta Lake

O exemplo a seguir grava no Delta Lake com um caminho de arquivo especificado e um ponto de verificação.

Importante

É importante que você sempre defina um local de ponto de verificação exclusivo para cada gravador de transmissão configurada. O ponto de verificação fornece a identidade exclusiva da sua transmissão, rastreando todos os registros processados e as informações de estado associadas à sua consulta de transmissão.

A configuração availableNow do acionador instrui a Transmissão Estruturada a processar todos os registros não processados anteriormente do conjunto de dados de origem e, em seguida, desligar, para você poder executar com segurança o código a seguir sem se preocupar em deixar uma transmissão em execução:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

Neste exemplo, nenhum novo registro chega em nossa fonte de dados, portanto a execução repetida desse código não ingere novos registros.

Aviso

A execução da Transmissão estruturada pode impedir que a rescisão automática desligue os recursos de computação. Para evitar custos inesperados, é importante que você encerre as consultas de transmissão.

Preparando seu código de Transmissão estruturada para produção

A Databricks recomenda o uso de tabelas Delta Live para a maioria das cargas de trabalho de Transmissão Estruturadas. As recomendações a seguir apresentam um ponto de partida para preparar cargas de trabalho de Transmissão Estruturada para produção:

  • Remova o código desnecessário dos notebooks que retornariam resultados, como display e count.

  • Não execute cargas de trabalho de Transmissão Estruturada em clusters interativos; sempre agende fluxos como jobs.

  • Para ajudar os jobs de transmissão a se recuperarem automaticamente, configure os trabalhos com repetições infinitas.

  • Não use o escalonamento automático para cargas de trabalho com Transmissão Estruturada.

Para mais recomendações, consulte Considerações de produção para Transmissão estruturada.

Ler dados do Delta Lake, transformar e gravar no Delta Lake

Delta Lake tem amplo suporte para trabalhar com transmissão estruturada tanto como fonte quanto como sumidouro. Veja tabela Delta transmissão de leituras e escritas.

O exemplo a seguir mostra a sintaxe de exemplo para carregar de forma incremental todos os novos registros de uma tabela Delta, uni-los a um instantâneo de outra tabela Delta e gravá-los em uma tabela Delta:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Você deve ter permissões adequadas configuradas para ler tabelas de origem e gravar em tabelas de destino e no local do ponto de verificação especificado. Preencha todos os parâmetros indicados entre colchetes angulares (<>) usando os valores relevantes para suas fontes de dados e coletores.

Observação

O Delta Live Tables tem sintaxe totalmente declarativa para a criação de pipelines do Delta Lake e gerencia propriedades como acionadores e pontos de verificação automaticamente. Consulte O que são Delta Live Tables?

Ler dados do Kafka, transformar e gravar no Kafka

O Apache Kafka e outros barramentos de mensagens oferecem parte da menor latência disponível para grandes conjuntos de dados. Você pode usar o Databricks para aplicar transformações aos dados ingeridos do Kafka e, em seguida, gravar os dados de volta no Kafka.

Observação

A gravação de dados no armazenamento de objetos em nuvem adiciona sobrecarga de latência. Se você quiser armazenar dados de um barramento de mensagens no Delta Lake, mas precisar da menor latência possível para cargas de trabalho de transmissão, a Databricks recomenda configurar trabalhos de transmissão separados para ingerir dados no lakehouse e aplicar transformações quase em tempo real para os coletores de barramento de mensagens downstream.

O exemplo de código a seguir demonstra um padrão simples para enriquecer os dados do Kafka, juntando-os aos dados em uma tabela Delta e, em seguida, retornando ao Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Você deve ter as permissões adequadas configuradas para acessar seu serviço do Kafka. Preencha todos os parâmetros indicados entre colchetes angulares (<>) usando os valores relevantes para suas fontes de dados e coletores. Consulte Processamento de stream com Apache Kafka e Databricks.