Tutorial: Executar seu primeiro pipeline Delta Live Tables

Este tutorial mostra aos senhores como configurar um Delta Live Tables pipeline a partir do código em um Databricks Notebook e executar o pipeline acionando uma atualização pipeline. Este tutorial inclui um exemplo pipeline para ingerir e processar uma amostra dataset com código de exemplo usando o Python e SQL interfaces. O senhor também pode usar as instruções deste tutorial para criar um pipeline com qualquer Notebook com a sintaxe Delta Live Tables definida corretamente.

O senhor pode configurar o pipeline Delta Live Tables e acionar atualizações usando a interface do usuário Databricks workspace ou opções de ferramentas automatizadas, como API, CLI, Databricks ativo Bundles, ou como uma tarefa em um fluxo de trabalho Databricks. Para se familiarizar com a funcionalidade e o recurso do Delta Live Tables, o Databricks recomenda usar primeiro a interface do usuário para criar e executar o pipeline. Além disso, quando o senhor configura um pipeline na interface do usuário, o Delta Live Tables gera uma configuração JSON para o pipeline, que pode ser usada para implementar seu fluxo de trabalho programático.

Para demonstrar a funcionalidade do Delta Live Tables, os exemplos deste tutorial download estão disponíveis publicamente dataset. No entanto, o site Databricks tem várias maneiras de se conectar à fonte de dados e ingerir dados que o pipeline que implementa casos de uso no mundo real usará. Consulte Ingerir dados com o Delta Live Tables.

Requisitos

  • Para iniciar um pipeline, o senhor deve ter permissão de criação de clusters ou acesso a uma política de cluster que defina clusters do Delta Live Tables. O tempo de execução do Delta Live Tables cria um cluster antes de executar o pipeline e falha se o senhor não tiver a permissão correta.

  • Para usar os exemplos neste tutorial, seu workspace deve ter o Unity Catalog habilitado.

  • O senhor deve ter as seguintes permissões no Unity Catalog:

    • READ VOLUME e WRITE VOLUME, ou ALL PRIVILEGES, para o volume my-volume.

    • USE SCHEMA ou ALL PRIVILEGES para o esquema default.

    • USE CATALOG ou ALL PRIVILEGES para o catálogo main.

    Para definir essas permissões, consulte o administrador do Databricks ou os privilégios e objetos protegidos do Unity Catalog.

  • Os exemplos deste tutorial usam um volume do Unity Catalog para armazenar dados de amostra. Para usar esses exemplos, crie um volume e use o catálogo, o esquema e os nomes de volume desse volume para definir o caminho do volume usado pelos exemplos.

Observação

Se o seu workspace não tiver o Unity Catalog ativado, o Notebook com exemplos que não exigem o Unity Catalog está anexado a este artigo. Para usar esses exemplos, selecione Hive metastore como a opção de armazenamento quando o senhor criar o pipeline.

Onde o senhor executa Delta Live Tables consultas?

Delta Live Tables As consultas são implementadas principalmente no Databricks Notebook, mas o Delta Live Tables não foi projetado para ser executado interativamente nas células do Notebook. A execução de uma célula que contém a sintaxe Delta Live Tables em um Databricks Notebook resulta em uma mensagem de erro. Para executar suas consultas, o senhor deve configurar seu Notebook como parte de um pipeline.

Importante

  • O senhor não pode confiar na ordem de execução célula a célula do Notebook ao escrever consultas para Delta Live Tables. Delta Live Tables avalia e executa todo o código definido no Notebook, mas tem um modelo de execução diferente de um Notebook que executa todo o comando.

  • O senhor não pode misturar idiomas em um único arquivo de código-fonte do Delta Live Tables. Por exemplo, um Notebook pode conter apenas consultas Python ou consultas SQL. Se o senhor precisar usar vários idiomas em um pipeline, use o Notebook ou arquivos específicos de vários idiomas no pipeline.

O senhor também pode usar o código Python armazenado em arquivos. Por exemplo, é possível criar um módulo Python que pode ser importado para o pipeline Python ou definir funções definidas pelo usuário (UDFs) Python para uso em consultas SQL. Para saber mais sobre a importação de módulos Python, consulte Importar módulos Python de pastas Git ou arquivos de espaço de trabalho. Para saber mais sobre como usar as UDFs do Python, consulte Funções escalares definidas pelo usuário - Python.

Exemplo: Ingerir e processar dados de nomes de bebês de Nova York

