Tutorial: COPY INTO com Spark SQL

Databricks recomenda que você use o comando COPY INTO para carregamento de dados incremental e em massa para fonte de dados que contém milhares de arquivos. Databricks recomenda que você use o Auto Loader para casos de uso avançados.

Neste tutorial, você usa o comando COPY INTO para carregar dados do armazenamento de objetos cloud em uma tabela em seu workspace do Databricks.

Requisitos

  1. Uma account Databricks e um workspace Databricks em sua account. Para criá-los, consulte Get começar: configuração account e workspace .

  2. Um cluster para todos os fins em seu site workspace executando o Databricks Runtime 11.0 ou acima. Para criar um todo-propósito de clusters, consulte Referência de configuração do Compute.

  3. Familiaridade com a interface do usuário workspace Databricks. Consulte Navegar na área de trabalho.

  4. Familiaridade trabalhando com Databricks Notebook.

  5. Um local onde você pode gravar dados; esta demonstração usa a DBFS root como exemplo, mas o Databricks recomenda um local de armazenamento externo configurado com o Unity Catalog.

passo 1. Configure seu ambiente e crie um gerador de dados

Este tutorial pressupõe familiaridade básica com Databricks e uma configuração workspace default . Se você não conseguir executar o código fornecido, entre em contato com o administrador workspace para garantir que você tenha acesso aos recursos compute e um local no qual possa gravar dados.

Observe que o código fornecido usa um parâmetro source para especificar o local que você configurará como sua fonte de dados COPY INTO . Conforme escrito, esse código aponta para um local na DBFS root. Se você tiver permissões de gravação em um local de armazenamento de objeto externo, substitua a parte dbfs:/ das strings de origem pelo caminho para seu armazenamento de objeto. Como esse bloco de código também faz uma exclusão recursiva para Reset esta demonstração, certifique-se de não apontar isso para dados de produção e de manter o diretório aninhado /user/{username}/copy-into-demo para evitar sobrescrever ou excluir dados existentes.

  1. Crie um novo SQL Notebook e anexe-o a um clusters executando o Databricks Runtime 11.0 ouacima.

  2. Copie e execute o seguinte código para Reset o local de armazenamento e o banco de dados usados neste tutorial:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Copie e execute o seguinte código para configurar algumas tabelas e funções que serão utilizadas para gerar dados aleatoriamente:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

passo 2: gravar os dados de amostra no armazenamento em nuvem

Escrever em formatos de dados diferentes de Delta Lake é raro em Databricks. O código fornecido aqui grava em JSON, simulando um sistema externo que pode despejar resultados de outro sistema no armazenamento de objetos.

  1. Copie e execute o seguinte código para gravar lotes de dados JSON brutos:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

passo 3: Use COPY INTO para carregar dados JSON idempotentemente

Você deve criar uma tabela de destino Delta Lake antes de poder usar COPY INTO. No Databricks Runtime 11.0e acima, você não precisa fornecer nada além de um nome de tabela em sua instrução CREATE TABLE . Para versões anteriores do Databricks Runtime, você deve fornecer um esquema ao criar uma tabela vazia.

  1. Copie e execute o seguinte código para criar sua tabela Delta de destino e carregar dados de sua fonte:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Como esta ação é idempotente, você pode executá-la várias vezes, mas os dados serão carregados apenas uma vez.

passo 4: Visualize o conteúdo da sua tabela

Você pode executar uma query SQL simples para revisar manualmente o conteúdo desta tabela.

  1. Copie e execute o seguinte código para visualizar sua tabela:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

passo 5: Carregar mais dados e visualizar resultados

Você pode executar novamente as passos 2 a 4 várias vezes para obter novos lotes de dados JSON brutos aleatórios em sua origem, carregá-los idempotentemente no Delta Lake com COPY INTO e visualizar os resultados. Tente executar essas passos fora de ordem ou várias vezes para simular vários lotes de dados brutos sendo gravados ou executando COPY INTO várias vezes sem a chegada de novos dados.

passo 6: Tutorial de limpeza

Quando terminar este tutorial, você poderá limpar os recursos associados se não quiser mais mantê-los.

  1. Copie e execute o seguinte código para descartar o banco de dados, tabelas e remover todos os dados:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. Para interromper seu recurso compute , vá para a tab clusterss e finalize seus clusters.

Recursos adicionais