Delta Sharingオープン共有を使用して共有されたデータを読み取る(受信者向け)

この記事では、Delta Sharing オープン共有 プロトコルを使用して共有されたデータを読み取る方法について説明します。 これには、 Databricks、 Apache Spark、 Pandas、 Power BI、および Tableauを使用して共有データを読み取る手順が含まれています。

オープン共有では、データ プロバイダーによってチームのメンバーと共有された資格情報ファイルを使用して、共有データへの安全な読み取りアクセスを取得します。 資格情報が有効で、プロバイダーが引き続きデータを共有する限り、アクセスは保持されます。 プロバイダーは、資格情報の有効期限とローテーションを管理します。 データの更新はほぼリアルタイムで利用できます。 共有データの読み取りとコピーは可能ですが、ソースデータを変更することはできません。

注:

Databricks 間 Delta Sharing を使用してデータが共有されている場合、データにアクセスするために資格情報ファイルは必要なく、この記事は適用されません。 手順については、「 Databricks間Delta Sharing (受信者用) を使用して共有されたデータを読み取る」を参照してください。

次のセクションでは、 Databricks 、 Apache Spark 、 Pandas 、 Power BIを使用して資格情報ファイルを共有データにアクセスし、読み取る方法について説明します。 Delta Sharingコネクタの完全なリストとその使用方法については、 Delta Sharingオープンソース ドキュメントを参照してください。 共有データにアクセスする際に問題が発生した場合は、データ プロバイダーにお問い合わせください。

注:

パートナー統合は、特に記載がない限り、サードパーティによって提供されており、その製品およびサービスを使用するには、適切なプロバイダーのアカウントを持っている必要があります。 Databricks は、このコンテンツを最新の状態に保つために最善を尽くしていますが、パートナー統合ページの統合またはコンテンツの正確性については一切表明しません。 統合に関して、適切なプロバイダーに連絡してください。

始める前に

チームのメンバーは、データ プロバイダーによって共有される資格情報ファイルをダウンロードする必要があります。 「オープン共有モデルでアクセスを取得する」を参照してください。

そのファイルまたはファイルの場所をあなたと共有するには、安全なチャンネルを使用する必要があります。

Databricks: オープンな共有コネクタを使用して共有データを読み取る

このセクションでは、オープン共有コネクタを使用して、Databricks ワークスペース内のノートブックを使用して共有データにアクセスする方法について説明します。 あなたまたはチームの他のメンバーが資格情報ファイルを DBFS に保存し、それを使用してデータ プロバイダーの Databricks アカウントに認証し、データ プロバイダーが共有したデータを読み取ります。

注:

データ プロバイダーが Databricks 間の共有を使用していて、資格情報ファイルを共有していない場合は、 Unity Catalog を使用してデータにアクセスする必要があります。 手順については、「 Databricks間Delta Sharing (受信者用) を使用して共有されたデータを読み取る」を参照してください。

この例では、独立して実行できる複数のセルを含むノートブックを作成します。 代わりに、ノートブック コマンドを同じセルに追加して、順番に実行することもできます。

ステップ 1: 資格情報ファイルを DBFS に保存する (Python の手順)

このステップでは、 のPython ノートブックを使用して資格情報ファイルを保存し、チームのユーザーが共有データにアクセスできるようにします。Databricks

自分またはチームの誰かがすでに資格情報ファイルをDBFSに保存している場合は、次の手順に進んでください。

  1. テキストエディタで、資格情報ファイルを開きます。

  2. Databricks ワークスペースで、 [新規] > [ノートブック]をクリックします。

    • 名前を入力してください。

    • ノートブックのデフォルト言語を Python に設定します。

    • ノートブックに接続するクラスターを選択します。

    • [作成]をクリックします。

    ノートブックがノートブック エディターで開きます。

  3. Python または pandas を使用して共有データにアクセスするには、 delta-sharing Python コネクタをインストールします。 ノートブック エディターで、次のコマンドを貼り付けます。

    %sh pip install delta-sharing
    
  4. セルを実行します。

    delta-sharing Python ライブラリがまだインストールされていない場合は、クラスターにインストールされます。

  5. 新しいセルに次のコマンドを貼り付けます。このコマンドは、資格情報ファイルの内容を DBFS 内のフォルダーにアップロードします。 変数を次のように置き換えます。

    • <dbfs-path>: 資格情報ファイルを保存するフォルダーへのパス

    • <credential-file-contents>: 資格情報ファイルの内容。 これはファイルへのパスではなく、ファイルのコピーされた内容です。

      資格情報ファイルには、3 つのフィールドshareCredentialsVersionendpointbearerTokenを定義する JSON が含まれています。

      %scala
      dbutils.fs.put("<dbfs-path>/config.share","""
      <credential-file-contents>
      """)
      
  6. セルを実行します。

    資格情報ファイルがアップロードされたら、このセルを削除できます。 すべてのワークスペース ユーザーはDBFS DBFSから資格証明ファイルを読み取ることができ、資格証明ファイルはワークスペース内のすべてのクラスターおよびSQL ウェアハウスの で使用できます。セルを削除するには、右端のセルアクションメニューセルアクション で x をクリックします。

