DatabricksでDelta Lakeのチェンジデータフィードを使用する

チェンジデータフィードを使用すると、DatabricksはDeltaテーブルのバージョン間の行レベルの変更を追跡できます。Deltaテーブルで有効にすると、ランタイムはテーブルに書き込まれたすべてのデータの変更イベントを記録します。これには、行データと、指定した行が挿入、削除、または更新されたかどうかを示すメタデータが含まれます。

重要

チェンジデータフィードは、テーブル履歴と連携して動作し、変更情報を提供します。Deltaテーブルのクローンを作成すると別の履歴が作成されるため、クローンテーブルの変更データフィードは元のテーブルの変更データフィードと一致しません。

チェンジデータの増分処理

Databricks 、 Deltaテーブルからの変更を段階的に処理するために、チェンジデータフィード を構造化ストリーミングと組み合わせて使用することを推奨しています。 テーブルのチェンジデータフィードのバージョンを自動的に追跡するには、 Databricksの構造化ストリーミングを使用する必要があります。

Delta Live Tables 、変更データを簡単に伝播し、結果をSCD (緩やかに変化するディメンション) タイプ 1 またはタイプ 2 テーブルとして保存する機能を提供します。 APPLY CHANGES APIs参照してください: Delta Live Tablesを使用してチェンジデータキャプチャを簡素化します

テーブルからチェンジデータフィードを読み取るには、そのテーブルでチェンジデータフィードを有効にする必要があります。 「チェンジデータフィードを有効にする」を参照してください。

次の構文例に示すように、テーブルに対して ストリームを構成してチェンジデータフィードを読み取る場合は、オプションreadChangeFeedtrueに設定します。

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)
spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

デフォルトにより、ストリームは、ストリームが最初に開始されたときのテーブルの最新の情報をINSERTとして返し、将来の変更を変更データとして返します。

変更データは Delta Lake トランザクションの一部としてコミットされ、新しいデータがテーブルにコミットされると同時に使用可能になります。

必要に応じて、開始バージョンを指定できます。 「開始バージョンを指定する必要がありますか?」を参照してください。

チェンジデータフィードは、開始バージョンの指定を必要とするバッチ実行もサポートしています。 「バッチ クエリでの変更の読み取り」を参照してください。

レート制限 (maxFilesPerTriggermaxBytesPerTrigger) や excludeRegex などのオプションも、変更データの読み取り時にサポートされます。

レート制限は、開始スナップショットバージョン以外のバージョンではアトミックにすることができます。つまり、コミットバージョン全体がレート制限されるか、コミット全体が返されます。

開始バージョンを指定する必要がありますか?

特定のバージョンより前に行われた変更を無視する場合は、必要に応じて開始バージョンを指定できます。 タイムスタンプまたは Delta トランザクション ログに記録されたバージョン ID 番号を使用してバージョンを指定できます。

バッチ読み取りには開始バージョンが必要ですが、多くのバッチ パターンではオプションの終了バージョンを設定することでメリットが得られます。

チェンジデータフィードを含む構造化ストリーミング ワークロードを構成するときは、開始バージョンの指定が処理にどのような影響を与えるかを理解することが重要です。

多くのストリーミング ワークロード、特に新しいデータ処理パイプラインは、デフォルトの動作の恩恵を受けます。 デフォルト動作では、ストリームがテーブル内のすべての既存レコードをチェンジデータフィード内のINSERT操作として最初に記録するときに、最初のバッチが処理されます。

ターゲット テーブルに、特定の時点までの適切な変更が行われたすべてのレコードが既に含まれている場合は、開始バージョンを指定して、ソース テーブルの状態がINSERTイベントとして処理されるのを回避します。

次の例の構文は、チェックポイントが破損したストリーミング障害から回復します。 この例では、次の条件を前提としています。

  1. テーブル作成時に、ソース テーブルでチェンジデータフィードが有効になりました。

  2. ターゲット・ダウンストリーム表は、バージョン 75 までのすべての変更を処理しました。

  3. ソース テーブルのバージョン履歴は、バージョン 70 以降で利用できます。

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)
spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

この例では、新しいチェックポイントの場所も指定する必要があります。

重要

開始バージョンを指定すると、開始バージョンがテーブル履歴に存在しなくなると、ストリームは新しいチェックポイントから開始できなくなります。 Delta Lake は履歴バージョンを自動的にクリーンアップします。つまり、指定された開始バージョンはすべて最終的に削除されます。

「チェンジデータフィードを使用してテーブルの全履歴を再生できますか?」を参照してください。 。

バッチクエリーで変更を読み取る

バッチ クエリ構文を使用すると、特定のバージョンからのすべての変更を読み取ったり、指定されたバージョン範囲内の変更を読み取ったりすることができます。

バージョンは整数として指定し、タイムスタンプはyyyy-MM-dd[ HH:mm:ss[.SSS]]の形式の文字列として指定します。

開始バージョンと終了バージョンは、クエリに含まれています。 特定の開始バージョンから最新バージョンのテーブルへの変更を読み込むには、開始バージョンのみを指定します。

変更イベントが記録されているバージョンよりも古いバージョンまたはタイムスタンプを指定した場合(つまり、チェンジデータフィードが有効になっていた場合)、チェンジデータフィードが有効になっていないことを示すエラーがスローされます。

次の構文例は、バッチ読み取りで開始バージョン オプションと終了バージョン オプションを使用する方法を示しています。

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")
// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

デフォルトでは、ユーザーがテーブルの最後のコミットを超えるバージョンまたはタイムスタンプを渡すと、エラーtimestampGreaterThanLatestCommitがスローされます。Databricks Runtime 11.3 LTS以降では、ユーザーが次の構成をtrueに設定した場合、チェンジデータフィードで範囲外バージョンのケースを処理できます:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

テーブルの最後のコミットよりも新しい開始バージョンを指定した場合、またはテーブルの最後のコミットよりも新しい開始タイムスタンプを指定した場合、前述の構成が有効になっていると、空の読み取り結果が返されます。

テーブルの最終コミットより大きい終了バージョン、またはテーブルの最終コミットより新しい終了タイムスタンプを指定した場合、バッチ読み取りモードで前述の設定を有効にすると、開始バージョンから最終コミットまでのすべての変更が返されます。

チェンジデータフィードのスキーマとは?

テーブルのチェンジデータフィードから読み取ると、最新のテーブルバージョンのスキーマが使用されます。

ほとんどのスキーマ変更および展開操作は、完全にサポートされています。列マッピングが有効になっているテーブルは、すべてのユースケースをサポートしているわけではなく、異なる動作を示しています。「列マッピングが有効になっているテーブルのチェンジデータフィードの制限」を参照してください。

Deltaテーブルのスキーマのデータ列に加えて、チェンジデータフィードには、変更イベントの種類を識別するメタデータ列が含まれています:

列名

タイプ

_change_type

文字列

insert, update_preimage , update_postimage, delete (1)

_commit_version

ロング

変更を含むDeltaログまたはテーブルのバージョン。

_commit_timestamp

タイムスタンプ

コミットが作成されたときに関連付けられたタイムスタンプ。

(1)preimageは更新前の値、postimageは更新後の値。

追加された列と同じ名前の列がスキーマに含まれている場合、テーブルでチェンジデータフィードを有効にすることはできません。チェンジデータフィードを有効にする前に、テーブル内の列の名前を変更してこの競合を解決します。

チェンジデータフィードを有効にする

有効なテーブルのチェンジデータフィードのみを読み取ることができます。 次のいずれかの方法で、チェンジデータフィード オプションを明示的に有効にする必要があります。

  • 新しいテーブルCREATE TABLEコマンドでテーブルプロパティdelta.enableChangeDataFeed = trueを設定します。

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 既存のテーブルALTER TABLEコマンドでテーブルプロパティdelta.enableChangeDataFeed = trueを設定します。

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • すべての新しいテーブル

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

重要

チェンジデータフィードを有効にした後に行われた変更のみが記録されます。 テーブルに対する過去の変更はキャプチャされません。

データストレージの変更

チェンジデータフィードを有効にすると、テーブルのストレージ コストがわずかに増加します。 変更データ レコードはクエリの実行時に生成され、通常は書き換えられたファイルの合計サイズよりもはるかに小さくなります。

Databricks は、テーブル ディレクトリの下の_change_dataフォルダーに、 UPDATEDELETE 、およびMERGE操作の変更データを記録します。 挿入のみの操作やパーティション全体の削除などの一部の操作では、 Databricksトランザクション データ フィードをトランザクション ログから直接効率的に取得できるため、_change_data ディレクトリにデータは生成されません。

_change_data フォルダー内のデータ ファイルに対するすべての読み取りは、サポートされているDelta Lake APIsを経由する必要があります。

_change_dataフォルダー内のファイルは、テーブルの保持ポリシーに従います。 VACUUMコマンドを実行すると、チェンジデータフィードのデータが削除されます。

チェンジデータフィードを使用して、テーブルの履歴全体を再生できますか?

チェンジデータフィードは、テーブルへのすべての変更を永続的に記録することを目的としているわけではありません。 チェンジデータフィードは、有効にした後に発生した変更のみを記録します。

チェンジデータフィード とDelta Lake使用すると、常にソース テーブルの完全な読み取りを再構築できます。つまり、チェンジデータフィード が有効になっているテーブルに対して新しいストリーミング読み取りを開始し、そのテーブルの現在のバージョンとその後発生したすべての変更をキャプチャできます。

チェンジデータフィード内のレコードは一時的なものとして扱い、指定された保持期間中のみアクセスできるようにする必要があります。 Deltaトランザクション ログは、テーブル バージョンとそれに対応するチェンジデータ フィード バージョンを定期的に削除します。 トランザクション ログからバージョンが削除されると、そのバージョンのトランザクションデータフィードを読み取ることができなくなります。

ユースケースでテーブルへのすべての変更の永続的な履歴を維持する必要がある場合は、増分ロジックを使用して、チェンジデータフィードから新しいテーブルにレコードを書き込む必要があります。 次のコード例は、構造化ストリーミングの増分処理を活用しながら、利用可能なデータをバッチ ワークロードとして処理するtrigger.AvailableNow使用方法を示しています。 このワークロードをメイン処理パイプラインと非同期にスケジュールして、監査目的または完全な再生可能性のために、チェンジデータフィード のバックアップを作成できます。

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)
spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

列マッピングが有効になっているテーブルのデータフィード制限を変更する

Deltaテーブルで列マッピングを有効にすると、既存のデータのデータファイルを書き換えることなく、テーブル内の列を削除または名前変更できます。列マッピングを有効にすると、列の名前変更や削除、データ型の変更、null値の許容の変更など、付加的でないスキーマの変更を実行した後、変更データフィードに制限されます。

重要

  • バッチセマンティクスを使用して、非加法スキーマ変更が発生したトランザクション、または範囲のチェンジデータフィードを読み取ることはできません。

  • Databricks Runtime 12.2 LTS以前では、非加法的なスキーマ変更が行われた列マッピングが有効になっているテーブルは、 チェンジデータフィード でのストリーミング読み取りをサポートしません。 列マッピングとスキーマ変更によるストリーミングを参照してください。

  • Databricks Runtime 11.3 LTS以前では、列マッピングが有効になっていて、列の名前変更または削除が行われたテーブルの チェンジデータフィード を読み取ることはできません。

Databricks Runtime 12.2 LTS以降では、非加法的なスキーマ変更が行われた列マッピングが有効になっているテーブルに対して、 チェンジデータフィード でバッチ読み取りを実行できます。 読み取り操作では、最新バージョンのテーブルのスキーマを使用する代わりに、クエリで指定されたテーブルのエンドバージョンのスキーマを使用します。 指定されたバージョン範囲が非付加スキーマの変更にまたがっている場合、クエリは失敗します。