Delta Sharingオープン共有を使用して共有されたデータの読み取り (受信者用)

この記事では、Delta Sharing オープン共有プロトコルを使用して 共有 されているデータを読み取る方法について説明します。 オープン共有では、データ プロバイダーによってチームのメンバーと共有された資格情報ファイルを使用して、共有データへの安全な読み取りアクセスを取得します。 資格情報が有効であり、プロバイダーがデータを共有し続ける限り、アクセスは保持されます。 プロバイダーは、資格情報の有効期限とローテーションを管理します。 データの更新は、ほぼリアルタイムで利用できます。 共有データの読み取りとコピーの作成はできますが、ソース データを変更することはできません。

Databricks 間 Delta Sharing を使用してデータが共有されている場合、データにアクセスするために資格情報ファイルは必要なく、この記事は適用されません。 手順については、「 Databricks-to-Databricks Delta Sharing (受信者用) を使用して共有されたデータを読み取る」を参照してください。

以下のセクションでは、Databricks、Apache Spark、pandas、Power BI を使用して、資格情報ファイルを使用して共有データにアクセスし、読み取る方法について説明します。 Delta Sharing コネクタの完全な一覧と使用方法については、 Delta Sharing オープンソースのドキュメントを参照してください。 共有データへのアクセスで問題が発生した場合は、データ提供者にお問い合わせください。

パートナー統合は、特に明記されていない限り、サードパーティによって提供され、お客様は、その製品およびサービスを使用するために適切なプロバイダーのアカウントを持っている必要があります。 Databricks はこのコンテンツを最新の状態に保つために最善を尽くしていますが、パートナー統合ページのコンテンツの統合や正確性については表明しません。 統合に関して適切なプロバイダーに連絡してください。

始める前に

チームのメンバーは、データ プロバイダーによって共有される資格情報ファイルをダウンロードする必要があります。 オープン共有モデルでアクセスするを参照してください。

セキュリティで保護されたチャンネルを使用して、そのファイルまたはファイルの場所をあなたと共有する必要があります。

Databricks: オープンな共有コネクタ を使用して共有データを読み取る

このセクションでは、開いている共有コネクタを使用して、Databricks ワークスペースのノートブックを使用して共有データにアクセスする方法について説明します。 自分またはチームの別のメンバーが資格情報ファイルを DBFS に格納し、それを使用してデータ プロバイダーの Databricks アカウントに対する認証を行い、データ プロバイダーが共有したデータを読み取ります。

データ プロバイダーが Databricks 間の共有を使用していて、資格情報ファイルを共有していない場合は、 Unity Catalog を使用してデータにアクセスする必要があります。 手順については、「 Databricks-to-Databricks Delta Sharing (受信者用) を使用して共有されたデータを読み取る」を参照してください。

この例では、個別に実行できる複数のセルを持つノートブックを作成します。 代わりに、ノートブック コマンドを同じセルに追加して、順番に実行することもできます。

ステップ 1: 資格情報ファイルを DBFS に格納する (Python の手順)

このステップでは、チームのユーザーが共有データにアクセスできるように、Databricks の Python ノートブックを使用して資格情報ファイルを格納します。

