APPLY CHANGES API : Delta Live Tablesでチェンジデータのキャプチャを簡素化
Delta Live TablesはAPPLY CHANGES
やAPPLY CHANGES FROM SNAPSHOT
APIを用いて、チェンジデータキャプチャ (CDC )をシンプルにします。使用するインターフェースは、変更データのソースによって異なります。
APPLY CHANGES
を使用して、チェンジデータフィード (CDF) からの変更を処理します。データベース スナップショットの変更を処理するには、
APPLY CHANGES FROM SNAPSHOT
(パブリック プレビュー) を使用します。
以前は、 MERGE INTO
ステートメントは Databricks 上の CDC レコードの処理によく使用されていました。 ただし、 MERGE INTO
では、レコードの順序が正しくないために正しくない結果が生成されたり、レコードを並べ替えるために複雑なロジックが必要になったりする可能性があります。
APPLY CHANGES
API は、Delta Live Tables SQL および Python インターフェースでサポートされています。 APPLY CHANGES FROM SNAPSHOT
API は Delta Live Tables Python インターフェースでサポートされています。
APPLY CHANGES
とAPPLY CHANGES FROM SNAPSHOT
はどちらも、SCD タイプ 1 とタイプ 2 を使用したテーブルの更新をサポートしています。
レコードを直接更新するには、SCD タイプ 1 を使用します。 更新されたレコードの履歴は保持されません。
SCD type 2を使用して、すべての更新または指定された列の更新に対してレコードの履歴を保持します。
構文およびその他のリファレンスについては、以下を参照してください:
注
この記事では、ソースデータの変更に基づいてDelta Live Tablesパイプラインのテーブルを更新する方法について説明します。Deltaテーブルの行レベルの変更情報を記録およびクエリーする方法については、「DatabricksでDelta Lake変更データフィードを使用する」を参照してください。
要件
CDC APIsを使用するには、Delta Live Tables Pro
エディションまたは Advanced
エディションを使用するようにパイプラインを構成する必要があります。
APPLY CHANGES
API で CDC はどのように実装されますか?
Delta Live Tables のAPPLY CHANGES
API は、順序が正しくないレコードを自動的に処理することで、CDC レコードが正しく処理されることを保証し、順序が正しくないレコードを処理するための複雑なロジックを開発する必要がなくなります。 レコードを順序付けるソース データ内の列を指定する必要があります。Delta Live Tables は、これをソース データの適切な順序の単調増加表現として解釈します。 Delta Live Tables は、順序どおりに到着しないデータを自動的に処理します。 SCD タイプ 2 の変更の場合、Delta Live Tables は適切なシーケンス値をターゲット テーブルの__START_AT
と__END_AT
列に伝播します。 各シーケンス値でキーごとに 1 つの異なる更新を行う必要があり、NULL シーケンス値はサポートされていません。
APPLY CHANGES
を使用して CDC 処理を実行するには、まずストリーミング テーブルを作成し、次に SQL のAPPLY CHANGES INTO
ステートメントまたは Python のapply_changes()
関数を使用して、変更フィードのソース、キー、およびシーケンスを指定します。 ターゲット ストリーミング テーブルを作成するには、SQL のCREATE OR REFRESH STREAMING TABLE
ステートメントまたは Python のcreate_streaming_table()
関数を使用します。 SCD タイプ 1 およびタイプ 2 の処理例を参照してください。
構文の詳細については、Delta Live Tables SQL リファレンスまたはPython リファレンスを参照してください。
APPLY CHANGES FROM SNAPSHOT
API では CDC はどのように実装されますか?
プレビュー
APPLY CHANGES FROM SNAPSHOT
API はパブリック プレビュー段階です。
APPLY CHANGES FROM SNAPSHOT
一連の順序どおりのスナップショットを比較することでソース データの変更を効率的に判断し、スナップショット内のレコードの CDC 処理に必要な処理を実行する宣言型 API です。 APPLY CHANGES FROM SNAPSHOT
は Delta Live Tables Python インターフェースでのみサポートされます。
APPLY CHANGES FROM SNAPSHOT
複数のソース タイプからのスナップショットの取り込みをサポートします。
定期的なスナップショット インジェストを使用して、既存のテーブルまたはビューからスナップショットをインジェストします。
APPLY CHANGES FROM SNAPSHOT
には、既存のデータベース オブジェクトからのスナップショットの定期的な取り込みをサポートするためのシンプルで合理化されたインターフェイスがあります。 パイプラインが更新されるたびに新しいスナップショットが取り込まれ、取り込み時間がスナップショット バージョンとして使用されます。 パイプラインを連続モードで実行すると、APPLY CHANGES FROM スナップショット処理を含むフローの トリガー間隔 設定によって決定された期間に、各パイプライン更新で複数のスナップショットが取り込まれます。履歴 ID 取り込みを使用して、Oracle またはMySQLデータベースやデータウェアハウスから生成された ID などのデータベース ID を含むファイルを処理します。
APPLY CHANGES FROM SNAPSHOT
を使用して任意のソース タイプから CDC 処理を実行するには、まずストリーミング テーブルを作成し、次に Python のapply_changes_from_snapshot()
関数を使用して、処理の実装に必要なスナップショット、キー、およびその他の引数を指定します。 定期的なスナップショットの取り込みと履歴スナップショットの取り込みの例を参照してください。
API に渡されるスナップショットは、バージョンの昇順になっている必要があります。 Delta Live Tables が順序が正しくないスナップショットを検出すると、エラーがスローされます。
構文の詳細については、Delta Live Tables Python リファレンスを参照してください。
例: CDF ソース データを使用した SCD タイプ 1 および SCD タイプ 2 の処理
次のセクションでは、次のような チェンジデータ フィードからのソース イベントに基づいてターゲット テーブルを更新するDelta Live Tables SCDタイプ 1 およびタイプ 2 クエリの例を示します。
新しいユーザー・レコードを作成します。
ユーザー・レコードを削除します。
ユーザーレコードを更新します。 SCD タイプ 1 の例では、最後の
UPDATE
操作が遅れて到着し、ターゲット テーブルから削除され、順序どおりでないイベントの処理が示されます。
次の例は、Delta Live Tables パイプラインの構成と更新に精通していることを前提としています。「チュートリアル:最初の Delta Live Tables パイプラインを実行する」を参照してください。
これらの例を実行するには、まずサンプル データセットを作成する必要があります。 「テスト データの生成」を参照してください。
以下はこれらの例の入力記録です:
ユーザーID |
名前 |
市 |
オペレーション |
sequenceNum |
---|---|---|---|---|
124 |
Raul |
Oaxaca |
INSERT |
1 |
123 |
Isabel |
Monterrey |
INSERT |
1 |
125 |
Mercedes |
Tijuana |
INSERT |
2 |
126 |
Lily |
Cancun |
INSERT |
2 |
123 |
null |
null |
DELETE |
6 |
125 |
Mercedes |
Guadalajara |
UPDATE |
6 |
125 |
Mercedes |
Mexicali |
UPDATE |
5 |
123 |
Isabel |
Chihuahua |
UPDATE |
5 |
例題データの最後の行のコメントを外すと、レコードを切り捨てる場所を指定する次のレコードが挿入されます:
ユーザーID |
名前 |
市 |
オペレーション |
sequenceNum |
---|---|---|---|---|
null |
null |
null |
TRUNCATE |
3 |
注
次のすべての例には、 DELETE
操作と TRUNCATE
操作の両方を指定するオプションが含まれていますが、それぞれ省略可能です。
SCDタイプ1更新の処理
次の例は、SCD タイプ 1 の更新の処理を示しています。
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCDタイプ1の例を実行すると、ターゲットテーブルには次のレコードが含まれます:
ユーザーID |
名前 |
市 |
---|---|---|
124 |
Raul |
Oaxaca |
125 |
Mercedes |
Guadalajara |
126 |
Lily |
Cancun |
追加のTRUNCATE
レコードを使用してSCDタイプ1の例を実行すると、sequenceNum=3
でのTRUNCATE
操作によりレコード124
と126
が切り捨てられ、ターゲットテーブルには次のレコードが含まれます:
ユーザーID |
名前 |
市 |
---|---|---|
125 |
Mercedes |
Guadalajara |
SCDタイプ2更新の処理
次の例は、SCD タイプ 2 の更新の処理を示しています。
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
SCDタイプ2の例を実行した後、ターゲットテーブルには以下のレコードが含まれます:
ユーザーID |
名前 |
市 |
__START_AT |
__END_AT |
---|---|---|---|---|
123 |
Isabel |
Monterrey |
1 |
5 |
123 |
Isabel |
Chihuahua |
5 |
6 |
124 |
Raul |
Oaxaca |
1 |
null |
125 |
Mercedes |
Tijuana |
2 |
5 |
125 |
Mercedes |
Mexicali |
5 |
6 |
125 |
Mercedes |
Guadalajara |
6 |
null |
126 |
Lily |
Cancun |
2 |
null |
SCD タイプ 2 クエリーでは、ターゲット表のヒストリーを追跡する出力列のサブセットを指定することもできます。 他の列への変更は、新しい履歴レコードを生成するのではなく、その場で更新されます。 次の例は、追跡から city
列を除外する方法を示しています。
次の例は、SCDタイプ2でトラックヒストリーを使用する例です:
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
追加の TRUNCATE
レコードを指定せずにこの例を実行すると、ターゲット表には以下のレコードが含まれます。
ユーザーID |
名前 |
市 |
__START_AT |
__END_AT |
---|---|---|---|---|
123 |
Isabel |
Chihuahua |
1 |
6 |
124 |
Raul |
Oaxaca |
1 |
null |
125 |
Mercedes |
Guadalajara |
2 |
null |
126 |
Lily |
Cancun |
2 |
null |
テストデータの作成
以下のコードは、このチュートリアルにあるサンプルクエリで使用するためのサンプルデータセットを生成するために提供されています。 新しいスキーマを作成して新しいテーブルを作成するための適切な資格情報を持っていると仮定すると、ノートブックまたは Databricks SQL のいずれかを使用してこれらのステートメントを実行できます。 次のコードは、Delta Live Tables パイプラインの一部として実行されることを意図したものではありません。
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
例: 定期的なスナップショット処理
次の例は、 mycatalog.myschema.mytable
に保存されているテーブルのスナップショットを取り込む SCD タイプ 2 処理を示しています。 処理の結果は、 target
という名前のテーブルに書き込まれます。
mycatalog.myschema.mytable
タイムスタンプ 2024-01-01 00:00:00 のレコード
キー |
値 |
---|---|
1 |
A1 |
2 |
A2の |
mycatalog.myschema.mytable
タイムスタンプ 2024-01-01 12:00:00 のレコード
キー |
値 |
---|---|
2 |
B2 の |
3 |
A3の |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.apply_changes_from_snapshot(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
スナップショットを処理すると、ターゲット テーブルに次のレコードが含まれます。
キー |
値 |
__START_AT |
__END_AT |
---|---|---|---|
1 |
A1 |
2024-01-01 00:00:00 |
2024-01-01 12:00:00 |
2 |
A2の |
2024-01-01 00:00:00 |
2024-01-01 12:00:00 |
2 |
B2 の |
2024-01-01 12:00:00 |
null |
3 |
A3の |
2024-01-01 12:00:00 |
null |
例: 履歴スナップショット処理
次の例は、クラウド ストレージ システムに保存されている 2 つのスナップショットからのソース イベントに基づいてターゲット テーブルを更新する SCD タイプ 2 処理を示しています。
timestamp
のスナップショット、保存場所 /<PATH>/filename1.csv
キー |
トラッキングカラム |
ノントラッキングカラム |
---|---|---|
1 |
A1 |
B1 の |
2 |
A2の |
B2 の |
4 |
A4 |
B4 の |
timestamp + 5
のスナップショット、保存場所 /<PATH>/filename2.csv
キー |
トラッキングカラム |
ノントラッキングカラム |
---|---|---|
2 |
a2_new |
B2 の |
3 |
A3の |
B3の |
4 |
A4 |
b4_new |
次のコード例は、これらのスナップショットを使用して SCD タイプ 2 の更新を処理する方法を示しています。
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.apply_changes_from_snapshot(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
スナップショットを処理すると、ターゲット テーブルに次のレコードが含まれます。
キー |
トラッキングカラム |
ノントラッキングカラム |
__START_AT |
__END_AT |
---|---|---|---|---|
1 |
A1 |
B1 の |
1 |
2 |
2 |
A2の |
B2 の |
1 |
2 |
2 |
a2_new |
B2 の |
2 |
null |
3 |
A3の |
B3の |
2 |
null |
4 |
A4 |
b4_new |
1 |
null |
ターゲットストリーミングテーブルのデータの追加、変更、または削除
パイプラインでテーブルが Unity Catalog に発行される場合は、データ 操作言語 (DML) ステートメント (挿入、更新、削除、マージ ステートメントなど) を使用して、 APPLY CHANGES INTO
ステートメントによって作成されたターゲット ストリーミング テーブルを変更できます。
注
ストリーミングテーブルのテーブルスキーマを変更する DML ステートメントはサポートされていません。DML ステートメントによってテーブルスキーマの進化が発生しないようにしてください。
ストリーミング テーブルを更新する DML ステートメントは、Unity Catalog Databricks Runtime13.3LTS 以降を使用している共有 クラスターまたは SQL Server でのみ実行できます。
ストリーミングには追加専用のデータソースが必要なため、処理で (DML ステートメントなどによる) 変更を含むソース ストリーミング テーブルからのストリーミングが必要な場合は、ソース ストリーミング テーブルを読み取るときに skipChangeCommits フラグ を設定します。
skipChangeCommits
を設定すると、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。処理にストリーミング テーブルが必要ない場合は、マテリアライズド ビュー (追加のみの制限がない) をターゲット テーブルとして使用できます。
Delta Live Tables は指定されたSEQUENCE BY
列を使用し、適切な順序値をターゲット テーブルの__START_AT
と__END_AT
列に伝播するため (SCD タイプ 2 の場合)、レコードの適切な順序を維持するために、DML ステートメントがこれらの列に有効な値を使用するようにする必要があります。 「APPLY CHANGES API を使用して CDC を実装する方法」を参照してください。
ストリーミング テーブルでの DML ステートメントの使用の詳細については、「 ストリーミング テーブルのデータを追加、変更、または削除する」を参照してください。
次の例では、開始シーケンスを 5 としたアクティブレコードを挿入しています。
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
APPLY CHANGES
ターゲットテーブルからのチェンジデータフィードの読み取り
Databricks Runtime 15.2 以降では、他のDeltaテーブルからチェンジデータフィードを読み取るのと同じ方法で、APPLY CHANGES
クエリまたはAPPLY CHANGES FROM SNAPSHOT
クエリのターゲットであるストリーミングテーブルからチェンジデータフィードを読み取ることができます。ターゲットストリーミングテーブルからチェンジデータフィードを読み取るには、次のものが必要です。
ターゲット ストリーミング テーブルは、Unity Catalog に発行する必要があります。 「Delta Live Tables パイプラインで Unity Catalog を使用する」を参照してください。
ターゲットストリーミングテーブルからチェンジデータフィードを読み取るには、 Databricks Runtime 15.2 以降を使用する必要があります。 チェンジデータフィードを別の Delta Live Tables パイプラインで読み取るには、 Databricks Runtime 15.2 以降を使用するようにパイプラインを構成する必要があります。
Delta Live Tables パイプラインで作成されたターゲット ストリーミング テーブルからチェンジデータフィードを読み取る方法は、他の Delta テーブルからチェンジデータフィードを読み取るのと同じ方法です。Deltaチェンジデータフィード機能の使用について、Python やSQL の例など、詳しくは、 でのDelta Lake チェンジデータフィードの使用Databricks を参照してください。
注
チェンジデータフィードレコードには、変更イベントのタイプを識別する メタデータ が含まれています。 テーブル内のレコードが更新されると、関連付けられた変更レコードのメタデータには、通常、update_preimage
イベントと update_postimage
イベントに設定された_change_type
値が含まれます。
ただし、プライマリ・キー値の変更を含むターゲット・ストリーミング・テーブルの更新が行われた場合、 _change_type
値は異なります。 変更にプライマリ・キーの更新が含まれる場合、 _change_type
メタデータ・フィールドは insert
イベントと delete
イベントに設定されます。 プライマリ・キーの変更は、 UPDATE
ステートメントまたは MERGE
ステートメントを使用してキー・フィールドの 1 つが手動で更新された場合、または SCD タイプ 2 テーブルの場合、 __start_at
・フィールドが以前の開始シーケンス値を反映するように変更された場合に発生する可能性があります。
APPLY CHANGES
クエリは、SCD タイプ 1 と SCD タイプ 2 の処理で異なるプライマリ・キー値を判別します。
SCD タイプ 1 処理と Delta Live Tables Python インターフェースの場合、主キーは
apply_changes()
関数のkeys
パラメーターの値です。Delta Live Tables SQL インターフェイスの場合、主キーはAPPLY CHANGES INTO
ステートメントのKEYS
句で定義された列です。SCDタイプ 2 の場合、プライマリ・キーは
keys
パラメーターまたはKEYS
句にcoalesce(__START_AT, __END_AT)
操作からの戻り値を加えたものです。ここで、__START_AT
と__END_AT
はターゲット ストリーミング テーブルの対応するカラムです。
Delta Live Tables CDC クエリーによって処理されたレコードに関するデータを取得する
注
次のメトリクスは、 APPLY CHANGES
クエリによってのみキャプチャされ、 APPLY CHANGES FROM SNAPSHOT
クエリではキャプチャされません。
以下のメトリクスは、 APPLY CHANGES
クエリーによってキャプチャされます。
num_upserted_rows
: 更新中にデータセットにアップサートされた出力行の数。num_deleted_rows
: 更新中にデータセットから削除された既存の出力行の数。
CDC 以外のフローの出力である num_output_rows
メトリクスは、 apply changes
クエリではキャプチャされません。
Delta Live TablesのCDC処理に使用されるデータオブジェクトは何ですか?
注: 以下のデータ構造は、 APPLY CHANGES
処理にのみ適用され、 APPLY CHANGES FROM SNAPSHOT
処理には適用されません。
Hive metastore でターゲットテーブルを宣言すると、2 つのデータ構造が作成されます。
ターゲットテーブルに割り当てられた名前を使用するビュー。
Delta Live Tables が CDC 処理を管理するために使用する内部バッキングテーブル。このテーブルには、ターゲットテーブル名の前に
__apply_changes_storage_
を追加した名前が付けられます。
たとえば、dlt_cdc_target
という名前のターゲットテーブルを宣言すると、メタストアにdlt_cdc_target
という名前のビューと__apply_changes_storage_dlt_cdc_target
という名前のテーブルが表示されます。ビューを作成すると、Delta Live Tablesで、順不同になっているデータの処理に必要な追加情報(トゥームストーンやバージョンなど)をフィルターで除外できるようになります。処理されたデータを表示するには、ターゲットビューに対してクエリを実行します。__apply_changes_storage_
テーブルのスキーマは、将来の機能や拡張機能をサポートするために変更される可能性があるため、運用環境での使用ではこのテーブルをクエリしないでください。テーブルにデータを手動で追加する場合、バージョン列が欠落しているため、レコードは他の変更の前にあるものとみなされます。
パイプラインが Unity Catalog に公開される場合、ユーザーは内部のバッキング テーブルにアクセスできません。