Tutorial: Executar um pipeline analítico de Lakehouse de ponta a ponta

Este tutorial mostra como configurar um pipeline de análise de ponta a ponta para um Lakehouse do Databricks.

Importante

Este tutorial usa blocos de anotações interativos para concluir tarefas comuns de ETL em Python em clusters habilitados para o Unity Catalog. Se você não estiver usando o Unity Catalog, consulte Executar sua primeira carga de trabalho de ETL no Databricks.

Tarefas neste tutorial

Ao final deste artigo, você se sentirá confortável:

  1. Lançamento de clusters compute habilitados para Unity Catalog.

  2. Criando um Notebook do Databricks.

  3. Escrever e ler dados de um local externo Unity Catalog .

  4. Configurando a ingestão de dados incrementais para uma tabela Unity Catalog com o Auto Loader.

  5. Executando células Notebook para processar, query e visualizar dados.

  6. programar um Notebook como um Jobdo Databricks.

  7. Consultando tabelas Unity Catalog do Databricks SQL

O Databricks oferece um conjunto de ferramentas prontas para produção que permitem aos profissionais de dados desenvolver e implantar rapidamente pipelines de extração, transformação e carga (ETL).O Unity Catalog permite que os administradores de dados configurem e protejam credenciais de armazenamento, locais externos e objetos de banco de dados para usuários em toda a organização. O Databricks SQL permite que os analistas executem consultas SQL nas mesmas tabelas usadas em cargas de trabalho ETL de produção, permitindo business intelligence em tempo real em escala.

Você também pode usar Delta Live Tables para criar pipelines ETL. O Databricks criou o Delta Live Tables para reduzir a complexidade da criação, implantação e manutenção de pipelines ETL de produção. Consulte Tutorial: Execute seu primeiro pipeline das Delta Live Tables.

Requisitos

Observação

Se você não tiver privilégios de controle de cluster, ainda poderá concluir a maioria das passos abaixo, desde que tenha acesso a um cluster.

Etapa 1: criar um cluster

Para realizar análise exploratória de dados e data engineering, crie um cluster para fornecer os recursos de compute necessários para executar comandos.

  1. Clique em ícone de computação Calcular na barra lateral.

  2. Clique Novo ícone Novo na barra lateral e selecione clusters. Isso abre a página Novos clusters/compute .

  3. Especifique um nome exclusivo para o cluster.

  4. Selecione o botão de opção Nó único.

  5. Selecione Usuário único no dropdown Modo de acesso.

  6. Confira se seu endereço de e-mail está visível no campo Usuário único.

  7. Selecione a versão de Databricks runtime desejada, 11.1 ouacima para utilizar o catálogo Unity.1.

  8. Clique em Criar compute para criar os clusters.

Para saber mais sobre clusters Databricks, consulte Compute.

Etapa 2: Criar um bloco de anotações Databricks

Para começar a escrever e executar código interativo no Databricks, crie um notebook.

  1. Clique Novo ícone Novo na barra lateral e clique em Notebook.

  2. Na página Criar Notebook:

    • Especifique um nome exclusivo para o seu notebook.

    • Certifique-se de que o idioma padrão esteja definido como Python.

    • Use o menu dropdown Conectar para selecionar os clusters criados na etapa 1 nodropdown clusters .

O Notebook abre com uma célula vazia.

Para saber mais sobre como criar e gerenciar notebooks, consulte Gerenciar notebooks.

Etapa 3: Gravar e ler dados de um local externo gerenciado pelo Unity Catalog 2.

o Databricks recomenda o uso do Auto Loader para a ingestão de dados incrementais. O Auto Loader detecta e processa automaticamente novos arquivos à medida que chegam ao armazenamento de objetos na nuvem.

Use o Unity Catalog para gerenciar o acesso seguro a locais externos. Usuários ou entidades de serviço com permissões READ FILES em um local externo podem usar Auto Loader para ingerir dados.

Normalmente, os dados chegarão a uma localização externa devido às gravações de outros sistemas.Nesta demonstração, você pode simular a chegada de dados escrevendo arquivos JSON em um local externo.

Copie o código abaixo em uma célula do note6.Substitua o valor da string de catalog pelo nome de um catálogo com permissões CREATE CATALOG e USE CATALOG . Substitua o valor da string de external_location pelo caminho de um local externo com permissões READ FILES, WRITE FILES e CREATE EXTERNAL TABLE .

As localizações externas podem ser definidas como um contêiner de armazenamento inteiro, mas frequentemente apontam para um diretório dentro de um contêiner.

