Delta テーブル ストリーミングの読み取りと書き込み

Delta LakeはreadStreamwriteStreamを通じてSpark構造化ストリーミングと高度に統合されています。Delta Lakeは、一般的にストリーミングシステムやファイルに関連する以下のような制限の多くにとらわれません:

  • 低遅延の取り込みによって生成された小さなファイルの合体。

  • 複数のストリーム (または並列 バッチジョブ) で "一度だけ" 処理を維持する。

  • ストリームのソースとしてファイルを使用する場合に、どのファイルが新しいかを効率的に検出します。

Delta Lakeでのストリーム静的結合に関する情報については、「ストリーム静的結合」を参照してください。

ソースとしてのDeltaテーブル

構造化ストリーミングは、テーブル Delta 増分読み取りを行います。 ストリーミング クエリーが Delta テーブルに対してアクティブになっている間、新しいテーブル バージョンがソース テーブルにコミットされると、新しいレコードはべき等に処理されます。

次のコード例は、テーブル名またはファイル パスを使用してストリーミング読み取りを構成する方法を示しています。

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")
spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

重要

Deltaテーブルのスキーマが、そのテーブルに対してストリーミング読み取りを開始した後に変更された場合、クエリは失敗します。ほとんどのスキーマの変更では、ストリームを再起動してスキーマの不一致を解決することで処理を続行することができます。

Databricks Runtime 12.2 LTS以前では、列の名前変更や削除などの非加法的な変更が行われた列マッピングが有効になっているDeltaテーブルからストリームすることはできません。 詳細については、 「列マッピングとスキーマ変更によるストリーミング」を参照してください。

入力レートの制限

マイクロバッチの制御には、以下のオプションがあります:

  • maxFilesPerTrigger:各マイクロバッチで考慮される新しいファイルの数。デフォルトは1000です。

  • maxBytesPerTrigger:各マイクロバッチで処理されるデータの量。このオプションでは「ソフトマックス」を設定します。つまり、バッチはおおよそこの量のデータを処理し、最小の入力単位がこの制限よりも大きい場合にストリーミングクエリを進めるために、制限を超えるデータを処理する可能性があります。これはデフォルトでは設定されていません。

maxFilesPerTriggermaxBytesPerTriggerを組み合わせて使用すると、マイクロバッチは maxFilesPerTriggerまたはmaxBytesPerTriggerの制限に達するまでデータを処理します。

logRetentionDuration構成によってソーステーブルのトランザクションがクリーンアップされ、ストリーミングクエリーがそれらのバージョンを処理しようとする場合、デフォルトではクエリーはデータ損失を回避できません。オプション failOnDataLossfalse に設定すると、失われたデータを無視して処理を続行できます。

Delta Lakeのチェンジデータキャプチャ(CDC)フィードのストリーミング

Delta Lake変更データフィードは、更新や削除を含むDeltaテーブルへの変更を記録します。有効にすると、変更データフィードからストリーミングし、ダウンストリームテーブルへの挿入、更新、および削除を処理するロジックを記述できます。変更データフィードのデータ出力は、それが記述するデルタテーブルとは若干異なりますが、これは、メダリオンアーキテクチャのダウンストリームテーブルにインクリメンタルな変更を伝播するための解決策となります。

重要

Databricks Runtime 12.2 LTS以前では、列の名前変更や削除などの非加法的な変更が行われた列マッピングが有効になっているDeltaテーブルに対して、データ フィードからストリームを実行することはできません。 列マッピングとスキーマ変更によるストリーミングを参照してください。

更新と削除を無視する

構造化ストリーミングは、追加でない入力を処理せず、ソースとして使用されているテーブルに何らかの変更が発生すると例外をスローします。自動的にダウンストリームに伝搬することができない変更に対処するには、主に2つの戦略があります:

  • 出力とチェックポイントを削除して、ストリームを最初から再開する。

  • 次の2つのオプションのどちらかを設定する:

    • ignoreDeletes:パーティション境界でデータを削除するトランザクションを無視する。

    • skipChangeCommits:既存のレコードを削除または変更するトランザクションを無視する。skipChangeCommitsignoreDeletesを包含します。

Databricks Runtime 12.2 LTS 以降では、 skipChangeCommits以前の設定ignoreChangesを非推奨にします。 Databricks Runtime 11.3 LTS 以前では、 ignoreChangesが唯一サポートされるオプションです。

ignoreChangesのセマンティクは、skipChangeCommitsと大きく異なります。ignoreChangesを有効にすると、UPDATEMERGE INTODELETE(パーティション内)、またはOVERWRITEなどのデータ変更操作の後に、ソーステーブル内の書き換えられたデータファイルが再出力されます。変更されていない行は、しばしば新しい行と一緒に出力されるため、ダウンストリームのコンシューマーは重複を処理できる必要があります。削除はダウンストリームに反映されません。ignoreChangesignoreDeletesを包含します。

skipChangeCommits はファイルの変更操作を完全に無視します。UPDATEMERGE INTODELETEOVERWRITEなどのデータ変更操作によってソーステーブル内で書き換えられたデータファイルは完全に無視されます。アップストリームのソーステーブルに変更を反映させるには、これらの変更を伝搬するためのロジックを別途実装する必要があります。

ignoreChanges で構成されたワークロードは、既知のセマンティクスを使用して引き続き動作しますが、Databricks では、すべての新しいワークロードに skipChangeCommits を使用することをお勧めします。ignoreChanges を使用してワークロードを skipChangeCommits に移行するには、リファクタリング ロジックが必要です。

たとえば、dateによってパーティション化された、dateuser_email、およびaction列を含むテーブルuser_eventsがあるとします。user_eventsテーブルからストリーミングしており、GDPRのためテーブルからデータを削除する必要があります。

パーティション境界で削除すると (つまり、 WHERE がパーティション列にある場合)、ファイルはすでに値でセグメント化されているため、削除によってそれらのファイルがメタデータから削除されます。 データのパーティション全体を削除する場合は、次を使用できます。

spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")

複数のパーティション内のデータを削除する場合 (この例では、 user_emailでフィルター処理する場合) は、次の構文を使用します。

spark.readStream
  .option("skipChangeCommits", "true")
  .table("user_events")

UPDATE ステートメントで user_email を更新すると、問題の user_email を含むファイルが書き換えられます。変更されたデータ ファイルを無視するには、 skipChangeCommits を使用します。

初期位置の指定

次のオプションを使用すると、テーブル全体を処理せずに、Delta Lakeストリーミングソースの開始点を指定できます。

  • startingVersion: 開始する Delta Lake バージョン。 Databricks では、ほとんどのワークロードでこのオプションを省略することをお勧めします。 設定しない場合、ストリームは、その時点でのテーブルの完全なスナップショットを含む、利用可能な最新バージョンから開始されます。

    指定した場合、ストリームは、指定されたバージョン (両端を含む) から始まる Delta テーブルへのすべての変更を読み取ります。 指定したバージョンが使用できなくなった場合、ストリームは開始できません。 コミット・バージョンは、 DESCRIBE HISTORY コマンド出力の version 列から取得できます。

    最新の変更のみを返すには、 latestを指定します。

  • startingTimestamp: タイムスタンプを開始するポイントです。タイムスタンプ以降に行われたすべてのテーブル変更は、ストリーミングリーダーによって読み取られます。提供されたタイムスタンプがすべてのテーブルのコミットより前にある場合、ストリーミングの読み取りは利用可能な最も早いタイムスタンプから始まります。これは次のいずれかになります。

    • タイムスタンプ文字列。たとえば、"2019-01-01T00:00:00.000Z"などです。

    • 日付文字列。たとえば、"2019-01-01"などです。

両方のオプションを同時に設定することはできません。 これらは、新しいストリーミング クエリーを開始するときにのみ有効になります。 ストリーミングクエリーが開始され、進行状況がそのチェックポイントに記録されている場合、これらのオプションは無視されます。

重要

指定したバージョン、またはタイムスタンプからストリーミングソースを開始できますが、ストリーミングソースのスキーマは常にDeltaテーブルの最新のスキーマになります。指定されたバージョンまたはタイムスタンプ以降に、Deltaテーブルに互換性のないスキーマ変更がないことを確認する必要があります。そうしないと、ストリーミングソースが不正なスキーマでデータを読み取った際に、間違った結果を返す可能性があります。

たとえば、user_eventsというテーブルがあるとします。バージョン5以降の変更を読み取りたい場合は、次を使用します:

spark.readStream
  .option("startingVersion", "5")
  .table("user_events")

2018年10月18日以降の変更を読み取りたい場合は、次を使用します。

spark.readStream
  .option("startingTimestamp", "2018-10-18")
  .table("user_events")

データを取りこぼすことなく初期スナップショットを処理する

この機能は、Databricks Runtime 11.3 LTS 以降で利用できます。 この機能は パブリック プレビュー段階です。

