Apache Spark DataFrames を使用したDelta Sharing共有テーブルの読み取り

この記事では、Apache Spark を使用して、 Delta Sharing を使用して共有されるデータをクエリーする構文の例を示します。 deltasharing キーワードを DataFrame 操作の形式オプションとして使用します。

共有データを照会するためのその他のオプション

また、次の例のように、メタストアに登録されている Delta Sharing カタログ内の共有テーブル名を使用するクエリーを作成することもできます。

SELECT * FROM shared_table_name
spark.read.table("shared_table_name")

Databricks での Delta Sharing の構成と、共有テーブル名を使用したデータのクエリの詳細については、「 Databricks-to-Databricks Delta Sharing (受信者用) を使用して共有されたデータを読み取る」を参照してください。

構造化ストリーミングを使用すると、共有テーブル内のレコードを段階的に処理できます。 構造化ストリーミングを使用するには、テーブルの履歴共有を有効にする必要があります。 ALTER SHARE を参照してください。 履歴の共有には、Databricks Runtime 12.2 LTS 以上が必要です。

共有テーブルでソース Delta テーブルでチェンジデータフィードが有効になっていて、共有で履歴が有効になっている場合は、構造化ストリーミングまたはバッチ操作で差分共有を読み取るときにチェンジデータフィードを使用できます。 「Databricks で Delta Lake チェンジデータフィードを使用する」を参照してください。

Delta Sharing フォーマットキーワードによる読み込み

deltasharing キーワードは、次の例に示すように、Apache Spark DataFrame の読み取り操作でサポートされています。

df = (spark.read
  .format("deltasharing")
  .load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)

Delta Sharing 共有テーブルのチェンジデータフィードの読み込み

履歴が共有され、チェンジデータフィードが有効になっているテーブルの場合、 Apache Spark DataFramesを使用してチェンジデータフィード レコードを読み取ることができます。 履歴の共有には、Databricks Runtime 12.2 LTS 以上が必要です。

df = (spark.read
  .format("deltasharing")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)

構造化ストリーミングを使用した Delta Sharing 共有テーブルの読み取り

履歴が共有されているテーブルの場合、共有テーブルを構造化ストリーミングのソースとして使用できます。 履歴の共有には、Databricks Runtime 12.2 LTS 以上が必要です。

streaming_df = (spark.readStream
  .format("deltasharing")
  .load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)

# If CDF is enabled on the source table
streaming_cdf_df = (spark.readStream
  .format("deltasharing")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)