チュートリアル: Spark SQLを使用したCOPY INTO

Databricks では、数千のファイルを含むデータソースの増分データ読み込みと一括データ読み込みには、COPY INTO コマンドを使用することをお勧めします。Databricks では、高度なユースケースには Auto Loader を使用することをお勧めします。

このチュートリアルでは、 COPY INTO コマンドを使用して、クラウドオブジェクトストレージから Databricks ワークスペースのテーブルにデータを読み込みます。

要件

  1. Databricks アカウントと、アカウント内の Databricks ワークスペース。 これらを作成するには、「 はじめに: アカウントとワークスペースのセットアップ」を参照してください。

  2. Databricks Runtime 11.3 LTS 以降を実行しているワークスペース内の汎用 クラスター 。 All-Purposeクラスターを作成するには、 コンピュート構成リファレンスを参照してください。

  3. Databricks ワークスペースのユーザー インターフェイスに精通している。 「ワークスペースの移動」を参照してください。

  4. Databricks ノートブックの操作に精通している。

  5. データを書き込むことができる場所。このデモでは、例として DBFSルートを使用しますが、 Databricks では、 Unity Catalogで構成された外部ストレージの場所をお勧めします。

ステップ1.環境を構成し、データ ジェネレーターを作成する

このチュートリアルは、 Databricks とデフォルト ワークスペース構成に関する基本的な知識があることを前提としています。 提供されたコードを実行できない場合は、ワークスペース管理者に問い合わせて、コンピュート リソースにアクセスできることと、データを書き込むことができる場所があることを確認してください。

提供されているコードでは、 source パラメーターを使用して、 COPY INTO データソースとして構成する場所を指定していることに注意してください。 記述されているように、このコードはルート上の場所を指し DBFS。 外部オブジェクトストレージの場所に対する書き込み権限がある場合は、ソース文字列の dbfs:/ 部分をオブジェクトストレージへのパスに置き換えます。 このコード ブロックでは、このデモをリセットするために再帰的な削除も行われるため、これを本番運用データに指定しないようにし、既存のデータの上書きや削除を避けるために、 /user/{username}/copy-into-demo ネストされたディレクトリを保持してください。

  1. 新しい SQL ノートブックを作成し 、Databricks Runtime 11.3 LTS 以降を実行している クラスターにアタッチします

  2. 次のコードをコピーして実行し、このチュートリアルで使用するストレージの場所とデータベースをリセットします。

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. 次のコードをコピーして実行し、データをランダムに生成するために使用されるいくつかのテーブルと関数を構成します。

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

ステップ 2: サンプルデータをクラウドストレージに書き込む

Delta Lake 以外のデータ形式への書き込みは、Databricks ではまれです。 ここで提供されるコードは JSON に書き込み、別のシステムからオブジェクトをストレージにダンプする可能性のある外部システムをシミュレートします。

  1. 次のコードをコピーして実行し、生の JSON データのバッチを書き込みます。

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

ステップ 3: COPY INTO を使用して JSON データをべき等にロードする

ターゲットの Delta Lake テーブルは、 COPY INTOを使用する前に作成する必要があります。 Databricks Runtime 11.3 LTS 以降では、 CREATE TABLE ステートメントにテーブル名以外のものを指定する必要はありません。 以前のバージョンの Databricks Runtime では、空のテーブルを作成するときにスキーマを指定する必要があります。

  1. 次のコードをコピーして実行し、ターゲットの Delta テーブルを作成し、ソースからデータを読み込みます。

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

このアクションはべき等であるため、複数回実行できますが、データは 1 回しか読み込まれません。

ステップ 4: 表の内容をプレビューする

単純な SQL クエリを実行して、このテーブルの内容を手動で確認できます。

  1. 次のコードをコピーして実行し、テーブルをプレビューします。

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

ステップ 5: より多くのデータを読み込み、結果をプレビューする

ステップ 2 から 4 を何度も再実行して、ランダムな生 JSON データの新しいバッチをソースに配置し、それらをべき等にロードして COPY INTO と Delta Lake し、結果をプレビューできます。 これらのステップを順不同または複数回実行して、新しいデータが到着せずに複数の生データのバッチが書き込まれたり、 COPY INTO 複数回実行されたりすることをシミュレートしてみてください。

ステップ 6: チュートリアルのクリーンアップ

このチュートリアルを終了したら、関連するリソースを保持する必要がなくなった場合は、それらをクリーンアップできます。

  1. 次のコードをコピーして実行し、データベースとテーブルを削除し、すべてのデータを削除します。

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. コンピュート リソースを停止するには、 クラスター タブに移動し、 クラスターを終了します

関連リソース