APPLY CHANGES API : Delta Live Tablesでのチェンジデータキャプチャを簡素化
Delta Live Tables は、 APPLY CHANGES
API を使用してチェンジデータキャプチャ (CDC) を簡素化します。 以前は、 MERGE INTO
ステートメントは Databricks での CDC レコードの処理によく使用されていました。 ただし、 MERGE INTO
では、レコードの順序が間違っているために誤った結果が生成されたり、レコードを並べ替えるために複雑なロジックが必要になったりする可能性があります。
Delta Live Tables の APPLY CHANGES
API は、シーケンスが正しくないレコードを自動的に処理することで、CDC レコードの正しい処理を保証し、シーケンスのないレコードを処理するための複雑なロジックを開発する必要がなくなります。
APPLY CHANGES
API は、Delta Live Tables の SQL インターフェイスと Python インターフェイスでサポートされており、SCD タイプ 1 とタイプ 2 のテーブルの更新もサポートされています。
レコードを直接更新するには、SCDタイプ1を使用します。更新されたレコードの履歴は保持されません。
SCD type 2を使用して、すべての更新または指定された列の更新に対してレコードの履歴を保持します。
構文およびその他のリファレンスについては、以下を参照してください:
注
この記事では、ソースデータの変更に基づいてDelta Live Tablesパイプラインのテーブルを更新する方法について説明します。Deltaテーブルの行レベルの変更情報を記録およびクエリーする方法については、「DatabricksでDelta Lake変更データフィードを使用する」を参照してください。
CDCはDelta Live Tablesでどのように実装されていますか?
ソースデータでレコードの順序付け(シーケンス)のための列を指定する必要があります。Delta Live Tables では、これはソースデータの適切な順序を表現するものであり、単調に増加する値として解釈されます。Delta Live Tables は、順序どおりに到着しなかったデータを自動的に処理します。SCD タイプ 2 の変更の場合、Delta Live Tables は適切なシーケンス値をターゲットテーブルの __START_AT
列と __END_AT
列に伝播させます。各シーケンス値のキーごとに 1 回ずつ個別に更新する必要があります。シーケンス値 NULL はサポートされていません。
Delta Live Tables で CDC 処理を実行するには、まずストリーミング テーブルを作成し、次に APPLY CHANGES INTO
ステートメントを使用して変更フィードのソース、キー、およびシーケンスを指定します。 ターゲットストリーミングテーブルを作成するには、SQL の CREATE OR REFRESH STREAMING TABLE
ステートメントまたは Python の create_streaming_table()
関数を使用します。 CDC 処理を定義するステートメントを作成するには、SQL の APPLY CHANGES
ステートメントまたは Python の apply_changes()
関数を使用します。 構文の詳細については、 Delta Live Tablesでの SQL を使用したチェンジデータ キャプチャまたは Delta Live Tablesでの Python を使用した チェンジデータキャプチャ を参照してください。
Delta Live TablesのCDC処理に使用されるデータオブジェクトは何ですか?
Hive metastore でターゲットテーブルを宣言すると、2 つのデータ構造が作成されます。
ターゲットテーブルに割り当てられた名前を使用するビュー。
Delta Live Tables が CDC 処理を管理するために使用する内部バッキングテーブル。このテーブルには、ターゲットテーブル名の前に
__apply_changes_storage_
を追加した名前が付けられます。
たとえば、dlt_cdc_target
という名前のターゲットテーブルを宣言すると、メタストアにdlt_cdc_target
という名前のビューと__apply_changes_storage_dlt_cdc_target
という名前のテーブルが表示されます。ビューを作成すると、Delta Live Tablesで、順不同になっているデータの処理に必要な追加情報(トゥームストーンやバージョンなど)をフィルターで除外できるようになります。処理されたデータを表示するには、ターゲットビューに対してクエリを実行します。__apply_changes_storage_
テーブルのスキーマは、将来の機能や拡張機能をサポートするために変更される可能性があるため、運用環境での使用ではこのテーブルをクエリしないでください。テーブルにデータを手動で追加する場合、バージョン列が欠落しているため、レコードは他の変更の前にあるものとみなされます。
パイプラインが Unity Catalogに発行する場合、ユーザーは内部バッキング テーブルにアクセスできません。
Delta Live Tables CDC クエリーによって処理されたレコードに関するデータを取得する
以下のメトリクスは、 apply changes
クエリーによってキャプチャされます。
num_upserted_rows
: 更新中にデータセットにアップサートされた出力行の数。num_deleted_rows
: 更新中にデータセットから削除された既存の出力行の数。
CDC以外のフローに対して出力される num_output_rows
メトリクスは、 apply changes
クエリーではキャプチャされません。
制限
APPLY CHANGES INTO
クエリーまたは apply_changes
関数のターゲットをストリーミング テーブルのソースとして使用することはできません。APPLY CHANGES INTO
クエリーまたは apply_changes
関数のターゲットから読み込むテーブルは、実体化ビュー (Materialized View) である必要があります。
DatabricksのSCDタイプ1とSCDタイプ2
次のセクションでは、次のようなソースイベントに基づいてターゲットテーブルを更新する、Delta Live Tables SCDタイプ1およびタイプ2クエリーを示す例を示します:
新しいユーザーレコードを作成してください。
ユーザーレコードを削除します。
ユーザーレコードを更新します。SCD タイプ 1 の例は、順序どおりではないイベントの処理を示しており、最後の
UPDATE
操作が遅れて到着し、ターゲットテーブルから削除されています。
次の例は、Delta Live Tables パイプラインの構成と更新に精通していることを前提としています。「チュートリアル:最初の Delta Live Tables パイプラインを実行する」を参照してください。
これらの例を実行するには、まずサンプル データセットを作成する必要があります。 「テスト データの生成」を参照してください。
以下はこれらの例の入力記録です:
ユーザーID |
名前 |
市 |
オペレーション |
sequenceNum |
---|---|---|---|---|
124 |
Raul |
Oaxaca |
INSERT |
1 |
123 |
Isabel |
Monterrey |
INSERT |
1 |
125 |
Mercedes |
Tijuana |
INSERT |
2 |
126 |
Lily |
Cancun |
INSERT |
2 |
123 |
null |
null |
DELETE |
6 |
125 |
Mercedes |
Guadalajara |
UPDATE |
6 |
125 |
Mercedes |
Mexicali |
UPDATE |
5 |
123 |
Isabel |
Chihuahua |
UPDATE |
5 |
例題データの最後の行のコメントを外すと、レコードを切り捨てる場所を指定する次のレコードが挿入されます:
ユーザーID |
名前 |
市 |
オペレーション |
sequenceNum |
---|---|---|---|---|
null |
null |
null |
TRUNCATE |
3 |
注
次のすべての例には、DELETE
とTRUNCATE
操作の両方を指定するオプションが含まれていますが、これらはそれぞれオプションです。
SCDタイプ1更新の処理
次のコード例は、SCDタイプ1更新の処理を示しています:
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCDタイプ1の例を実行すると、ターゲットテーブルには次のレコードが含まれます:
ユーザーID |
名前 |
市 |
---|---|---|
124 |
Raul |
Oaxaca |
125 |
Mercedes |
Guadalajara |
126 |
Lily |
Cancun |
追加のTRUNCATE
レコードを使用してSCDタイプ1の例を実行すると、sequenceNum=3
でのTRUNCATE
操作によりレコード124
と126
が切り捨てられ、ターゲットテーブルには次のレコードが含まれます:
ユーザーID |
名前 |
市 |
---|---|---|
125 |
Mercedes |
Guadalajara |
SCDタイプ2更新の処理
次のコード例は、SCDタイプ2のアップデートの処理を示しています:
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
SCDタイプ2の例を実行した後、ターゲットテーブルには以下のレコードが含まれます:
ユーザーID |
名前 |
市 |
__START_AT |
__END_AT |
---|---|---|---|---|
123 |
Isabel |
Monterrey |
1 |
5 |
123 |
Isabel |
Chihuahua |
5 |
6 |
124 |
Raul |
Oaxaca |
1 |
null |
125 |
Mercedes |
Tijuana |
2 |
5 |
125 |
Mercedes |
Mexicali |
5 |
6 |
125 |
Mercedes |
Guadalajara |
6 |
null |
126 |
Lily |
Cancun |
2 |
null |
SCD タイプ 2 クエリーでは、ターゲット表のヒストリーを追跡する出力列のサブセットを指定することもできます。 他の列への変更は、新しい履歴レコードを生成するのではなく、その場で更新されます。 次の例は、追跡から city
列を除外する方法を示しています。
次の例は、SCDタイプ2でトラックヒストリーを使用する例です:
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
追加の TRUNCATE
レコードを指定せずにこの例を実行すると、ターゲット表には以下のレコードが含まれます。
ユーザーID |
名前 |
市 |
__START_AT |
__END_AT |
---|---|---|---|---|
123 |
Isabel |
Chihuahua |
1 |
6 |
124 |
Raul |
Oaxaca |
1 |
null |
125 |
Mercedes |
Guadalajara |
2 |
null |
126 |
Lily |
Cancun |
2 |
null |
テストデータの作成
以下のコードは、このチュートリアルにあるサンプルクエリーで使用するサンプルデータセットを生成するために提供されています。新しいスキーマを作成し、新しいテーブルを作成するための適切な資格情報を持っていると仮定すると、ノートブックまたはDatabricks SQLを使用してこれらのステートメントを実行できます。次のコードは、Delta Live Tablesパイプラインの一部として実行することを目的としたものではありません:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
ターゲットストリーミングテーブルのデータの追加、変更、または削除
パイプラインでテーブルが Unity Catalog に発行される場合は、データ 操作言語 (DML) ステートメント (挿入、更新、削除、マージ ステートメントなど) を使用して、 APPLY CHANGES INTO
ステートメントによって作成されたターゲット ストリーミング テーブルを変更できます。
注
ストリーミングテーブルのテーブルスキーマを変更する DML ステートメントはサポートされていません。DML ステートメントによってテーブルスキーマの進化が発生しないようにしてください。
ストリーミング テーブルを更新する DML ステートメントは、Unity Catalog Databricks Runtime13.3LTS 以降を使用している共有 クラスターまたは SQL Server でのみ実行できます。
ストリーミングには追加専用のデータソースが必要なため、処理で (DML ステートメントなどによる) 変更を含むソース ストリーミング テーブルからのストリーミングが必要な場合は、ソース ストリーミング テーブルを読み取るときに skipChangeCommits フラグ を設定します。
skipChangeCommits
を設定すると、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。処理にストリーミング テーブルが必要ない場合は、マテリアライズド ビュー (追加のみの制限がない) をターゲット テーブルとして使用できます。
Delta Live Tables は指定された SEQUENCE BY
列を使用し、ターゲット テーブルの __START_AT
列と __END_AT
列 (SCD タイプ 2 の場合) に適切なシーケンス値を伝達するため、レコードの適切な順序を維持するために、DML ステートメントでこれらの列に有効な値を使用するようにする必要があります。 「 Delta Live Tables での CDC の実装方法」を参照してください。
ストリーミング テーブルでの DML ステートメントの使用の詳細については、「 ストリーミング テーブルのデータを追加、変更、または削除する」を参照してください。
次の例では、開始シーケンスを 5 としたアクティブレコードを挿入しています。
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);