O formato correto para um caminho de localização externo é "gs://bucket-name/path/to/external_location".


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

A execução desta célula deverá imprimir uma linha que lê 12 bytes, imprimir as strings “Olá mundo!”, e exibir todos os bancos de dados presentes no catálogo fornecido. Se você não conseguir executar esta célula, confirme se está em um workspace habilitado para o Unity Catalog e solicite as permissões adequadas ao administrador workspace para concluir este tutorial.

O código Python abaixo usa seu endereço email para criar um banco de dados exclusivo no catálogo fornecido e um local de armazenamento exclusivo em local externo fornecido. A execução desta célula removerá todos os dados associados a este tutorial, permitindo que você execute este exemplo de forma idempotente. É definida e instanciada uma classe que você usará para simular lotes de dados que chegam de um sistema conectado ao local externo de origem.

Copie este código para uma nova célula em seu notebook e execute-o para configurar seu ambiente.

Observação

As variáveis definidas neste código devem garantir que você possa executá-lo com segurança, sem risco de conflitos com ativos já existentes no ambiente de trabalho ou outros usuários.As permissões de rede ou armazenamento restritas resultarão em erros ao executar este código; entre em contato com o administrador do seu ambiente de trabalho para solucionar essas restrições.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)


# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

Agora você pode obter muitos dados copiando o código a seguir em uma célula e executando-o. Você pode executar manualmente esta célula até 60 vezes para acionar a chegada de novos dados.

RawData.land_batch()

Etapa 4: Configurar o Auto Loader para consumir dados no Unity Catalog

O Databricks recomenda o armazenamento de dados com o Delta Lake. Delta Lake é uma camada de armazenamento de código aberto que fornece transações ACID e habilita o data lakehouse. Delta Lake é o formato padrão para tabelas criadas em Databricks.

Para configurar o Auto Loader para importar dados em uma tabela Unity Catalog, copie e cole o código abaixo em uma célula vazia do seu notebook:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Para saber mais sobre o Auto Loader, consulte O que é o Auto Loader?.

Para saber mais sobre a transmissão estruturada com o Unity Catalog, consulte Usando o Unity Catalog com transmissão estruturada.

Etapa 5: processar e interagir com os dados

Os notebooks executam a lógica célula por célula. Use estas etapas para executar a lógica em sua célula:

  1. Para executar a célula que o senhor completou na etapa anterior, selecione a célula e pressione SHIFT+ENTER.

  2. Para consultar a tabela que você acabou de criar, copie e cole o código a seguir em uma célula vazia e pressione SHIFT+ENTER para executar a célula.

    df = spark.read.table(table_name)
    
  3. Para visualizar os dados em seu DataFrame, copie e cole o seguinte código em uma célula vazia e pressione SHIFT+ENTER para executar a célula.

    display(df)
    

Para saber mais sobre as opções interativas para visualizar dados, consulte visualizações em notebooks de dados.

Etapa 6: agendar um trabalho

Você pode executar Databricks Notebook como scripts de produção, adicionando-os como uma tarefa em um Databricks Job. Nesta passo, você criará um novo Job que pode ser acionado manualmente.

Para agendar seu notebook como uma tarefa:

  1. Clique em Agendar no lado direito da barra de cabeçalho.

  2. Insira um nome exclusivo para o Nome do job.

  3. Clique em Manual.

  4. Na lista suspensa Cluster, selecione o cluster que você criou na etapa 1.

  5. Clique em Criar.

  6. Na janela exibida, clique em Executar agora.

  7. Para ver os resultados da execução Job , clique no botão Link externo ícone ao lado do carimbo de data/hora da última execução .

Para obter mais informações sobre o Job, consulte O que são trabalhos do Databricks?.

Etapa 7: Tabela de consulta do Databricks SQL

Qualquer pessoa com a permissão USE CATALOG no catálogo atual, a permissão USE SCHEMA no esquema atual e as permissões SELECT na tabela pode consultar o conteúdo da tabela a partir de sua API Databricks preferida.

Você precisa ter acesso a um depósito SQL em execução para executar consultas no Databricks SQL.

A tabela que você criou anteriormente neste tutorial tem o nome target_table. Você pode query -lo usando o catálogo fornecido na primeira célula e o banco de dados com o padrão e2e_lakehouse_<your-username>. É possível usar o Catalog Explorer para localizar os objetos de dados que você criou.

Integrações adicionais

Saiba mais sobre integrações e ferramentas para engenharia de dados com Databricks: