Tutorial: COPY INTO com Spark SQL

Databricks recomenda que o senhor use o comando COPY INTO para carregamento de dados incremental e em massa para fontes de dados que contêm milhares de arquivos. A Databricks recomenda que o senhor use o Auto Loader para casos de uso avançados.

Neste tutorial, o senhor usa o comando COPY INTO para carregar dados do armazenamento de objetos cloud em uma tabela em seu Databricks workspace.

Requisitos

  1. Um Databricks account, e um Databricks workspace em seu account. Para criá-los, consulte Get começar with Databricks.

  2. Um produto para todos os fins cluster em seu workspace executando Databricks Runtime 11.3 LTS ou acima. Para criar um clusters todo-propósito, consulte a referência de configuração de computação.

  3. Familiaridade com a interface de usuário Databricks workspace . Consulte Navegar no espaço de trabalho.

  4. Familiaridade com o site Databricks Notebook.

  5. Um local no qual o senhor pode gravar dados; esta demonstração usa o DBFS root como exemplo, mas o Databricks recomenda um local de armazenamento externo configurado com Unity Catalog.

passo 1. Configure seu ambiente e crie um gerador de dados

Este tutorial pressupõe uma familiaridade básica com o Databricks e uma configuração do default workspace . Se não conseguir executar o código fornecido, entre em contato com o administrador do site workspace para certificar-se de que tem acesso ao recurso compute e a um local onde possa gravar dados.

Observe que o código fornecido usa um parâmetro source para especificar o local que o senhor configurará como sua COPY INTO fonte de dados. Conforme escrito, esse código aponta para um local em DBFS root. Se o senhor tiver permissões de gravação em um local de armazenamento de objetos externo, substitua a parte dbfs:/ das cadeias de caracteres de origem pelo caminho para o armazenamento de objetos. Como esse bloco de código também faz uma exclusão recursiva para redefinir essa demonstração, certifique-se de não apontar isso para os dados de produção e de manter o diretório /user/{username}/copy-into-demo aninhado para evitar sobrescrever ou excluir dados existentes.

  1. Crie um novo Notebook SQL e anexe-o a um cluster que esteja executando o Databricks Runtime 11.3 LTS ou acima.

  2. Copie e execute o código a seguir para redefinir o local de armazenamento e o banco de dados usados neste site tutorial:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Copie e execute o código a seguir para configurar algumas tabelas e funções que serão usadas para gerar dados aleatoriamente:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

passo 2: gravar os dados de amostra no armazenamento em nuvem

A gravação em formatos de dados diferentes do Delta Lake é rara no Databricks. O código fornecido aqui grava em JSON, simulando um sistema externo que pode despejar resultados de outro sistema no armazenamento de objetos.

  1. Copie e execute o código a seguir para gravar um lote de dados brutos em JSON:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

passo 3: Use COPY INTO para carregar dados JSON idempotentemente

O senhor deve criar uma tabela Delta Lake de destino antes de poder usar COPY INTO. Em Databricks Runtime 11.3 LTS e acima, o senhor não precisa fornecer nada além de um nome de tabela na declaração CREATE TABLE. Nas versões anteriores do Databricks Runtime, o senhor deve fornecer um esquema ao criar uma tabela vazia.

  1. Copie e execute o código a seguir para criar a tabela de destino Delta e carregar os dados da fonte:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Como essa ação é idempotente, o senhor pode executá-la várias vezes, mas os dados serão carregados apenas uma vez.

passo 4: Visualize o conteúdo da sua tabela

O senhor pode executar uma consulta simples em SQL para revisar manualmente o conteúdo dessa tabela.

  1. Copie e execute o código a seguir para visualizar sua tabela:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

passo 5: Carregar mais dados e visualizar resultados

Você pode executar novamente as passos 2 a 4 várias vezes para obter novos lotes de dados JSON brutos aleatórios em sua origem, carregá-los idempotentemente no Delta Lake com COPY INTO e visualizar os resultados. Tente executar essas passos fora de ordem ou várias vezes para simular vários lotes de dados brutos sendo gravados ou executando COPY INTO várias vezes sem a chegada de novos dados.

passo 6: Tutorial de limpeza

Quando terminar de usar o site tutorial, o senhor poderá limpar o recurso associado se não quiser mais mantê-lo.

  1. Copie e execute o código a seguir para eliminar o banco de dados, as tabelas e remover todos os dados:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. Para interromper seu recurso compute, acesse os clusters tab e encerre seu recurso cluster.

Outros recursos