O exemplo deste artigo usa um site disponível publicamente dataset que contém registros de nomes de bebês do Estado de Nova York. Esses exemplos demonstram o uso de um pipeline do Delta Live Tables para:

  • Leia os dados brutos do site CSV de um site disponível publicamente dataset em uma tabela.

  • Leia os registros da tabela de dados brutos e use as expectativas do Delta Live Tables para criar uma nova tabela que contenha dados limpos.

  • Use os registros limpos como entrada para as consultas Delta Live Tables que criam conjuntos de dados derivados.

Este código demonstra um exemplo simplificado da arquitetura medalhão. Consulte Qual é a arquitetura medalhão do lakehouse?.

As implementações desse exemplo são fornecidas para as interfaces Python e SQL. O senhor pode seguir os passos para criar um novo Notebook que contenha o código de exemplo, ou pode pular para Create a pipeline e usar um dos Notebooks fornecidos nesta página.

Implementar um pipeline do Delta Live Tables com Python

Python O código que cria o conjunto de dados Delta Live Tables deve retornar DataFrames. Para usuários não familiarizados com Python e DataFrames, a Databricks recomenda o uso da interface SQL. Consulte Implementar um pipeline Delta Live Tables com SQL.

Todas as APIs Python do Delta Live Tables são implementadas no módulo dlt. Seu código Delta Live Tables pipeline implementado com Python deve importar explicitamente o módulo dlt na parte superior do Python Notebook e dos arquivos. Delta Live Tables difere de muitos scripts Python de uma key maneira: o senhor não chama as funções que executam a ingestão de dados e as transformações para criar o conjunto de dados Delta Live Tables. Em vez disso, o Delta Live Tables interpreta as funções de decoração do módulo dlt em todos os arquivos carregados em um pipeline e cria um gráfico de fluxo de dados.

Para implementar o exemplo neste tutorial, copie e cole o seguinte código Python em um novo Python Notebook. Adicione cada trecho de código de exemplo à sua própria célula no site Notebook na ordem descrita. Para revisar as opções de criação de Notebook, consulte Create a Notebook.

Quando o senhor cria um pipeline com a interface Python, por default, os nomes das tabelas são definidos por nomes de funções. Por exemplo, o exemplo Python a seguir cria três tabelas denominadas baby_names_raw, baby_names_prepared e top_baby_names_2021. O senhor pode substituir o nome da tabela usando o parâmetro name. Consulte Criar uma tabela Delta Live Tables materializada view ou de transmissão.

Importante

Para evitar um comportamento inesperado na execução do pipeline, não inclua código que possa ter efeitos colaterais nas funções que definem o conjunto de dados. Para saber mais, consulte a referência do Python.

Importar o módulo do Delta Live Tables

Todas as APIs do Python do Delta Live Tables são implementadas no módulo dlt. Importe explicitamente o módulo dlt no topo de notebooks e arquivos Python.

O exemplo a seguir mostra esta importação, juntamente com as declarações de importação para pyspark.sql.functions.

import dlt
from pyspark.sql.functions import *

Faça o download dos dados

Para obter os dados deste exemplo, o senhor faz o download de um arquivo CSV e o armazena no volume da seguinte forma:

import os

os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")

Substitua <catalog-name>, <schema-name> e <volume-name> pelos nomes de catálogo, esquema e volume de um volume do Unity Catalog.

Criar uma tabela com os arquivos do armazenamento de objetos

O Delta Live Tables suporta o carregamento de dados de todos os formatos suportados pela Databricks. Consulte Opções de formato de dados.

O decorador @dlt.table diz ao Delta Live Tables para criar uma tabela que contenha o resultado de um DataFrame retornado por uma função. Adicione o decorador @dlt.table antes de qualquer definição de função Python que retorne um Spark DataFrame para registrar uma nova tabela em Delta Live Tables. O exemplo a seguir demonstra o uso do nome da função como o nome da tabela e a adição de um comentário descritivo à tabela:

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

Adicionar uma tabela de um dataset upstream no pipeline

você pode usar dlt.read() para ler dados de outros conjuntos de dados declarados em seu pipeline atual do Delta Live Tables. Declarar novas tabelas dessa forma cria uma dependência que o Delta Live Tables resolve automaticamente antes de executar as atualizações. O código a seguir também inclui exemplos de monitoramento e aplicação da qualidade dos dados com expectativas. Consulte Gerenciar a qualidade de dados com o Delta Live Tables.

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

Criar uma tabela com exibições com dados enriquecidos

Como o Delta Live Tables processam atualizações de pipelines como uma série de gráficos de dependência, você pode declarar visualizações altamente enriquecidas que potencializam dashboards, BI e funções analíticas, declarando tabelas com lógica de negócios específica.