自分またはチームの誰かが既に資格情報ファイルを DBFS に保存している場合は、次の手順にスキップします。

  1. テキスト エディターで、資格情報ファイルを開きます。

  2. Databricks ワークスペースで、[ 新しい> ノートブック] をクリックします。

    • 名前を入力します。

    • ノートブックの既定の言語を Python に設定します。

    • ノートブックにアタッチするクラスターを選択します。

    • [作成]をクリックします。

    ノートブックがノートブック エディターで開きます。

  3. Python または pandas を使用して共有データにアクセスするには、 差分共有 Python コネクタをインストールします。 ノートブック エディターで、次のコマンドを貼り付けます。

    %sh pip install delta-sharing
    
  4. セルを実行します。

    delta-sharing Python ライブラリがまだインストールされていない場合は、クラスターにインストールされます。

  5. 新しいセルに次のコマンドを貼り付けて、資格情報ファイルの内容を DBFS のフォルダーにアップロードします。 変数を次のように置き換えます。

    • <dbfs-path>: 資格情報ファイルを保存するフォルダーへのパス

    • <credential-file-contents>: 資格情報ファイルの内容。 これはファイルへのパスではなく、ファイルのコピーされた内容です。

      資格情報ファイルには、 shareCredentialsVersionendpoint、および bearerTokenの 3 つのフィールドを定義する JSON が含まれています。

      %scala
      dbutils.fs.put("<dbfs-path>/config.share","""
      <credential-file-contents>
      """)
      
  6. セルを実行します。

    資格情報ファイルがアップロードされたら、このセルを削除できます。 すべてのワークスペース ユーザーは DBFS から資格情報ファイルを読み取ることができ、資格情報ファイルはワークスペース内のすべてのクラスターと SQLウェアハウスの DBFS で使用できます。 セルを削除するには、右端のセルアクションメニ セルアクション ュー で [x ] をクリックします。

ステップ 2: ノートブックを使用して共有テーブル を一覧表示および読み取る

このステップでは、共有内のテーブル、または 共有テーブルとパーティションのセットを一覧表示し、テーブルをクエリーします。

  1. Python を使用して、共有内のテーブルを一覧表示します。

    新しいセルに、次のコマンドを貼り付けます。 <dbfs-path> を「 ステップ 1: 資格情報ファイルを DBFS に格納する (Python 命令)」で作成したパスに置き換えます。

    コードが実行されると、Python はクラスター上の DBFS から資格情報ファイルを読み取ります。 パス /dbfs/で DBFS に格納されているデータにアクセスします。

    import delta_sharing
    
    client = delta_sharing.SharingClient(f"/dbfs/<dbfs-path>/config.share")
    
    client.list_all_tables()
    
  2. セルを実行します。

    結果は、テーブルの配列と各テーブルのメタデータです。 次の出力は、2 つのテーブルを示しています。

    Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
    

    出力が空の場合、または予期したテーブルが含まれていない場合は、データ プロバイダーにお問い合わせください。

  3. クエリー 共有テーブル。

    • Scalaを使用する:

      新しいセルに、次のコマンドを貼り付けます。 コードが実行されると、資格情報ファイルが JVM を介して DBFS から読み取られます。

      変数を次のように置き換えます。

      • <profile-path>: 資格情報ファイルの DBFS パス。 たとえば、 /<dbfs-path>/config.share.

      • <share-name>: テーブルの share= の値。

      • <schema-name>: テーブルの schema= の値。

      • <table-name>: テーブルの name= の値。

      %scala
          spark.read.format("deltaSharing")
          .load("<profile-path>#<share-name>.<schema-name>.<table-name>").limit(10);
      

      セルを実行します。 共有テーブルをロードするたびに、ソースからの新しいデータが表示されます。

    • SQL の使用:

      SQL を使用してデータをクエリーするには、共有テーブルからワークスペースにローカルテーブルを作成し、ローカルテーブルにクエリーを実行します。 共有データは、ローカル テーブルに格納またはキャッシュされません。 ローカル テーブルをクエリーするたびに、共有データの現在の状態が表示されます。

      新しいセルに、次のコマンドを貼り付けます。

      変数を次のように置き換えます。

      • <local-table-name>: ローカルテーブルの名前。

      • <profile-path>: 資格情報ファイルの場所。

      • <share-name>: テーブルの share= の値。

      • <schema-name>: テーブルの schema= の値。

      • <table-name>: テーブルの name= の値。

      %sql
      DROP TABLE IF EXISTS table_name;
      
      CREATE TABLE <local-table-name> USING deltaSharing LOCATION "<profile-path>#<share-name>.<schema-name>.<table-name>";
      
      SELECT * FROM <local-table-name> LIMIT 10;
      

      コマンドを実行すると、共有データは直接クエリーになります。 テストとして、テーブルはクエリーで、最初の 10 件の結果が返されます。

    出力が空であるか、期待するデータが含まれていない場合は、データ プロバイダーにお問い合わせください。