Deltaテーブルをストリームソースとして使用する場合、クエリはまずテーブルに存在するすべてのデータを処理します。このバージョンのDeltaテーブルは、初期スナップショットと呼ばれています。デフォルトでは、Deltaテーブルのデータファイルは、最後に更新されたファイルに基づいて処理されます。ただし、最終変更時刻は必ずしも記録イベントの時間順を表すわけではありません。

電子透かしが定義されたステートフルストリーミングクエリでは、ファイルを更新時間順に処理すると、レコードが誤った順序で処理される可能性があります。このため、電子透かしによって記録がレイトイベントとして取りこぼされる可能性があります。

次のオプションを有効にすることで、データドロップの問題を回避できます。

  • withEventTimeOrder: 初期スナップショットをイベント時間順に処理するかどうか。

イベント時間順序を有効にすると、初期スナップショットデータのイベント時間の範囲がタイムバケットに分割されます。各マイクロバッチは、時間範囲内のデータをフィルタリングすることによってバケットを処理します。maxFilesPerTrigger および maxBytesPerTrigger 構成オプションは引き続きマイクロバッチサイズの制御に適用できますが、処理の性質上、近似的な方法でのみ適用されます。

以下の図は、このプロセスを示しています。

初期スナップショット

この機能に関する重要な情報:

  • データドロップの問題は、ステートフルストリーミングクエリーの最初の Delta スナップショットがデフォルトの順序で処理される場合にのみ発生します。

  • 最初のスナップショットの処理中にストリームクエリーが開始されると、それ以降は withEventTimeOrder を変更することはできません。withEventTimeOrder を変更して再起動するには、チェックポイントを削除する必要があります。

  • withEventTimeOrder を有効にしてストリームクエリーを実行している場合、最初のスナップショット処理が完了するまで、この機能をサポートしていないDBRバージョンにダウングレードすることはできません。ダウングレードが必要な場合は、最初のスナップショットが終了するのを待つか、チェックポイントを削除してクエリーを再開してください。

  • この機能は、以下のような稀なケースではサポートされていません。

    • イベント時間列は生成された列であり、Delta ソースと電子透かしの間に非投影変換がある場合。

    • ストリームクエリーに複数の Delta ソースを持つ電子透かしがある場合。

  • イベント時間順を有効にすると、Delta の初期スナップショット処理のパフォーマンスが低下する可能性があります。

  • 各マイクロバッチは、初期スナップショットをスキャンして、対応するイベント時間範囲内のデータをフィルタリングします。 フィルター操作を高速化するには、データ スキップを適用できるように、Delta ソース列をイベント時刻として使用することをお勧めします (該当する場合は、「 Delta Lake のデータ スキップ 」を参照してください)。 さらに、イベント時間列に沿ってテーブルをパーティション分割すると、処理をさらに高速化できます。 Spark UI で、特定のマイクロ バッチに対してスキャンされたデルタ ファイルの数を確認できます。

event_time 列を持つ user_events というテーブルがあるとします。ストリーミングクエリーは集計クエリーです。初期スナップショット処理中にデータドロップが起きないようにするには、以下を使用します。

spark.readStream
  .option("withEventTimeOrder", "true")
  .table("user_events")
  .withWatermark("event_time", "10 seconds")

すべてのストリーミングクエリーに適用されるクラスター上の Spark 構成を使用してこの機能を有効にすることもできます。 spark.databricks.delta.withEventTimeOrder.enabled true

シンクとしての Delta テーブル

構造化ストリーミングを使用して Delta テーブルにデータを書き込むこともできます。テーブルに対して他のストリームやバッチクエリーが同時に実行されている場合でも、トランザクションログにより、Delta Lake で 1 回のみの処理が保証されます。

Delta Lake VACUUM 関数は、Delta Lake によって管理されていないすべてのファイルを削除しますが、_で始まるディレクトリはスキップします。<table-name>/_checkpointsなどのディレクトリ構造を使用すると、Delta テーブルの他のデータやメタデータと一緒にチェックポイントを安全に保存できます。

メトリクス

numBytesOutstandingnumFilesOutstanding とのメトリクスとして、ストリーミングクエリー処理でまだ処理されていないバイト数とファイル数を知ることができます。その他のメトリクスには以下が含まれます。

  • numNewListedFiles: このバッチのバックログを計算するためにリスト化された Delta Lake ファイルの数。

    • backlogEndOffset: バックログの計算に使用されるテーブルバージョン。

