DatabricksでDelta Lakeの変更データフィードを使用する

変更データフィードを使用すると、DatabricksはDeltaテーブルのバージョン間の行レベルの変更を追跡できます。Deltaテーブルで有効にすると、ランタイムはテーブルに書き込まれたすべてのデータの変更イベントを記録します。これには、行データと、指定した行が挿入、削除、または更新されたかどうかを示すメタデータが含まれます。

バッチクエリーで変更イベントを読み取るには、Spark SQL、Apache Spark DataFrames、および構造化ストリーミングを使用します。

重要

変更データフィードは、テーブル履歴と連携して動作し、変更情報を提供します。Deltaテーブルのクローンを作成すると別の履歴が作成されるため、クローンテーブルの変更データフィードは元のテーブルの変更データフィードと一致しません。

ユースケース

変更データフィードは、既定では有効になっていません。次のユースケースは、変更データフィードを有効にすると駆動されます。

  • シルバーテーブルとゴールドテーブル:最初のMERGEUPDATEDELETEオペレーションに続く行レベルの変更のみを処理することで、Delta Lakeのパフォーマンスを向上させ、ETLとELTオペレーションを高速化、簡素化します。

  • 具体化されたビュー:基になるテーブル全体を再処理することなく、BIと分析で使用する情報の最新の集計ビューを作成し、代わりに変更が行われた場所のみを更新します。

  • 変更の送信:変更データフィードをKafkaやRDBMSなどのダウンストリームシステムに送信し、データパイプラインの後の段階で増分処理に使用できます。

  • 監査証跡テーブル:変更データフィードをDeltaテーブルとしてキャプチャすると、永続的なストレージと効率的なクエリー機能が提供され、削除が発生したタイミングや行われた更新など、時間の経過に伴うすべての変更を確認できます。

変更データフィードを有効にする

次のいずれかの方法を使用して、変更データフィードオプションを明示的に有効にする必要があります:

  • 新しいテーブルCREATE TABLEコマンドでテーブルプロパティdelta.enableChangeDataFeed = trueを設定します。

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 既存のテーブルALTER TABLEコマンドでテーブルプロパティdelta.enableChangeDataFeed = trueを設定します。

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • すべての新しいテーブル

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

重要

変更データフィードを有効にした後に行われた変更のみが記録されます。テーブルに対する過去の変更はキャプチャされません。

データストレージの変更

Databricksは、UPDATEDELETE、およびMERGEオペレーションの変更データをテーブルディレクトリの下の_change_dataフォルダーに記録します。挿入のみのオペレーションやパーティション全体の削除などの一部のオペレーションでは、Databricksはトランザクションログから直接変更データフィードを効率的に計算できるため、_change_dataディレクトリにデータが生成されません。

_change_dataフォルダー内のファイルは、テーブルの保持ポリシーに従います。したがって、VACUUMコマンドを実行すると、変更データフィードデータも削除されます。

バッチクエリーで変更を読み取る

開始と終了にバージョンまたはタイムスタンプを指定できます。開始バージョンと終了バージョン、およびタイムスタンプは、クエリーに含まれます。特定の開始バージョンから最新バージョンのテーブルへの変更を読み取るには、開始バージョンまたはタイムスタンプのみを指定します。

バージョンは整数として指定し、タイムスタンプはyyyy-MM-dd[ HH:mm:ss[.SSS]]の形式の文字列として指定します。

変更イベントが記録されているバージョンよりも古いバージョンまたはタイムスタンプを指定した場合(つまり、変更データフィードが有効になっていた場合)、変更データフィードが有効になっていないことを示すエラーがスローされます。

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')
# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")
// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

ストリーミングクエリーの変更を読み取る

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")
// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

テーブルの読み取り中に変更データを取得するには、オプションreadChangeFeedtrueに設定します。startingVersionまたはstartingTimestampは省略可能であり、指定しない場合、ストリームはストリーミング時のテーブルの最新のスナップショットをINSERTとして返し、将来の変更を変更データとして返します。レート制限(maxFilesPerTriggermaxBytesPerTrigger)やexcludeRegexなどのオプションも、変更データを読み取るときにサポートされます。

レート制限は、開始スナップショットバージョン以外のバージョンではアトミックにすることができます。つまり、コミットバージョン全体がレート制限されるか、コミット全体が返されます。