Apache Spark: 共有データ の読み取り

Spark 3.x 以降を使用して共有データにアクセスするには、次の手順に従います。

これらの手順は、データ プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 オープン共有モデルでアクセスするを参照してください。

Delta Sharing Python コネクタと Spark コネクタ をインストールする

共有データに関連するメタデータ (共有されているテーブルの一覧など) にアクセスするには、次の手順を実行します。 この例では Python を使用しています。

  1. 差分共有 Python コネクタをインストールします。

    pip install delta-sharing
    
  2. Apache Spark コネクタをインストールします。

Spark を使用した共有テーブルの一覧表示

共有内のテーブルを一覧表示します。 次の例では、 <profile-path> を資格情報ファイルの場所に置き換えます。

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

結果は、テーブルの配列と各テーブルのメタデータです。 次の出力は、2 つのテーブルを示しています。

Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]

出力が空の場合、または予期したテーブルが含まれていない場合は、データ プロバイダーにお問い合わせください。

Spark を使用した共有データへのアクセス

以下を実行して、これらの変数を置き換えます。

  • <profile-path>: 資格情報ファイルの場所。

  • <share-name>: テーブルの share= の値。

  • <schema-name>: テーブルの schema= の値。

  • <table-name>: テーブルの name= の値。

  • <version-as-of>:随意。 データを読み込むテーブルのバージョン。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 delta-sharing-spark 0.5.0 以上が必要です。

  • <timestamp-as-of>:随意。 指定されたタイムスタンプより前または指定されたタイムスタンプのバージョンでデータをロードします。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 0.6.0 以上の delta-sharing-spark が必要です。

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", version=<version-as-of>)

