Comece a usar COPY INTO para carregar dados

O comando SQL COPY INTO permite carregar dados de um local de arquivo em uma tabela Delta. Esta é uma operação re-testável e idempotente; os arquivos no local de origem que já foram carregados são ignorados.

COPY INTO oferece os seguintes recursos:

  • Filtros de arquivos ou diretórios facilmente configuráveis a partir de armazenamento clouds , incluindo volumes S3, ADLS Gen2, ABFS, GCS e Unity Catalog.

  • Suporte a vários formatos de arquivos de origem: CSV, JSON, XML, Avro, ORC, Parquet, arquivos de texto e binários

  • Processamento de arquivo exatamente uma vez (idempotente) por default

  • Inferência, mapeamento, fusão e evolução do esquema da tabela de destino

Aviso

COPY INTO respeita a configuração do workspace para vetores de exclusão. Se habilitados, os vetores de exclusão serão habilitados na tabela de destino durante a execução COPY INTO em um SQL warehouse ou compute executando o Databricks Runtime 14.0 ou superior. Uma vez habilitados, os vetores de exclusão bloqueiam consultas em uma tabela no Databricks Runtime 11.3 LTS e abaixo. Consulte O que são vetores de exclusão? e Habilitar automaticamente vetores de exclusão.

Requisitos

Um administrador account deve seguir as passos em Configurar acesso a dados para ingestão para configurar o acesso a dados no armazenamento de objetos cloud antes que os usuários possam carregar o uso de dados COPY INTO.

Exemplo: carregar dados em uma tabela Delta Lake sem esquema

Observação

Este recurso está disponível no Databricks Runtime 11.0e acima.

Você pode criar tabelas Delta de espaço reservado vazias para que o esquema seja inferido posteriormente durante um comando COPY INTO definindo mergeSchema como true em COPY_OPTIONS:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

A instrução SQL acima é idempotente e pode ser agendada para execução para ingerir dados exatamente uma vez em uma tabela Delta.

Observação

A tabela Delta vazia não pode ser usada fora de COPY INTO. INSERT INTO e MERGE INTO não são suportados para gravar dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO, a tabela torna-se consultável.

Consulte Criar tabelas de destino para COPY INTO.

Exemplo: Definir esquema e carregar dados em uma tabela Delta Lake

O exemplo a seguir mostra como criar uma tabela Delta e, em seguida, usar o comando SQL COPY INTO para carregar dados de amostra do conjunto de dados do Databricks na tabela. O senhor pode executar o exemplo de código Python, R, Scala ou SQL a partir de um Notebook conectado a um cluster Databricks. O senhor também pode executar o código SQL de uma consulta associada a um SQL warehouse no Databricks SQL.

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

Para limpar, execute o seguinte código, que exclui a tabela:

spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload

Referência

Recursos adicionais