デフォルトでは、ユーザーがテーブルの最後のコミットを超えるバージョンまたはタイムスタンプを渡すと、エラーtimestampGreaterThanLatestCommitがスローされます。Databricks Runtime 11.3 LTS以降では、ユーザーが次の構成をtrueに設定した場合、変更データフィードで範囲外バージョンのケースを処理できます:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

テーブルの最後のコミットよりも新しい開始バージョンを指定した場合、またはテーブルの最後のコミットよりも新しい開始タイムスタンプを指定した場合、前述の構成が有効になっていると、空の読み取り結果が返されます。

テーブルの最終コミットより大きい終了バージョン、またはテーブルの最終コミットより新しい終了タイムスタンプを指定した場合、バッチ読み取りモードで前述の設定を有効にすると、開始バージョンから最終コミットまでのすべての変更が返されます。

変更データフィードのスキーマとは?

テーブルの変更データフィードから読み取ると、最新のテーブルバージョンのスキーマが使用されます。

ほとんどのスキーマ変更および展開操作は、完全にサポートされています。列マッピングが有効になっているテーブルは、すべてのユースケースをサポートしているわけではなく、異なる動作を示しています。「列マッピングが有効になっているテーブルのデータフィード制限の変更」を参照してください。

Deltaテーブルのスキーマのデータ列に加えて、変更データフィードには、変更イベントの種類を識別するメタデータ列が含まれています:

列名

タイプ

_change_type

文字列

insert, update_preimage , update_postimage, delete (1)

_commit_version

ロング

変更を含むDeltaログまたはテーブルのバージョン。

_commit_timestamp

タイムスタンプ

コミットが作成されたときに関連付けられたタイムスタンプ。

(1)preimageは更新前の値、postimageは更新後の値。

追加された列と同じ名前の列がスキーマに含まれている場合、テーブルで変更データフィードを有効にすることはできません。変更データフィードを有効にする前に、テーブル内の列の名前を変更してこの競合を解決します。

列マッピングが有効になっているテーブルのデータフィード制限を変更する

Deltaテーブルで列マッピングを有効にすると、既存のデータのデータファイルを書き換えることなく、テーブル内の列を削除または名前変更できます。列マッピングを有効にすると、列の名前変更や削除、データ型の変更、null値の許容の変更など、付加的でないスキーマの変更を実行した後、変更データフィードに制限されます。

重要

  • バッチセマンティクスを使用して、非加法スキーマ変更が発生したトランザクション、または範囲の変更データフィードを読み取ることはできません。

  • Databricks Runtime 12.2 LTS以前では、非加法的なスキーマ変更が行われた列マッピングが有効になっているテーブルは、 チェンジデータフィード でのストリーミング読み取りをサポートしません。 列マッピングとスキーマ変更によるストリーミングを参照してください。

  • Databricks Runtime 11.3 LTS以前では、列マッピングが有効になっていて、列の名前変更または削除が行われたテーブルの チェンジデータフィード を読み取ることはできません。

Databricks Runtime 12.2 LTS以降では、非加法的なスキーマ変更が行われた列マッピングが有効になっているテーブルに対して、 チェンジデータフィード でバッチ読み取りを実行できます。 読み取り操作では、最新バージョンのテーブルのスキーマを使用する代わりに、クエリで指定されたテーブルのエンドバージョンのスキーマを使用します。 指定されたバージョン範囲が非付加スキーマの変更にまたがっている場合、クエリは失敗します。

よくある質問(FAQ)

変更データフィードを有効にする場合のオーバーヘッドはどれくらいですか?

大きな影響はありません。変更データレコードは、クエリーの実行プロセス中にインラインで生成され、通常、書き換えられたファイルの合計サイズよりもはるかに小さくなります。

変更記録の保持ポリシーとは?

変更レコードは、古いテーブルバージョンと同じ保持ポリシーに従い、指定された保持期間外の場合はVACUUMによってクリーンアップされます。

新しいレコードはいつ変更データフィードで利用可能になりますか?

変更データは、Delta Lakeトランザクションと共にコミットされ、新しいデータがテーブルで使用可能になると同時に使用可能になります。

ノートブックの例:Delta変更データフィードを使用した変更の伝播

このノートブックでは、ワクチン接種の絶対数のシルバーテーブルに加えられた変更を、ワクチン接種率のゴールドテーブルに反映する方法を示します。

データフィードノートブックを変更する

ノートブックを新しいタブで開く