Google Cloud Storageからのデータのオンボード

この記事では、Google Cloud Storage から新しい Databricks ワークスペースにデータをオンボードする方法について説明します。 Unity Catalog ボリューム (推奨) または Unity Catalog 外部ロケーションに対応するクラウドオブジェクトストレージの場所にあるソース データに安全にアクセスする方法を学習します。 次に、Delta Live Tables で Auto Loader を使用して、Unity Catalog マネージド テーブルにデータを増分的に取り込む方法について説明します。

始める前に

管理者でない場合、この記事では、管理者から次の情報が提供されていることを前提としています。

重要

これらの前提条件について質問がある場合は、アカウント管理者にお問い合わせください。

ステップ 1: クラスターを作成する

クラスターを作成するには、次の手順を実行します。

  1. Databricks ワークスペースにサインインします。

  2. サイドバーで、「 新規 > クラスター」をクリックします。

  3. クラスター UI で、クラスターの一意の名前を指定します。

  4. ソース データへのパスがボリューム パスの場合、 Databricksランタイム バージョンで13.2 以上を選択します。

  5. [クラスターの作成] をクリックします。

ステップ 2: データ探索ノートブックを作成する

このセクションでは、データパイプラインを作成する前にデータを理解できるように、データ探索ノートブックを作成する方法について説明します。

  1. サイドバーで、[+新規]、[ノートブック] の順にクリックします。

    ノートブックは、最後に使用したクラスター (この場合は 、「ステップ 1: クラスターを作成する」で作成したクラスター) に自動的にアタッチされます。

  2. ノートブックの名前を入力します。

  3. 言語ボタンをクリックし、ドロップダウンメニューから Python または SQL を選択します。 デフォルトではPythonが選択されています。

  4. GCS でソース データへのデータ アクセスを確認するには、次のコードをノートブックのセルに貼り付け、[ 実行メニュー]、[ セルの実行] の順にクリックします。

    LIST '<path-to-source-data>'
    
    %fs ls '<path-to-source-data>'
    

    <path-to-source-data> は、データを含むディレクトリへのパスに置き換えます。

    これにより、データセットを含むディレクトリの内容が表示されます。

  5. レコードのサンプルを表示して各レコードの内容と形式をよりよく理解するには、以下をノートブックのセルに貼り付け、実行メニューをクリックし、 [セルの実行] をクリックします。

    SELECT * from read_files('<path-to-source-data>', format => '<file-format>') LIMIT 10
    
    spark.read.format('<file-format>').load('<path-to-source-data>').limit(10).display()
    

    次の値を置き換えます。

    • <file-format>: サポートされているファイル形式。 「ファイル形式のオプション」を参照してください。

    • <path to source data>: データを含むディレクトリ内のファイルへのパス。

    指定したファイルの最初の 10 個のレコードが表示されます。

ステップ 3: 生データを取り込む

生データを取り込むには、次の操作を行います。

  1. サイドバーで、「新規ノートブック」>をクリックします。

    ノートブックは、最後に使用したクラスター (この場合は、この記事の前半で作成したクラスター) に自動的にアタッチされます。

  2. ノートブックの名前を入力します。

  3. 言語ボタンをクリックし、ドロップダウンメニューから Python または SQL を選択します。 デフォルトではPythonが選択されています。

  4. 次のコードをノートブックのセルに貼り付けます。

    CREATE OR REFRESH STREAMING TABLE
      <table-name>
    AS SELECT
      *
    FROM
      STREAM read_files(
        '<path-to-source-data>',
        format => '<file-format>'
      )
    
    @dlt.table(table_properties={'quality': 'bronze'})
    def <table-name>():
      return (
         spark.readStream.format('cloudFiles')
         .option('cloudFiles.format', '<file-format>')
         .load(f'{<path-to-source-data>}')
     )
    

    次の値を置き換えます。

    • <table-name>: 取り込まれたレコードを含むテーブルの名前。

    • <path-to-source-data>: ソース データへのパス。

    • <file-format>: サポートされているファイル形式。 「ファイル形式のオプション」を参照してください。

注:

Delta Live Tables は、ノートブック セルで対話的に実行するようには設計されていません。 Delta Live Tables 構文を含むセルをノートブックで実行すると、クエリが構文的に有効かどうかを示すメッセージが返されますが、クエリ ロジックは実行されません。 次のステップでは、作成したインジェスト ノートブックからパイプラインを作成する方法について説明します。

ステップ 4: パイプラインを作成して発行する

パイプラインを作成して Unity Catalog に発行するには、次の操作を行います。

  1. サイドバーで、[ワークフロー]、[Delta Live Tables ]タブの順にクリックし、[パイプラインの作成]をクリックします。

  2. パイプラインの名前を入力します。

  3. パイプライン モードの場合は、トリガー済み を選択します。

  4. ソース コードの場合は、パイプライン ソース コードが含まれているノートブックを選択します。

  5. [宛先] で [Unity Catalog] を選択します。

  6. テーブルが Unity Catalog によって管理され、親スキーマにアクセスできるすべてのユーザーがクエリを実行できるようにするには、ドロップダウン リストから [カタログ ] と [ターゲット スキーマ ] を選択します。

  7. クラスター作成のアクセス許可がない場合は、 をサポートする クラスターポリシー Delta Live Tablesをドロップダウン リストから選択します。

  8. [Advanced] で、チャンネル[Preview] に設定します。

  9. 他のすべてのデフォルト値をそのまま使用し、[ 作成] をクリックします。

ステップ 5: パイプラインをスケジュールする

パイプラインをスケジュールするには、次の操作を行います。

  1. サイドバーで、「 Delta Live Tables」をクリックします。

  2. スケジュールするパイプラインの名前をクリックします。

  3. [スケジュール] をクリックして>スケジュールを追加します

  4. [ジョブ名] に、ジョブの名前を入力します。

  5. スケジュールスケジュールに設定します。

  6. 期間、開始時刻、およびタイムゾーンを指定します。

  7. パイプラインの開始、成功、または失敗時にアラートを受信するように、1つ以上のEメール アドレスを設定します。

  8. [作成]をクリックします。

次のステップ