Delta Live Tablesの APPLY CHANGES API を使用したチェンジデータキャプチャの簡素化

Delta Live Tables は、 APPLY CHANGES API を使用してチェンジデータキャプチャ (CDC) を簡素化します。 以前は、 MERGE INTO ステートメントは Databricks での CDC レコードの処理によく使用されていました。 ただし、 MERGE INTO では、レコードの順序が間違っているために誤った結果が生成されたり、レコードを並べ替えるために複雑なロジックが必要になったりする可能性があります。

Delta Live Tables の APPLY CHANGES API は、シーケンスが正しくないレコードを自動的に処理することで、CDC レコードの正しい処理を保証し、シーケンスのないレコードを処理するための複雑なロジックを開発する必要がなくなります。

APPLY CHANGES API は、Delta Live Tables の SQL インターフェイスと Python インターフェイスでサポートされており、SCD タイプ 1 とタイプ 2 のテーブルの更新もサポートされています。

  • レコードを直接更新するには、SCDタイプ1を使用します。更新されたレコードの履歴は保持されません。

  • SCD type 2を使用して、すべての更新または指定された列の更新に対してレコードの履歴を保持します。

構文およびその他のリファレンスについては、以下を参照してください:

この記事では、ソースデータの変更に基づいてDelta Live Tablesパイプラインのテーブルを更新する方法について説明します。Deltaテーブルの行レベルの変更情報を記録およびクエリーする方法については、「DatabricksでDelta Lake変更データフィードを使用する」を参照してください。

CDCはDelta Live Tablesでどのように実装されていますか?

ソースデータでレコードの順序付け(シーケンス)のための列を指定する必要があります。Delta Live Tables では、これはソースデータの適切な順序を表現するものであり、単調に増加する値として解釈されます。Delta Live Tables は、順序どおりに到着しなかったデータを自動的に処理します。SCD タイプ 2 の変更の場合、Delta Live Tables は適切なシーケンス値をターゲットテーブルの __START_AT 列と __END_AT 列に伝播させます。各シーケンス値のキーごとに 1 回ずつ個別に更新する必要があります。シーケンス値 NULL はサポートされていません。

Delta Live Tables で CDC 処理を実行するには、まずストリーミング テーブルを作成し、次に APPLY CHANGES INTO ステートメントを使用して変更フィードのソース、キー、およびシーケンスを指定します。 ターゲットストリーミングテーブルを作成するには、SQL の CREATE OR REFRESH STREAMING TABLE ステートメントまたは Python の create_streaming_table() 関数を使用します。 CDC 処理を定義するステートメントを作成するには、SQL の APPLY CHANGES ステートメントまたは Python の apply_changes() 関数を使用します。 構文の詳細については、 Delta Live Tablesでの SQL を使用したチェンジデータ キャプチャまたは Delta Live Tablesでの Python を使用した チェンジデータキャプチャ を参照してください。

Delta Live TablesのCDC処理に使用されるデータオブジェクトは何ですか?

Hive metastore でターゲットテーブルを宣言すると、2 つのデータ構造が作成されます。

  • ターゲットテーブルに割り当てられた名前を使用するビュー。

  • Delta Live Tables が CDC 処理を管理するために使用する内部バッキングテーブル。このテーブルには、ターゲットテーブル名の前に __apply_changes_storage_ を追加した名前が付けられます。

たとえば、dlt_cdc_targetという名前のターゲットテーブルを宣言すると、メタストアにdlt_cdc_targetという名前のビューと__apply_changes_storage_dlt_cdc_targetという名前のテーブルが表示されます。ビューを作成すると、Delta Live Tablesで、順不同になっているデータの処理に必要な追加情報(トゥームストーンやバージョンなど)をフィルターで除外できるようになります。処理されたデータを表示するには、ターゲットビューに対してクエリを実行します。__apply_changes_storage_テーブルのスキーマは、将来の機能や拡張機能をサポートするために変更される可能性があるため、運用環境での使用ではこのテーブルをクエリしないでください。テーブルにデータを手動で追加する場合、バージョン列が欠落しているため、レコードは他の変更の前にあるものとみなされます。

パイプラインが Unity Catalogに発行する場合、ユーザーは内部バッキング テーブルにアクセスできません。

Delta Live Tables CDC クエリーによって処理されたレコードに関するデータを取得する

以下のメトリクスは、 apply changes クエリーによってキャプチャされます。

  • num_upserted_rows: 更新中にデータセットにアップサートされた出力行の数。

  • num_deleted_rows: 更新中にデータセットから削除された既存の出力行の数。

CDC以外のフローに対して出力される num_output_rows メトリクスは、 apply changes クエリーではキャプチャされません。

制限

APPLY CHANGES INTO クエリーまたは apply_changes 関数のターゲットをストリーミング テーブルのソースとして使用することはできません。APPLY CHANGES INTOクエリーまたは apply_changes 関数のターゲットから読み込むテーブルは、実体化ビュー (Materialized View) である必要があります。

DatabricksのSCDタイプ1とSCDタイプ2

次のセクションでは、次のようなソースイベントに基づいてターゲットテーブルを更新する、Delta Live Tables SCDタイプ1およびタイプ2クエリーを示す例を示します:

  1. 新しいユーザーレコードを作成してください。

  2. ユーザーレコードを削除します。

  3. ユーザーレコードを更新します。SCD タイプ 1 の例は、順序どおりではないイベントの処理を示しており、最後の UPDATE 操作が遅れて到着し、ターゲットテーブルから削除されています。