As tabelas em Delta Live Tables são conceitualmente equivalentes à visualização materializada. Diferentemente da visualização tradicional em Spark, que executa a lógica cada vez que o view é consultado, uma tabela Delta Live Tables armazena a versão mais recente dos resultados da consulta em arquivos de dados. Como o Delta Live Tables gerencia as atualizações de todos os conjuntos de dados em um pipeline, o senhor pode programar as atualizações do pipeline para atender aos requisitos de latência da visualização materializada e saber que as consultas a essas tabelas contêm a versão mais recente dos dados disponíveis.

A tabela definida pelo seguinte código demonstra a semelhança conceitual com uma exibição materializada derivada de dados upstream em seu pipeline:

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

Para configurar um pipeline que usa o Notebook, consulte Criar um pipeline.

Implementar um pipeline Delta Live Tables com SQL

Databricks recomenda Delta Live Tables com SQL como a forma preferida para os usuários de SQL criarem novos pipelines ETL, ingestão e transformações em Databricks. A interface SQL para Delta Live Tables amplia o Spark SQL padrão com muitas novas palavras-chave, construções e funções com valor de tabela. Essas adições ao padrão SQL permitem que os usuários declarem dependências entre o conjunto de dados e a infraestrutura de nível de produção implantada sem aprender novas ferramentas ou conceitos adicionais.

Para os usuários familiarizados com o Spark DataFrames e que precisam de suporte para testes mais extensos e operações que são difíceis de implementar com SQL, como operações de metaprogramação, a Databricks recomenda o uso da interface Python. Consulte Implementar um pipeline do Delta Live Tables com Python.

Faça o download dos dados

Para obter os dados para este exemplo, copie o código a seguir, cole-o em um novo Notebook e, em seguida, execute o Notebook. Para revisar as opções de criação de Notebook, consulte Create a Notebook.

%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"

Substitua <catalog-name>, <schema-name> e <volume-name> pelos nomes de catálogo, esquema e volume de um volume do Unity Catalog.

Criar uma tabela a partir de arquivos no Unity Catalog

Para o restante deste exemplo, copie os seguintes trechos de SQL e cole-os em um novo SQL Notebook, separado do Notebook da seção anterior. Adicione cada snippet do exemplo SQL à sua própria célula no Notebook na ordem descrita.

O Delta Live Tables suporta o carregamento de dados de todos os formatos suportados pela Databricks. Consulte Opções de formato de dados.

Todas as instruções SQL do Delta Live Tables usam a sintaxe e a semântica CREATE OR REFRESH. Quando você atualiza um pipeline, o Delta Live Tables determina se o resultado logicamente correto da tabela pode ser obtido por meio do processamento incremental ou se é necessária uma recomputação completa.

O exemplo a seguir cria uma tabela carregando dados do arquivo CSV armazenado no volume do Unity Catalog:

CREATE OR REFRESH MATERIALIZED VIEW baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
  '/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

Substitua <catalog-name>, <schema-name> e <volume-name> pelos nomes de catálogo, esquema e volume de um volume do Unity Catalog.

Adicionar uma tabela de um dataset upstream ao pipeline

O senhor pode usar o esquema virtual live para consultar dados de outros conjuntos de dados declarados em seu site atual Delta Live Tables pipeline. Declarar novas tabelas dessa forma cria uma dependência que o Delta Live Tables resolve automaticamente antes de executar as atualizações. O esquema live é uma palavra-chave personalizada implementada em Delta Live Tables que pode ser substituída por um esquema de destino se o senhor quiser publicar seu conjunto de dados. Consulte Usar Unity Catalog com seu pipeline Delta Live Tables e Publicar dados de Delta Live Tables para o Hive metastore.

O código a seguir também inclui exemplos de monitoramento e aplicação da qualidade dos dados com expectativas. Consulte Gerenciar a qualidade dos dados com o Delta Live Tables.

CREATE OR REFRESH MATERIALIZED VIEW baby_names_sql_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM live.baby_names_sql_raw;

Crie uma exibição de dados enriquecida

Como o Delta Live Tables processam atualizações de pipelines como uma série de gráficos de dependência, você pode declarar visualizações altamente enriquecidas que potencializam dashboards, BI e funções analíticas, declarando tabelas com lógica de negócios específica.

A consulta a seguir usa um view materializado para criar um view enriquecido a partir dos dados upstream. Diferentemente da visualização tradicional no site Spark, que executa a lógica cada vez que o site view é consultado, a visualização materializada armazena a versão mais recente dos resultados da consulta em arquivos de dados. Como o Delta Live Tables gerencia as atualizações de todos os conjuntos de dados em um pipeline, o senhor pode programar as atualizações do pipeline para atender aos requisitos de latência da visualização materializada e saber que as consultas a essas tabelas contêm a versão mais recente dos dados disponíveis.

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Para configurar um pipeline que usa o Notebook, continue em Create a pipeline.

