O senhor começa a usar o site COPY INTO para carregar dados

O comando COPY INTO SQL permite que o senhor carregue dados de um local de arquivo em uma tabela Delta. Essa é uma operação re-triável e idempotente; os arquivos no local de origem que já foram carregados são ignorados.

COPY INTO oferece os seguintes recursos:

  • Filtros de arquivos ou diretórios facilmente configuráveis do armazenamento cloud, incluindo volumes S3, ADLS Gen2, ABFS, GCS e Unity Catalog.

  • Suporte a vários formatos de arquivos de origem: CSV, JSON, XML, Avro, ORC, Parquet, arquivos de texto e binários

  • Processamento de arquivos exatamente uma vez (idempotente) por default

  • Inferência, mapeamento, fusão e evolução do esquema da tabela de destino

Aviso

COPY INTO respeita a configuração workspace para vetores de exclusão. Se ativados, os vetores de exclusão serão ativados na tabela de destino quando COPY INTO forem executados em um SQL warehouse ou compute com Databricks Runtime 14.0 ou superior. Uma vez ativados, os vetores de exclusão bloqueiam as consultas em uma tabela no Databricks Runtime 11.3 LTS e abaixo. Consulte O que são vetores de exclusão? e habilite automaticamente os vetores de exclusão.

Requisitos

Um administrador account deve seguir as passos em Configurar acesso a dados para ingestão para configurar o acesso a dados no armazenamento de objetos cloud antes que os usuários possam carregar o uso de dados COPY INTO.

Exemplo: Carregar dados em uma tabela Delta Lake sem esquema

Observação

Esse recurso está disponível em Databricks Runtime 11.3 LTS e acima.

O senhor pode criar tabelas Delta de espaço reservado vazias para que o esquema seja inferido posteriormente durante um comando COPY INTO, definindo mergeSchema como true em COPY_OPTIONS:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

A instrução SQL acima é idempotente e pode ser programada para ser executada para ingerir dados exatamente uma vez em uma tabela Delta.

Observação

A tabela Delta vazia não pode ser usada fora de COPY INTO. INSERT INTO e MERGE INTO não são compatíveis com a gravação de dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO, a tabela se torna consultável.

Consulte Criar tabelas de destino para COPY INTO.

Exemplo: Definir o esquema e carregar dados em uma tabela do Delta Lake

O exemplo a seguir mostra como criar uma tabela Delta e, em seguida, usar o comando COPY INTO SQL para carregar dados de amostra do conjunto de dadosDatabricks na tabela. O senhor pode executar o código de exemplo Python, R, Scala, ou SQL a partir de um Notebook anexado a um Databricks cluster. O senhor também pode executar o código SQL a partir de uma consulta associada a um SQL warehouse em Databricks SQL.

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

Para limpar, execute o seguinte código, que exclui a tabela:

spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload

Limpe arquivos de metadados

O senhor pode executar vacuum para limpar os arquivos de metadados não referenciados criados por COPY INTO em Databricks Runtime 15.2 e acima.

Referência

Outros recursos