次の例は、Delta Live Tables パイプラインの構成と更新に精通していることを前提としています。「チュートリアル:最初の Delta Live Tables パイプラインを実行する」を参照してください。

これらの例を実行するには、まずサンプル データセットを作成する必要があります。 「テスト データの生成」を参照してください。

以下はこれらの例の入力記録です:

ユーザーID

名前

オペレーション

sequenceNum

124

Raul

Oaxaca

INSERT

1

123

Isabel

Monterrey

INSERT

1

125

Mercedes

Tijuana

INSERT

2

126

Lily

Cancun

INSERT

2

123

null

null

DELETE

6

125

Mercedes

Guadalajara

UPDATE

6

125

Mercedes

Mexicali

UPDATE

5

123

Isabel

Chihuahua

UPDATE

5

例題データの最後の行のコメントを外すと、レコードを切り捨てる場所を指定する次のレコードが挿入されます:

ユーザーID

名前

オペレーション

sequenceNum

null

null

null

TRUNCATE

3

次のすべての例には、DELETETRUNCATE操作の両方を指定するオプションが含まれていますが、これらはそれぞれオプションです。

SCDタイプ1更新の処理

次のコード例は、SCDタイプ1更新の処理を示しています:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

SCDタイプ1の例を実行すると、ターゲットテーブルには次のレコードが含まれます:

ユーザーID

名前

124

Raul

Oaxaca

125

Mercedes

Guadalajara

126

Lily

Cancun

追加のTRUNCATEレコードを使用してSCDタイプ1の例を実行すると、sequenceNum=3でのTRUNCATE操作によりレコード124126が切り捨てられ、ターゲットテーブルには次のレコードが含まれます:

ユーザーID

名前

125

Mercedes

Guadalajara

SCDタイプ2更新の処理

次のコード例は、SCDタイプ2のアップデートの処理を示しています:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

SCDタイプ2の例を実行した後、ターゲットテーブルには以下のレコードが含まれます:

ユーザーID

名前

__START_AT

__END_AT

123

Isabel

Monterrey

1

5

123

Isabel

Chihuahua

5

6

124

Raul

Oaxaca

1

null

125

Mercedes

Tijuana

2

5

125

Mercedes

Mexicali

5

6

125

Mercedes

Guadalajara

6

null

126

Lily

Cancun

2

null

SCD タイプ 2 クエリーでは、ターゲット表のヒストリーを追跡する出力列のサブセットを指定することもできます。 他の列への変更は、新しい履歴レコードを生成するのではなく、その場で更新されます。 次の例は、追跡から city 列を除外する方法を示しています。

次の例は、SCDタイプ2でトラックヒストリーを使用する例です:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

追加の TRUNCATE レコードを指定せずにこの例を実行すると、ターゲット表には以下のレコードが含まれます。

ユーザーID

名前

__START_AT

__END_AT

123

Isabel

Chihuahua

1

6

124

Raul

Oaxaca

1

null

125

Mercedes

Guadalajara

2

null

126

Lily

Cancun

2

null

テストデータの作成

以下のコードは、このチュートリアルにあるサンプルクエリーで使用するサンプルデータセットを生成するために提供されています。新しいスキーマを作成し、新しいテーブルを作成するための適切な資格情報を持っていると仮定すると、ノートブックまたはDatabricks SQLを使用してこれらのステートメントを実行できます。次のコードは、Delta Live Tablesパイプラインの一部として実行することを目的としたものではありません

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

ターゲットストリーミングテーブルのデータの追加、変更、または削除

パイプラインでテーブルが Unity Catalog に発行される場合は、データ 操作言語 (DML) ステートメント (挿入、更新、削除、マージ ステートメントなど) を使用して、 APPLY CHANGES INTO ステートメントによって作成されたターゲット ストリーミング テーブルを変更できます。

  • ストリーミングテーブルのテーブルスキーマを変更する DML ステートメントはサポートされていません。DML ステートメントによってテーブルスキーマの進化が発生しないようにしてください。

  • ストリーミングテーブルを更新する DML ステートメントは、Databricks Runtime 13.1 以降を使用している共有 Unity Catalog クラスターまたは SQL ウェアハウスでのみ実行できます。

  • ストリーミングには追加専用のデータソースが必要なため、処理で (DML ステートメントなどによる) 変更を含むソース ストリーミング テーブルからのストリーミングが必要な場合は、ソース ストリーミング テーブルを読み取るときに skipChangeCommits フラグ を設定します。 skipChangeCommits を設定すると、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。処理にストリーミング テーブルが必要ない場合は、マテリアライズド ビュー (追加のみの制限がない) をターゲット テーブルとして使用できます。

Delta Live Tables は指定された SEQUENCE BY 列を使用し、ターゲット テーブルの __START_AT 列と __END_AT 列 (SCD タイプ 2 の場合) に適切なシーケンス値を伝達するため、レコードの適切な順序を維持するために、DML ステートメントでこれらの列に有効な値を使用するようにする必要があります。 「 Delta Live Tables での CDC の実装方法」を参照してください。

ストリーミング テーブルでの DML ステートメントの使用の詳細については、「 ストリーミング テーブルのデータを追加、変更、または削除する」を参照してください。

次の例では、開始シーケンスを 5 としたアクティブレコードを挿入しています。

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);