spark.read.format("deltaSharing")\
.option("versionAsOf", <version-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", timestamp=<timestamp-as-of>)

spark.read.format("deltaSharing")\
.option("timestampAsOf", <timestamp-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

以下を実行して、これらの変数を置き換えます。

  • <profile-path>: 資格情報ファイルの場所。

  • <share-name>: テーブルの share= の値。

  • <schema-name>: テーブルの schema= の値。

  • <table-name>: テーブルの name= の値。

  • <version-as-of>:随意。 データを読み込むテーブルのバージョン。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 delta-sharing-spark 0.5.0 以上が必要です。

  • <timestamp-as-of>:随意。 指定されたタイムスタンプより前または指定されたタイムスタンプのバージョンでデータをロードします。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 0.6.0 以上の delta-sharing-spark が必要です。

spark.read.format("deltaSharing")
.option("versionAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

spark.read.format("deltaSharing")
.option("timestampAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

Spark を使用した共有変更データ フィードへのアクセス

テーブル履歴が共有されていて、ソース テーブルで変更データ フィード (CDF) が有効になっている場合は、次のコマンドを実行してこれらの変数を置き換えることで、変更データ フィードにアクセスできます。 delta-sharing-spark 0.5.0 以上が必要です。

開始パラメーターは 1 つだけ指定する必要があります。

  • <profile-path>: 資格情報ファイルの場所。

  • <share-name>: テーブルの share= の値。

  • <schema-name>: テーブルの schema= の値。

  • <table-name>: テーブルの name= の値。

  • <starting-version>:随意。 クエリーの開始バージョン。 長整数型として指定します。

  • <ending-version>:随意。 クエリの終了バージョン。 終了バージョンが指定されていない場合、API は最新のテーブル バージョンを使用します。

  • <starting-timestamp>:随意。 クエリーの開始タイムスタンプは、このタイムスタンプ以上で作成されたバージョンに変換されます。 yyyy-mm-dd hh:mm:ss[.fffffffff]の形式の文字列として指定します。

  • <ending-timestamp>:随意。 クエリーの終了タイムスタンプは、このタイムスタンプ以前に作成されたバージョンに変換されます。 次の形式の文字列として指定します。 yyyy-mm-dd hh:mm:ss[.fffffffff]

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<ending-version>)

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("statingVersion", <starting-version>)\
.option("endingVersion", <ending-version>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("startingTimestamp", <starting-timestamp>)\
.option("endingTimestamp", <ending-timestamp>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("statingVersion", <starting-version>)
.option("endingVersion", <ending-version>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("startingTimestamp", <starting-timestamp>)
.option("endingTimestamp", <ending-timestamp>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

出力が空であるか、期待するデータが含まれていない場合は、データ プロバイダーにお問い合わせください。

Spark 構造化ストリーミング を使用した共有テーブルへのアクセス

テーブル履歴が共有されている場合は、共有データをストリーム読み取りできます。 0.6.0 以上の delta-sharing-spark が必要です。

サポートされているオプション:

  • ignoreDeletes: データを削除するトランザクションを無視します。

  • ignoreChanges: UPDATEMERGE INTODELETE (パーティション内)、 OVERWRITEなどのデータ変更操作によってソース テーブルでファイルが書き換えられた場合は、更新を再処理します。 変更されていない行は引き続き出力できます。 したがって、ダウンストリームのコンシューマーは重複を処理できる必要があります。 削除はダウンストリームに反映されません。 ignoreChanges ignoreDeletesを包含します。したがって、 ignoreChangesを使用する場合、ソース表の削除または更新によってストリームが中断されることはありません。

  • startingVersion: 開始する共有テーブルのバージョン。 このバージョン (両端を含む) 以降のすべてのテーブル変更は、ストリーミングソースによって読み取られます。

  • startingTimestamp: 開始するタイムスタンプ。 タイムスタンプ (両端を含む) 以降にコミットされたすべてのテーブル変更は、ストリーミングソースによって読み取られます。 例: "2023-01-01 00:00:00.0".

  • maxFilesPerTrigger: すべてのマイクロバッチで考慮される新しいファイルの数。

  • maxBytesPerTrigger: 各マイクロバッチで処理されるデータの量。 このオプションでは、最小入力単位がこの制限よりも大きい場合にストリーミングクエリーを前進させるために、バッチがほぼこの量のデータを処理し、制限を超えて処理する可能性があることを意味する「ソフトマックス」を設定します。

  • readChangeFeed: ストリームは、共有テーブルの変更データ フィードを読み取ります。

サポートされていないオプション:

  • Trigger.availableNow

構造化ストリーミングクエリ のサンプル

spark.readStream.format("deltaSharing")
.option("startingVersion", 0)
.option("ignoreChanges", true)
.option("maxFilesPerTrigger", 10)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
spark.readStream.format("deltaSharing")\
.option("startingVersion", 0)\
.option("ignoreDeletes", true)\
.option("maxBytesPerTrigger", 10000)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Databricksでのストリーミング」も参照してください。

削除を有効にしてテーブルを読み取る

プレビュー

この機能はパブリックプレビュー段階です。

削除は、プロバイダーが共有Deltaテーブルで有効にできるストレージ最適化機能です。 「削除とは何ですか?」を参照してください。 。

プロバイダが削除を有効にしてテーブルを共有している場合は、 delta-sharing-spark 3.1 以降を実行しているコンピュートを使用してテーブルを読み取ることができます。 Databricks クラスターを使用している場合は、Databricks Runtime 14.1 以降を実行しているクラスターを使用してバッチ読み取りを実行できます。 CDF およびストリーミング クエリには Databricks Runtime 14.2 以降が必要です。

バッチ クエリは共有テーブルのテーブル機能に基づいて自動的にresponseFormatを解決できるため、バッチ クエリをそのまま実行できます。

変更データフィード (CDF) を読み取るか、削除または列マッピングが有効になっている共有テーブルでストリーミング クエリを実行するには、追加オプションresponseFormat=deltaを設定する必要があります。

次の例は、バッチ、CDF、およびストリーミング クエリを示しています。

import org.apache.spark.sql.SparkSession

val spark = SparkSession
        .builder()
        .appName("...")
        .master("...")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()

val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

// Batch query
spark.read.format("deltaSharing").load(tablePath)

// CDF query
spark.read.format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("responseFormat", "delta")
  .option("startingVersion", 1)
  .load(tablePath)

// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)

Pandas: 共有データ の読み取り

pandas 0.25.3以降で共有データにアクセスするには次の手順に従ってください。

これらの手順は、データ プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 オープン共有モデルでアクセスするを参照してください。

Delta Sharing Python コネクタ をインストールする

共有されているテーブルの一覧など、共有データに関連するメタデータにアクセスするには、 差分共有 Python コネクタをインストールする必要があります。

pip install delta-sharing

pandas を使用した共有テーブルの一覧表示

共有内のテーブルを一覧表示するには、次のコマンドを実行し、 <profile-path>/config.share 資格情報ファイルの場所に変更します。

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

出力が空の場合、または予期したテーブルが含まれていない場合は、データ プロバイダーにお問い合わせください。

pandas を使用して共有データにアクセスする

Python を使用して pandas の共有データにアクセスするには、次のコマンドを実行し、変数を次のように置き換えます。

  • <profile-path>: 資格情報ファイルの場所。

  • <share-name>: テーブルの share= の値。

  • <schema-name>: テーブルの schema= の値。

  • <table-name>: テーブルの name= の値。

import delta_sharing
delta_sharing.load_as_pandas(f"<profile-path>#<share-name>.<schema-name>.<table-name>")

pandas を使用した共有チェンジデータフィードへのアクセス

Python を使用して pandas の共有テーブルの変更データ フィードにアクセスするには、次のコマンドを実行し、変数を次のように置き換えます。 変更データ フィードは、データ プロバイダーがテーブルの変更データ フィードを共有したかどうかによって、使用できない場合があります。

  • <starting-version>:随意。 クエリーの開始バージョン。

  • <ending-version>:随意。 クエリの終了バージョン。

  • <starting-timestamp>:随意。 クエリーの開始タイムスタンプ。 これは、このタイムスタンプ以降に作成されたバージョンに変換されます。

  • <ending-timestamp>:随意。 クエリーの終了タイムスタンプ。 これは、このタイムスタンプ以前に作成されたバージョンに変換されます。

import delta_sharing
delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<starting-version>)

delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

出力が空であるか、期待するデータが含まれていない場合は、データ プロバイダーにお問い合わせください。

Power BI: 共有データ の読み取り

Power BI Delta Sharing コネクタを使用すると、 Delta Sharing オープン プロトコルを通じて共有されているデータセットを検出、分析、視覚化できます。

要件

Databricks に接続する

Delta Sharing コネクタを使用して Databricks に接続するには、次の操作を行います。

  1. 共有資格情報ファイルをテキスト エディターで開き、エンドポイント URL とトークンを取得します。

  2. Power BI Desktop を開きます。

  3. [ データの取得 ] メニューで、「 Delta Sharing」を検索します。

  4. コネクタを選択し、[ 接続] をクリックします。

  5. 認証情報ファイルからコピーしたエンドポイント URL を [ Delta Sharing サーバー URL ] フィールドに入力します。

  6. 必要に応じて、[ 詳細オプション ] タブで、ダウンロードできる行の最大数の [行制限 ] を設定します。 これは、デフォルトによって 100 万行に設定されます。

  7. OK をクリックします。

  8. [認証] で、資格情報ファイルから取得したトークンを [ベアラー トークン] にコピーします。

  9. [ 接続] をクリックします。

Power BI Delta Sharing コネクタ の制限事項

Power BI Delta Sharing コネクタには、次の制限があります。

  • コネクタがロードするデータは、マシンのメモリに収まる必要があります。 これを確実にするために、コネクタはインポートされる行の数を、Power BI Desktop の [詳細オプション] タブで設定した 行制限 に制限します。

新しい資格情報を要求する

認証情報のアクティベーション URL またはダウンロードした認証情報が紛失、破損、または侵害された場合、またはプロバイダーから新しい認証情報が送信されずに認証情報の有効期限が切れた場合は、プロバイダーに連絡して新しい認証情報をリクエストしてください。