COPY INTO を使用してデータを読み込む

COPY INTO SQL コマンドを使用すると、ファイルの場所から Delta テーブルにデータを読み込むことができます。これは、再試行可能でべき等な操作です。既に読み込まれているソースの場所のファイルはスキップされます。

COPY INTO は、次の機能を提供します。

  • S3、ADLS Gen2、APFS、GCS、Unity Catalog ボリュームなどのクラウド ストレージから簡単に構成できるファイルまたはディレクトリ フィルター。

  • 複数のソース ファイル形式のサポート: CSV、JSON、XML、 AvroORCParquet 、テキスト、バイナリ ファイル

  • デフォルトによる Exactly-once (べき等) ファイル処理

  • ターゲット表スキーマの推論、マッピング、マージ、および進化

警告

COPY INTO 削除ベクトルのワークスペース設定を尊重します。 有効にすると、 Databricks Runtime 14.0 以上を実行している SQL ウェアハウスまたはコンピュート上で COPY INTO を実行するときに、ターゲット テーブルでベクトル削除が有効になります。 削除ベクトルを有効にすると、Databricks Runtime 11.3 LTS 以下のテーブルに対するクエリがブロックされます。 削除ベクトルとは何かを参照してください。 削除ベクトルを自動で有効にします

要件

アカウント管理者は、ユーザーが COPY INTOを使用してデータを読み込む前に、「 インジェスト用のデータ アクセスの構成 」の手順に従ってクラウド オブジェクト ストレージ内のデータへのアクセスを構成する必要があります。

例: スキーマレス Delta Lake テーブル へのデータのロード

この機能は、Databricks Runtime 11.3 LTS 以降で利用できます。

空のプレースホルダー Delta テーブルを作成して、COPY_OPTIONSmergeSchematrue に設定することで、後で COPY INTO コマンド中にスキーマが推論されるようにすることができます。

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

上記の SQL ステートメントはべき等であり、Delta テーブルに一度だけデータを取り込むようにスケジュールできます。

空の Delta テーブルは、 COPY INTO以外では使用できません。 INSERT INTOMERGE INTO は、スキーマレス Delta テーブルにデータを書き込むためにサポートされていません。 COPY INTOを使用してテーブルにデータが挿入されると、テーブルはクエリ可能になります。

COPY INTO のターゲット表の作成を参照してください。

例: スキーマを設定し、Delta Lake テーブルに データを読み込む

次の例は、 Deltaテーブルを作成し、COPY INTO SQL コマンドを使用してDatabricks データセットからテーブルにサンプル データを読み込む方法を示しています。 Databricks クラスター に接続された ノートブック からサンプルの Python、R、Scala、または SQL コードを実行できます。の Databricks SQLSQLウェアハウス に関連付けられた クエリ から SQL コードを実行することもできます。

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

クリーンアップするには、次のコードを実行してテーブルを削除します。

spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload

参考

関連リソース