COPY INTO を使用してデータを読み込む
COPY INTO
SQL コマンドを使用すると、ファイルの場所から Delta テーブルにデータを読み込むことができます。これは再試行可能でべき等な操作です。ソースの場所にあるファイルのうち、すでにロードされているものはスキップされます。
COPY INTO
には、次の機能があります。
S3、ADLS Gen2、ABFS、GCS、Unity Catalogボリュームなど、クラウドストレージから簡単に設定できるファイルまたはディレクトリフィルター。
複数のソースファイル形式のサポート:CSV、JSON、XML、 Avro、 ORC、 Parquet、テキスト、バイナリファイル
デフォルトでの Exactly-once (べき等) ファイル処理
ターゲット・テーブル・スキーマの推論、マッピング、マージ、および進化
警告
COPY INTO
削除ベクトルのワークスペース設定を尊重します。 有効にすると、14.0 以降を実行している SQLウェアハウスまたはコンピュートでCOPY INTO
実行するときに、ターゲット テーブルで削除ベクトル Databricks Runtime 有効になります。 有効にすると、削除ベクトルは Databricks Runtime 11.3 LTS 以前のテーブルに対するクエリをブロックします。 削除ベクトルとはおよび削除ベクトルの自動有効化を参照してください。
要件
アカウント管理者は、ユーザーが COPY INTO
を使用してデータを読み込む前に、インジェスト用のデータ アクセスの構成 の手順に従ってクラウドオブジェクトストレージ内のデータへのアクセスを構成する必要があります。
例: スキーマレス Delta Lake テーブルへのデータのロード
注:
この機能は、Databricks Runtime 11.3 LTS 以降で使用できます。
空のプレースホルダー Delta テーブルを作成して、後で COPY INTO
コマンド中にスキーマが推論されるようにするには、COPY_OPTIONS
で mergeSchema
を true
に設定します。
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
上記の SQL ステートメントはべき等であり、データを Delta テーブルに 1 回だけ取り込むように実行するようにスケジュールできます。
注:
空の Delta テーブルは、 COPY INTO
以外では使用できません。 INSERT INTO
と MERGE INTO
は、スキーマレス Delta テーブルにデータを書き込むことはサポートされていません。 COPY INTO
を使用してデータをテーブルに挿入すると、テーブルはクエリ可能になります。
COPY INTO のターゲットテーブルの作成を参照してください。
例: スキーマを設定し、Delta Lake テーブルにデータを読み込む
次の例は、Delta テーブルを作成し、 COPY INTO
SQL コマンドを使用して Databricks データセット からテーブルにサンプル データを読み込む方法を示しています。 Python、R、Scala、または SQL コードの例は、Databricks クラスター にアタッチされた ノートブック から実行できます。また、SQL の Databricks SQLSQLウェアハウス に関連付けられた クエリ から コードを実行することもできます。
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
クリーンアップするには、次のコードを実行してテーブルを削除します。
spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload
メタデータ ファイルのクリーンアップ
VACUUM を実行して、Databricks Runtime 15.2 以降の COPY INTO
によって作成された参照されていないメタデータ ファイルをクリーンアップできます。
関連リソース
同じ Delta テーブルに対する複数の
COPY INTO
操作の例など、一般的な使用パターンについては、COPY INTO を使用した一般的なデータ読み込みパターンを参照してください。