ステップ 2: ノートブックを使用して共有テーブルを一覧表示および読み取る

このステップでは、共有(共有テーブルとパーティションのセット) 内のテーブルを一覧表示し、テーブルをクエリします。

  1. Python を使用して、共有内のテーブルを一覧表示します。

    新しいセルに、次のコマンドを貼り付けます。 <dbfs-path> を「 ステップ 1: 資格情報ファイルを DBFS に格納する (Pythonの手順)」で作成したパスに置き換えます。

    コードが実行されると、Python はクラスター上の DBFS から資格情報ファイルを読み取ります。 パス/dbfs/の DBFS に保存されているデータにアクセスします。

    import delta_sharing
    
    client = delta_sharing.SharingClient(f"/dbfs/<dbfs-path>/config.share")
    
    client.list_all_tables()
    
  2. セルを実行します。

    結果は、テーブルの配列と各テーブルのメタデータです。 次の出力は、2 つのテーブルを示しています。

    Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
    

    出力が空の場合、または期待するテーブルが含まれていない場合は、データ プロバイダーにお問い合わせください。

  3. 共有テーブルをクエリします。

    • Scala の使用:

      新しいセルに次のコマンドを貼り付けます。 コードが実行されると、資格情報ファイルが JVM を介して DBFS から読み取られます。

      変数を次のように置き換えます。

      • <profile-path>: 資格情報ファイルの DBFS パス。 たとえば、 /<dbfs-path>/config.share.

      • <share-name>: テーブルの share= の値。

      • <schema-name>: テーブルの schema= の値。

      • <table-name>: テーブルの name= の値。

      %scala
          spark.read.format("deltaSharing")
          .load("<profile-path>#<share-name>.<schema-name>.<table-name>").limit(10);
      

      セルを実行します。 共有テーブルをロードするたびに、ソースからの最新データが表示されます。

    • SQL の使用:

      SQL を使用してデータをクエリするには、共有テーブルからワークスペースにローカル テーブルを作成し、ローカル テーブルに対してクエリを実行します。 共有データは、ローカルテーブルに保存またはキャッシュされません。 ローカルテーブルをクエリするたびに、共有データの現在の状態が表示されます。

      新しいセルに、次のコマンドを貼り付けます。

      変数を次のように置き換えます。

      • <local-table-name>: ローカル テーブルの名前。

      • <profile-path>: 資格情報ファイルの場所。

      • <share-name>: テーブルの share= の値。

      • <schema-name>: テーブルの schema= の値。

      • <table-name>: テーブルの name= の値。

      %sql
      DROP TABLE IF EXISTS table_name;
      
      CREATE TABLE <local-table-name> USING deltaSharing LOCATION "<profile-path>#<share-name>.<schema-name>.<table-name>";
      
      SELECT * FROM <local-table-name> LIMIT 10;
      

      コマンドを実行すると、共有データが直接照会されます。 テストとして、テーブルがクエリされ、最初の 10 件の結果が返されます。

    出力が空の場合、または期待するデータが含まれていない場合は、データ プロバイダーにお問い合わせください。

Apache Spark: 共有データの読み取り

Spark 3.x 以降を使用して共有データにアクセスするには、次のステップに従ってください。

これらの手順は、データ・プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 「オープン共有モデルでアクセスを取得する」を参照してください。

Delta Sharing Python コネクタと Spark コネクタをインストールする

共有データに関連するメタデータ (共有されているテーブルのリストなど) にアクセスするには、次の操作を行います。 この例では Python を使用します。

  1. delta-sharing Python コネクタをインストールします。

    pip install delta-sharing
    
  2. Apache Spark コネクタをインストールします。