Criar um pipeline

O Delta Live Tables cria o pipeline resolvendo as dependências definidas no Notebook ou nos arquivos (chamados de código-fonte ou biblioteca) usando a sintaxe do Delta Live Tables. Cada arquivo de código-fonte pode conter apenas um idioma, mas o senhor pode misturar bibliotecas de diferentes idiomas em seu pipeline.

  1. Clique em Delta Live Tables na barra lateral e clique em Create pipeline.

  2. Dê um nome ao pipeline.

  3. (Opcional) Selecione uma edição do produto.

  4. Selecione Acionado para Modo Pipeline.

  5. Configure um ou mais notebooks que contenham o código-fonte do pipeline. Na caixa de texto Paths (Caminhos ), digite o caminho para um Notebook ou clique em Ícone do seletor de arquivos para selecionar um Notebook.

  6. Selecione um destino para o conjunto de dados publicado pelo pipeline: Hive metastore ou o Unity Catalog. Consulte Publicar conjunto de dados.

    • Hive metastore:

    • (Opcional) Digite um Local de armazenamento para os dados de saída do pipeline. O sistema utiliza um local padrão se você deixar Local de armazenamento vazio.

    • (Opcional) Especifique um esquema de destino para publicar seu dataset no Hive metastore.

    • Unity Catalog: Especifique um catálogo e um esquema de destino para publicar seu dataset em Unity Catalog.

  7. (Opcional) Clique em Adicionar notificação para configurar um ou mais endereços email para receber notificações de eventos de pipeline. Consulte Adicionar notificações por email para eventos de pipeline.

  8. (Opcional) Defina as configurações avançadas do pipeline. Para saber mais sobre as opções de configurações avançadas, consulte Configurar as definições de pipeline para Delta Live Tables.

  9. Clique em Criar.

A página Detalhes do pipeline é exibida depois que o senhor clica em Criar. O senhor também pode acessar o site pipeline clicando no nome pipeline no menu Delta Live Tables tab.

Iniciar uma atualização do pipeline

Para começar uma atualização para um pipeline, clique no Ícone de início das mesas dinâmicas Delta botão no painel superior. O sistema retorna uma mensagem confirmando que seu pipeline está iniciando.

Depois de iniciar com êxito a atualização, o sistema Delta Live Tables:

  1. começar a clusters usando uma configuração clusters criada pelo sistema Delta Live Tables. Você também pode especificar uma configuraçãoclusters personalizada.

  2. Cria tabelas que não existem e garante que o esquema esteja correto para as tabelas existentes.

  3. Atualiza tabelas com os dados mais recentes disponíveis.

  4. Desliga o cluster quando a atualização é concluída.

Observação

O modo de execução é definido como Produção por default, o que aprimora os recursos compute efêmera para cada atualização. Você pode usar o modo de desenvolvimento para alterar esse comportamento, permitindo que os mesmos recursos compute sejam usados para várias atualizações de pipeline durante o desenvolvimento e o teste. Consulte Modos de desenvolvimento e produção.

Publicar datasets

O senhor pode disponibilizar o conjunto de dados Delta Live Tables para consulta publicando tabelas em Hive metastore ou Unity Catalog. Se o senhor não especificar um destino para a publicação de dados, as tabelas criadas no pipeline Delta Live Tables só poderão ser acessadas por outras operações nesse mesmo pipeline. Consulte Publicar dados de Delta Live Tables para Hive metastore e Usar Unity Catalog com seu pipeline Delta Live Tables .

Exemplo de notebooks de código fonte

O senhor pode importar esse Notebook para um Databricks workspace e usá-lo para implantar um Delta Live Tables pipeline. Consulte Criar um pipeline.

Comece a usar o notebook Python Delta Live Tables

Abra o bloco de anotações em outra guia

Comece a usar o notebook SQL Delta Live Tables

Abra o bloco de anotações em outra guia

Exemplo de código-fonte Notebook para espaço de trabalho sem Unity Catalog

O senhor pode importar esse Notebook para um Databricks workspace sem o Unity Catalog habilitado e usá-lo para implantar um Delta Live Tables pipeline. Consulte Criar um pipeline.

Comece a usar o notebook Python Delta Live Tables

Abra o bloco de anotações em outra guia

Comece a usar o notebook SQL Delta Live Tables

Abra o bloco de anotações em outra guia