チュートリアル: Spark SQLを使用したCOPY INTO
Databricks では、数千のファイルを含むデータソースの増分データ読み込みと一括データ読み込みには、COPY INTO コマンドを使用することをお勧めします。Databricks では、高度なユースケースには Auto Loader を使用することをお勧めします。
このチュートリアルでは、 COPY INTO
コマンドを使用して、クラウドオブジェクトストレージから Databricks ワークスペースのテーブルにデータを読み込みます。
要件
Databricks アカウントと、アカウント内の Databricks ワークスペース。 これらを作成するには、「 はじめに: アカウントとワークスペースのセットアップ」を参照してください。
Databricks Runtime 11.3 LTS 以降を実行しているワークスペース内の汎用 クラスター 。 All-Purposeクラスターを作成するには、 コンピュート構成リファレンスを参照してください。
Databricks ワークスペースのユーザー インターフェイスに精通している。 「ワークスペースの移動」を参照してください。
Databricks ノートブックの操作に精通している。
データを書き込むことができる場所。このデモでは、例として DBFSルートを使用しますが、 Databricks では、 Unity Catalogで構成された外部ストレージの場所をお勧めします。
ステップ1.環境を構成し、データ ジェネレーターを作成する
このチュートリアルは、 Databricks とデフォルト ワークスペース構成に関する基本的な知識があることを前提としています。 提供されたコードを実行できない場合は、ワークスペース管理者に問い合わせて、コンピュート リソースにアクセスできることと、データを書き込むことができる場所があることを確認してください。
提供されているコードでは、 source
パラメーターを使用して、 COPY INTO
データソースとして構成する場所を指定していることに注意してください。 記述されているように、このコードはルート上の場所を指し DBFS。 外部オブジェクトストレージの場所に対する書き込み権限がある場合は、ソース文字列の dbfs:/
部分をオブジェクトストレージへのパスに置き換えます。 このコード ブロックでは、このデモをリセットするために再帰的な削除も行われるため、これを本番運用データに指定しないようにし、既存のデータの上書きや削除を避けるために、 /user/{username}/copy-into-demo
ネストされたディレクトリを保持してください。
新しい SQL ノートブックを作成し 、Databricks Runtime 11.3 LTS 以降を実行している クラスターにアタッチします 。
次のコードをコピーして実行し、このチュートリアルで使用するストレージの場所とデータベースをリセットします。
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
次のコードをコピーして実行し、データをランダムに生成するために使用されるいくつかのテーブルと関数を構成します。
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
ステップ 2: サンプルデータをクラウドストレージに書き込む
Delta Lake 以外のデータ形式への書き込みは、Databricks ではまれです。 ここで提供されるコードは JSON に書き込み、別のシステムからオブジェクトをストレージにダンプする可能性のある外部システムをシミュレートします。
次のコードをコピーして実行し、生の JSON データのバッチを書き込みます。
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
ステップ 3: COPY INTO を使用して JSON データをべき等にロードする
ターゲットの Delta Lake テーブルは、 COPY INTO
を使用する前に作成する必要があります。 Databricks Runtime 11.3 LTS 以降では、 CREATE TABLE
ステートメントにテーブル名以外のものを指定する必要はありません。 以前のバージョンの Databricks Runtime では、空のテーブルを作成するときにスキーマを指定する必要があります。
次のコードをコピーして実行し、ターゲットの Delta テーブルを作成し、ソースからデータを読み込みます。
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
このアクションはべき等であるため、複数回実行できますが、データは 1 回しか読み込まれません。
ステップ 4: 表の内容をプレビューする
単純な SQL クエリを実行して、このテーブルの内容を手動で確認できます。
次のコードをコピーして実行し、テーブルをプレビューします。
-- Review updated table SELECT * FROM user_ping_target
ステップ 5: より多くのデータを読み込み、結果をプレビューする
ステップ 2 から 4 を何度も再実行して、ランダムな生 JSON データの新しいバッチをソースに配置し、それらをべき等にロードして COPY INTO
と Delta Lake し、結果をプレビューできます。 これらのステップを順不同または複数回実行して、新しいデータが到着せずに複数の生データのバッチが書き込まれたり、 COPY INTO
複数回実行されたりすることをシミュレートしてみてください。
ステップ 6: チュートリアルのクリーンアップ
このチュートリアルを終了したら、関連するリソースを保持する必要がなくなった場合は、それらをクリーンアップできます。
次のコードをコピーして実行し、データベースとテーブルを削除し、すべてのデータを削除します。
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
コンピュート リソースを停止するには、 クラスター タブに移動し、 クラスターを終了します 。
関連リソース
COPY INTOの リファレンス記事