Comece a usar COPY INTO para carregar dados

O comando SQL COPY INTO permite carregar dados de um local de arquivo em uma tabela Delta. Esta é uma operação re-testável e idempotente; os arquivos no local de origem que já foram carregados são ignorados.

COPY INTO oferece os seguintes recursos:

  • Filtros de arquivos ou diretórios facilmente configuráveis a partir de armazenamento clouds , incluindo volumes S3, ADLS Gen2, ABFS, GCS e Unity Catalog.

  • Suporte a vários formatos de arquivos de origem: CSV, JSON, XML, Avro, ORC, Parquet, arquivos de texto e binários

  • Processamento de arquivo exatamente uma vez (idempotente) por default

  • Inferência, mapeamento, fusão e evolução do esquema da tabela de destino

Aviso

COPY INTO respeita a configuração workspace para vetores de exclusão. Se ativados, os vetores de exclusão serão ativados na tabela de destino quando COPY INTO forem executados em um SQL warehouse ou compute com Databricks Runtime 14.0 ou superior. Uma vez ativados, os vetores de exclusão bloqueiam as consultas a uma tabela em Databricks Runtime 11.3 LTS e abaixo. Consulte O que são vetores de exclusão? e Auto-enable deletion vectors (Ativação automática de vetores de exclusão).

Requisitos

Um administrador account deve seguir as passos em Configurar acesso a dados para ingestão para configurar o acesso a dados no armazenamento de objetos cloud antes que os usuários possam carregar o uso de dados COPY INTO.

Exemplo: carregar dados em uma tabela Delta Lake sem esquema

Observação

Esse recurso está disponível em Databricks Runtime 11.3 LTS e acima.

Você pode criar tabelas Delta de espaço reservado vazias para que o esquema seja inferido posteriormente durante um comando COPY INTO definindo mergeSchema como true em COPY_OPTIONS:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

A instrução SQL acima é idempotente e pode ser agendada para execução para ingerir dados exatamente uma vez em uma tabela Delta.

Observação

A tabela Delta vazia não pode ser usada fora de COPY INTO. INSERT INTO e MERGE INTO não são suportados para gravar dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO, a tabela torna-se consultável.

Consulte Criar tabelas de destino para COPY INTO.

Exemplo: Definir esquema e carregar dados em uma tabela Delta Lake

O exemplo a seguir mostra como criar uma tabela Delta e, em seguida, usar o comando SQL COPY INTO para carregar dados de amostra do conjunto de dados do Databricks na tabela. O senhor pode executar o exemplo de código Python, R, Scala ou SQL a partir de um Notebook conectado a um cluster Databricks. O senhor também pode executar o código SQL de uma consulta associada a um SQL warehouse no Databricks SQL.

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

Para limpar, execute o seguinte código, que exclui a tabela:

spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload

Referência

Recursos adicionais