ノートブックでストリームを実行している場合、ストリーミングクエリの進行状況ダッシュボードの [生データ] タブに次のメトリクスが表示されます。

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

追加モード

デフォルトでは、ストリームは追加モードで実行され、新しいレコードがテーブルに追加されます。

次の例のように、テーブルにストリーミングする場合はtoTableメソッドを使用します。

(events.writeStream
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

コンプリートモード

構造化ストリーミングを使用して、テーブル全体をバッチごとに置き換えることもできます。ユースケースの一例として、集計を用いたサマリーの計算があります。

(spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")
)
spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")

前述の例では、顧客ごとのイベントの総数を含むテーブルが継続的に更新されます。

レイテンシ要件がより緩やかなアプリケーションの場合は、1 回限りのトリガーでコンピューティングリソースを節約できます。これらを使用することで、特定のスケジュールでサマリー集計テーブルを更新し、最後の更新時以降に到着した新しいデータのみを処理することができます。

foreachBatch を使用したストリーミングクエリーからの Upsert

mergeforeachBatch を組み合わせて使用することで、ストリーミングクエリーから Delta テーブルに複雑な Upsert を書き込むことができます。「forEachBatch を使用して任意のデータシンクに書き込む」をご参照ください。

このパターンには、以下のように様々な適用方法があります。

  • 更新モードでストリーミング集計を書き込む:この方法は、コンプリートモードよりもはるかに効率的です。

  • データベース変更のストリームを Delta テーブルに書き込む: 変更 データを書き込むためのマージ クエリーforeachBatch で使用して、変更のストリームを Delta テーブルに継続的に適用できます。

  • 重複排除を使用してデータのストリームを Delta テーブルに書き込む: 重複除去 の挿入専用マージ クエリーforeachBatch で使用して、自動重複除去を使用して Delta テーブルにデータ (重複あり) を継続的に書き込むことができます。

  • ストリーミングクエリーを再起動すると、データの同じバッチに操作が複数回適用される可能性があるため、 foreachBatch 内の merge ステートメントがべき等であることを確認してください。

  • mergeforeachBatch で使用されている場合、ストリーミングクエリーの入力データレート (StreamingQueryProgress を通じてレポートされ、ノートブックのレートグラフに表示されます) は、ソースでデータが生成される実際のレートの倍数としてレポートされる場合があります。これは、merge が入力データを複数回読み込むため、入力メトリクスが乗算されることが理由です。これがボトルネックになる場合は、 merge の前にバッチの DataFrame をキャッシュし、 mergeの後にキャッシュを解除することができます。

次の例は、 foreachBatch 内で SQL を使用してこのタスクを実行する方法を示しています。

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

次の例のように、Delta Lake API を使用してストリーミングの Upsert を実行することも選択できます。

import io.delta.tables.*

val deltaTable = DeltaTable.forName(spark, "table_name")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "table_name")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

foreachBatch におけるべき等のテーブル書き込み

Databricks では、更新するシンクごとに個別のストリーミング書き込みを構成することをお勧めします。 foreachBatch を使用して複数のテーブルに書き込むと、書き込みがシリアル化されるため、並列化が短縮され、全体的な待機時間が長くなります。

Delta テーブルは、べき等内の複数のテーブルに書き込むための次のDataFrameWriterオプションforeachBatchサポートしています。

  • txnAppId: 書き込み DataFrame ごとに渡すことができる一意の文字列。 たとえば、ストリーミングクエリ ID を txnAppIdとして使用できます。

  • txnVersion: トランザクションのバージョンとして機能する単調増加の数値。

Delta Lake では、 txnAppIdtxnVersion の組み合わせを使用して、重複する書き込みを識別し、無視します。

バッチ書き込みがエラーで中断された場合、バッチを再実行すると、同じアプリケーションとバッチ ID が使用され、ランタイムが重複する書き込みを正しく識別して無視するのに役立ちます。 アプリケーション ID (txnAppId) には、ユーザーが生成した任意の一意の文字列を指定でき、ストリーム ID に関連付ける必要はありません。 「 foreachBatch を使用して任意のデータ シンクに書き込む」を参照してください。

警告

ストリーミング チェックポイントを削除し、新しいチェックポイントでクエリーを再起動する場合は、別の txnAppIdを指定する必要があります。 新しいチェックポイントは、バッチ ID 0で始まります。 Delta Lake では、バッチ ID と txnAppId を一意のキーとして使用し、既に確認されている値を持つバッチをスキップします。

次のコード例は、このパターンを示しています。

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}