Sparkを使用した共有テーブルの一覧表示

共有内のテーブルを一覧表示します。 次の例では、 <profile-path> を資格情報ファイルの場所に置き換えます。

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

結果は、テーブルの配列と各テーブルのメタデータです。 次の出力は、2 つのテーブルを示しています。

Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]

出力が空の場合、または期待するテーブルが含まれていない場合は、データ プロバイダーにお問い合わせください。

Sparkを使用した共有データへのアクセス

以下の変数を置き換えて、次のコマンドを実行します。

  • <profile-path>: 資格情報ファイルの場所。

  • <share-name>: テーブルの share= の値。

  • <schema-name>: テーブルの schema= の値。

  • <table-name>: テーブルの name= の値。

  • <version-as-of>:オプション。 データを読み込むテーブルのバージョン。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 delta-sharing-spark 0.5.0 以上が必要です。

  • <timestamp-as-of>:オプション。 指定されたタイムスタンプより前または指定されたタイムスタンプのバージョンでデータをロードします。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 0.6.0 以上の delta-sharing-spark が必要です。

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", version=<version-as-of>)

spark.read.format("deltaSharing")\
.option("versionAsOf", <version-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", timestamp=<timestamp-as-of>)

spark.read.format("deltaSharing")\
.option("timestampAsOf", <timestamp-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

以下の変数を置き換えて、次のコマンドを実行します。

  • <profile-path>: 資格情報ファイルの場所。

  • <share-name>: テーブルの share= の値。

  • <schema-name>: テーブルの schema= の値。

  • <table-name>: テーブルの name= の値。

  • <version-as-of>:オプション。 データを読み込むテーブルのバージョン。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 delta-sharing-spark 0.5.0 以上が必要です。

  • <timestamp-as-of>:オプション。 指定されたタイムスタンプより前または指定されたタイムスタンプのバージョンでデータをロードします。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 0.6.0 以上の delta-sharing-spark が必要です。

spark.read.format("deltaSharing")
.option("versionAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

spark.read.format("deltaSharing")
.option("timestampAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

Sparkを使用した共有変更データ フィードへのアクセス

テーブル履歴が共有されており、ソース テーブルで変更データフィード (CDF) が有効になっている場合は、これらの変数を置き換えて次のコマンドを実行することで変更データフィードにアクセスできます。 delta-sharing-spark 0.5.0 以降が必要です。

開始点は 1 つだけ指定する必要があります。

  • <profile-path>: 資格情報ファイルの場所。

  • <share-name>: テーブルの share= の値。

  • <schema-name>: テーブルの schema= の値。

  • <table-name>: テーブルの name= の値。

  • <starting-version>:オプション。 クエリーの開始バージョン。 長整数型として指定します。

  • <ending-version>:オプション。 クエリの終了バージョン。 終了バージョンが指定されていない場合、API は最新のテーブル バージョンを使用します。

  • <starting-timestamp>:オプション。 クエリーの開始タイムスタンプは、このタイムスタンプ以上で作成されたバージョンに変換されます。 yyyy-mm-dd hh:mm:ss[.fffffffff]の形式の文字列として指定します。

  • <ending-timestamp>:オプション。 クエリーの終了タイムスタンプは、このタイムスタンプ以前に作成されたバージョンに変換されます。 次の形式の文字列として指定します。 yyyy-mm-dd hh:mm:ss[.fffffffff]

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<ending-version>)

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("statingVersion", <starting-version>)\
.option("endingVersion", <ending-version>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("startingTimestamp", <starting-timestamp>)\
.option("endingTimestamp", <ending-timestamp>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("statingVersion", <starting-version>)
.option("endingVersion", <ending-version>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("startingTimestamp", <starting-timestamp>)
.option("endingTimestamp", <ending-timestamp>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

出力が空の場合、または期待するデータが含まれていない場合は、データ プロバイダーにお問い合わせください。

Spark 構造化ストリーミングを使用した共有テーブルへのアクセス

テーブル履歴が共有されている場合は、共有データをストリーム読み取りできます。 delta-sharing-spark 0.6.0 以降が必要です。

サポートされているオプション:

  • ignoreDeletes: データを削除するトランザクションを無視します。

  • ignoreChanges: UPDATEMERGE INTODELETE (パーティション内)、 OVERWRITEなどのデータ変更操作によりソース テーブルでファイルが書き換えられた場合は、更新を再処理します。 変更されていない行は引き続き出力できます。 したがって、ダウンストリームの消費者は重複を処理できる必要があります。 削除はダウンストリームに反映されません。 ignoreChangesignoreDeletesを包含します。 したがって、 ignoreChangesを使用すると、ソース テーブルの削除や更新によってストリームが中断されることはありません。

  • startingVersion: 開始する共有テーブルのバージョン。 このバージョン (含む) 以降のすべてのテーブルの変更は、ストリーミング ソースによって読み取られます。

  • startingTimestamp: 開始するタイムスタンプ。 タイムスタンプ(含む)以降にコミットされたすべてのテーブル変更は、ストリーミング ソースによって読み取られます。 例: "2023-01-01 00:00:00.0"

  • maxFilesPerTrigger: 各マイクロバッチで考慮される新しいファイルの数。

  • maxBytesPerTrigger: 各マイクロバッチで処理されるデータの量。 このオプションは「ソフト最大値」を設定します。つまり、バッチはほぼこの量のデータを処理し、最小の入力単位がこの制限より大きい場合にはストリーミング クエリを進めるために制限を超えて処理する場合があります。

  • readChangeFeed: 共有テーブルの変更データフィードをストリーム読み取りします。

サポートされていないオプション:

  • Trigger.availableNow

構造化ストリーミングクエリのサンプル

spark.readStream.format("deltaSharing")
.option("startingVersion", 0)
.option("ignoreChanges", true)
.option("maxFilesPerTrigger", 10)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
spark.readStream.format("deltaSharing")\
.option("startingVersion", 0)\
.option("ignoreDeletes", true)\
.option("maxBytesPerTrigger", 10000)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Databricksでのストリーミング」も参照してください。

削除ベクトルまたは列マッピングが有効になっているテーブルを読み取る

プレビュー

この機能はパブリックプレビュー段階です。

削除ベクトルは、プロバイダーが共有 Delta テーブルで有効にできるストレージ最適化機能です。 「削除とは何ですか?」を参照してください。 。

Databricks は Delta テーブルの列マッピングもサポートしています。 「Delta Lake 列マッピングを使用した列の名前変更と削除」を参照してください。

プロバイダが削除または列マッピングを有効にしてテーブルを共有している場合は、 delta-sharing-spark 3.1 以降を実行しているコンピュートを使用してテーブルを読み取ることができます。 Databricksクラスターを使用している場合は、 Databricks Runtime 14.1 以降を実行しているクラスターを使用してバッチ読み取りを実行できます。 CDF およびストリーミング クエリには、Databricks Runtime 14.2 以上が必要です。

共有テーブルのテーブル機能に基づいてresponseFormatを自動的に解決できるため、バッチ クエリをそのまま実行できます。

変更データフィード (CDF) を読み取るか、削除または列マッピングが有効になっている共有テーブルでストリーミング クエリを実行するには、追加オプションresponseFormat=deltaを設定する必要があります。

次の例は、バッチ、CDF、およびストリーミング クエリを示しています。

import org.apache.spark.sql.SparkSession

val spark = SparkSession
        .builder()
        .appName("...")
        .master("...")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()

val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

// Batch query
spark.read.format("deltaSharing").load(tablePath)

// CDF query
spark.read.format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("responseFormat", "delta")
  .option("startingVersion", 1)
  .load(tablePath)

// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)

Pandas: 共有データの読み取り

pandas 0.25.3以降で共有データにアクセスするには次の手順に従ってください。

これらの手順は、データ・プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 「オープン共有モデルでアクセスを取得する」を参照してください。

Delta Sharing Python コネクタをインストールする

共有されているテーブルの一覧など、共有データに関連するメタデータにアクセスするには、 delta-sharing Python コネクタをインストールする必要があります。

pip install delta-sharing

pandas を使用した共有テーブルの一覧表示

共有内のテーブルを一覧表示するには、 <profile-path>/config.share資格情報ファイルの場所に置き換えて、次のコマンドを実行します。

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

出力が空の場合、または期待するテーブルが含まれていない場合は、データ プロバイダーにお問い合わせください。

pandas を使用して共有データにアクセスする

を使用してPandas Pythonの共有データにアクセスするには、変数を次のように置き換えて以下を実行します。

  • <profile-path>: 資格情報ファイルの場所。

  • <share-name>: テーブルの share= の値。

  • <schema-name>: テーブルの schema= の値。

  • <table-name>: テーブルの name= の値。

import delta_sharing
delta_sharing.load_as_pandas(f"<profile-path>#<share-name>.<schema-name>.<table-name>")

pandas を使用した共有チェンジデータフィードへのアクセス

実行を使用して の共有テーブルの変更データフィードにアクセスするには、次のように変数を置き換えます。PandasPythonデータプロバイダーがテーブルの変更データフィードを共有したかどうかによっては、変更データフィードを利用できない場合があります。

  • <starting-version>:オプション。 クエリーの開始バージョン。

  • <ending-version>:オプション。 クエリの終了バージョン。

  • <starting-timestamp>:オプション。 クエリーの開始タイムスタンプ。 これは、このタイムスタンプ以降に作成されたバージョンに変換されます。

  • <ending-timestamp>:オプション。 クエリーの終了タイムスタンプ。 これは、このタイムスタンプ以前に作成されたバージョンに変換されます。

import delta_sharing
delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<starting-version>)

delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

出力が空の場合、または期待するデータが含まれていない場合は、データ プロバイダーにお問い合わせください。

Power BI: 共有データの読み取り

Power BI Delta Sharing コネクタを使用すると、Delta Sharing オープン プロトコルを通じて共有されたデータセットを検出、分析、視覚化できます。

要件

Databricksに接続する

Delta Sharing コネクタを使用して Databricks に接続するには、次の手順を実行します。

  1. 共有資格情報ファイルをテキスト エディターで開き、エンドポイント URL とトークンを取得します。

  2. Power BI Desktop を開きます。

  3. [データの取得]メニューで、 Delta Sharingを検索します。

  4. コネクタを選択し、[ 接続] をクリックします。

  5. 資格情報ファイルからコピーしたエンドポイント URL をDelta Sharing Server URLフィールドに入力します。

  6. 必要に応じて、[ 詳細オプション ] タブで、ダウンロードできる行の最大数の [行制限 ] を設定します。 これは、デフォルトで 100 万行に設定されます。

  7. OK」をクリックします。

  8. Authenticationの場合は、資格情報ファイルから取得した Windows をBearer ウイルスにコピーします。

  9. 接続」をクリックします。

Power BI Delta Sharing コネクタの制限事項

Power BI Delta Sharingコネクタには次の制限があります。

  • コネクタがロードするデータは、マシンのメモリに収まる必要があります。 この要件を管理するために、コネクタは、インポートされる行の数を、Power BI Desktop の [詳細オプション] タブで設定した行制限に制限します。

Tableau: 共有データの読み取り

Tableau Delta Sharing コネクタを使用すると、Delta Sharing オープン プロトコルを通じて共有されているデータセットを検出、分析、視覚化できます。

要件

Databricksに接続する

Delta Sharing コネクタを使用して Databricks に接続するには、次の手順を実行します。

  1. Tableau Exchangeにアクセスし、指示に従って Delta Sharing Connector をダウンロードし、適切なデスクトップ フォルダーに配置します。

  2. Tableau Desktop を開きます。

  3. コネクタページで、「Delta Sharing by Databricks」を検索します。

  4. [ 共有ファイルのアップロード] を選択し、プロバイダーによって共有された資格情報ファイルを選択します。

  5. [ データを取得] をクリックします。

  6. Data Explorerでテーブルを選択します。

  7. 必要に応じて、SQL フィルターまたは行制限を追加します。

  8. テーブル・データの取得」をクリックします。

Tableau Delta Sharing コネクタの制限

Tableau Delta Sharingコネクタには次の制限があります。

  • コネクタがロードするデータは、マシンのメモリに収まる必要があります。 この要件を管理するために、コネクタはインポートされる行の数を Tableau で設定した行制限に制限します。

  • すべての列は String型として返されます。

  • SQL フィルターは、Delta Sharing サーバーがpredicateHintをサポートしている場合にのみ機能します。

新しい資格情報を要求する

資格情報のアクティブ化 URL またはダウンロードした資格情報が紛失、破損、または侵害された場合、または資格情報の有効期限が切れてもプロバイダーから新しい資格情報が送られてこない場合は、プロバイダーに連絡して新しい資格情報をリクエストしてください。