COPY INTO を使用してデータを読み込む

COPY INTO SQL コマンドを使用すると、ファイルの場所から Delta テーブルにデータを読み込むことができます。これは再試行可能でべき等な操作です。ソースの場所にあるファイルのうち、すでにロードされているものはスキップされます。

COPY INTO には、次の機能があります。

  • S3、ADLS Gen2、ABFS、GCS、Unity Catalogボリュームなど、クラウドストレージから簡単に設定できるファイルまたはディレクトリフィルター。

  • 複数のソースファイル形式のサポート:CSV、JSON、XML、 AvroORCParquet、テキスト、バイナリファイル

  • デフォルトでの Exactly-once (べき等) ファイル処理

  • ターゲット・テーブル・スキーマの推論、マッピング、マージ、および進化

警告

COPY INTO 削除ベクトルのワークスペース設定を尊重します。 有効にすると、14.0 以降を実行している SQLウェアハウスまたはコンピュートでCOPY INTO実行するときに、ターゲット テーブルで削除ベクトル Databricks Runtime 有効になります。 有効にすると、削除ベクトルは Databricks Runtime 11.3 LTS 以前のテーブルに対するクエリをブロックします。 削除ベクトルとはおよび削除ベクトルの自動有効化を参照してください。

要件

アカウント管理者は、ユーザーが COPY INTOを使用してデータを読み込む前に、インジェスト用のデータ アクセスの構成 の手順に従ってクラウドオブジェクトストレージ内のデータへのアクセスを構成する必要があります。

例: スキーマレス Delta Lake テーブルへのデータのロード

注:

この機能は、Databricks Runtime 11.3 LTS 以降で使用できます。

空のプレースホルダー Delta テーブルを作成して、後で COPY INTO コマンド中にスキーマが推論されるようにするには、COPY_OPTIONSmergeSchematrue に設定します。

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

上記の SQL ステートメントはべき等であり、データを Delta テーブルに 1 回だけ取り込むように実行するようにスケジュールできます。

注:

空の Delta テーブルは、 COPY INTO以外では使用できません。 INSERT INTOMERGE INTO は、スキーマレス Delta テーブルにデータを書き込むことはサポートされていません。 COPY INTOを使用してデータをテーブルに挿入すると、テーブルはクエリ可能になります。

COPY INTO のターゲットテーブルの作成を参照してください。

例: スキーマを設定し、Delta Lake テーブルにデータを読み込む

次の例は、Delta テーブルを作成し、 COPY INTO SQL コマンドを使用して Databricks データセット からテーブルにサンプル データを読み込む方法を示しています。 Python、R、Scala、または SQL コードの例は、Databricks クラスター にアタッチされた ノートブック から実行できます。また、SQL の Databricks SQLSQLウェアハウス に関連付けられた クエリ から コードを実行することもできます。

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

クリーンアップするには、次のコードを実行してテーブルを削除します。

spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload

メタデータ ファイルのクリーンアップ

VACUUM を実行して、Databricks Runtime 15.2 以降の COPY INTO によって作成された参照されていないメタデータ ファイルをクリーンアップできます。

リファレンス

関連リソース