Delta Live Tables パイプラインの監視

この記事では、 Delta Live Tables パイプラインの組み込み モニタリング機能と可観測性機能の使用について説明します。 これらの機能は、次のようなタスクをサポートします。

クエリのパフォーマンスを検査および診断するには、「 Delta Live Tables パイプラインのクエリ履歴へのアクセス」を参照してください。 この機能はパブリック プレビュー段階です。

パイプラインイベントのEメール 通知を追加する

1 つ以上の Eメール アドレスを設定して、次の場合に通知を受け取ることができます。

  • パイプラインの更新が正常に完了しました。

  • パイプラインの更新が失敗し、再試行可能または再試行不可能なエラーが発生します。 このオプションを選択すると、すべてのパイプラインの失敗に関する通知を受け取ります。

  • パイプラインの更新が再試行不可 (致命的) エラーで失敗します。 このオプションを選択すると、再試行できないエラーが発生した場合にのみ通知を受け取ります。

  • 1 つのデータ フローが失敗します。

パイプライン を作成 または編集するときに Eメール 通知を設定するには:

  1. [ 通知の追加] をクリックします。

  2. 通知を受け取る 1 つ以上の Eメール アドレスを入力します。

  3. 各通知タイプのチェックボックスをクリックして、設定したEメールアドレスに送信します。

  4. [ 通知の追加] をクリックします。

UI ではどのようなパイプラインの詳細を使用できますか?

パイプライン グラフは、パイプラインの更新が正常に開始されるとすぐに表示されます。 矢印は、パイプライン内のデータセット間の依存関係を表します。 デフォルトでは、パイプラインの詳細ページにはテーブルの最新の更新が表示されますが、ドロップダウンメニューから古い更新を選択することもできます。

詳細には、パイプライン ID、ソース コード、コンピュート コスト、製品エディション、およびパイプライン用に構成されたチャンネルが含まれます。

データセットを表形式で表示するには、[ リスト ] タブをクリックします。 リストビューでは、パイプライン内のすべてのデータセットをテーブルの行として表示でき、パイプラインの DAG が大きすぎてグラフビューで視覚化できない場合に便利です。テーブルに表示されるデータセットは、データセット名、タイプ、ステータスなどの複数のフィルターを使用して制御できます。 DAG ビジュアリゼーションに戻るには、[ グラフ] をクリックします。

[別のユーザーとして実行] ユーザーはパイプラインの所有者であり、パイプラインの更新はこのユーザーのアクセス許可で実行されます。run as ユーザーを変更するには、[アクセス許可] をクリックし、パイプラインの所有者を変更します。

データセットの詳細を表示するにはどうすればよいですか?

パイプライン グラフまたはデータセット リストでデータセットをクリックすると、データセットの詳細が表示されます。 詳細には、データセット スキーマ、データ品質メトリクス、データセットを定義するソースコードへのリンクが含まれます。

更新履歴の表示

パイプラインの更新の履歴とステータスを表示するには、上部のバーにある更新履歴ドロップダウンメニューをクリックします。

ドロップダウンメニューで更新を選択すると、更新のグラフ、詳細、イベントが表示されます。 最新の更新プログラムに戻すには、[ 最新の更新プログラムを表示] をクリックします。

ストリーミングメトリクスの表示

プレビュー

Delta Live Tables のストリーミング監視は 、パブリック プレビュー段階です。

パイプライン内の各ストリーミングフローについて、Spark 構造化ストリーミングでサポートされているデータソースApacheKafka ( 、AmazonKinesis 、Auto Loader テーブル、Delta テーブルなど) からストリーミング メトリクスを表示できます。Delta Live Tablesメトリクスは、 Delta Live Tables UI の右ペインにグラフとして表示され、バックログ秒数、バックログバイト数、バックログレコード、バックログファイルが含まれます。 グラフには分単位で集計された最大値が表示され、グラフにカーソルを合わせるとツールチップに最大値が表示されます。 データは、現在の時刻から過去 48 時間に制限されます。

Delta Live Tables チャート アイコンストリーミングメトリクスが利用可能なパイプライン内のテーブルでは、UI Graph ビューでパイプライン DAG を表示すると、 アイコンが表示されます。ストリーミング メトリクスを表示するには、 Delta Live Tables チャート アイコン をクリックして、右ペインの [フロー ] タブにストリーミング メトリクス チャートを表示します。 また、ストリーミング メトリクスのあるテーブルのみを表示するフィルターを適用するには、[ List ] をクリックし、[ Has ストリーミング メトリクス] をクリックします。

各ストリーミング ソースは、特定のメトリクスのみをサポートします。 ストリーミングソースでサポートされていないメトリクスは、UIで表示できません。 次の表は、サポートされているストリーミング ソースで使用できるメトリクスを示しています。

ソース

バックログバイト

バックログ レコード

バックログ秒数

バックログ ファイル

Kafka

Kinesis

Delta

Auto Loader

Google Pub/Sub

Delta Live Tables イベントログとは何ですか?

Delta Live Tables イベント ログには、監査ログ、データ品質チェック、パイプラインの進行状況、データリネージなど、パイプラインに関連するすべての情報が含まれています。 イベント ログを使用して、データパイプラインの状態を追跡、理解、および監視できます。

イベント ログのエントリは、Delta Live Tables ユーザー インターフェイス、Delta Live Tables API で表示するか、イベント ログを直接クエリすることで表示できます。 このセクションでは、イベント ログを直接クエリする方法に焦点を当てます。

また、イベントがログに記録されたときに実行するカスタムアクション (アラートの送信など) を イベントフックとともに定義することもできます。

イベントログスキーマ

次の表では、イベント ログのスキーマについて説明します。 これらのフィールドの一部には、 details フィールドなど、クエリーを実行するために解析が必要な JSON データが含まれています。 Databricks では、JSON フィールドを解析するための : 演算子がサポートされています。 : (コロン記号) 演算子を参照してください。

説明

id

イベント ログ レコードの一意の識別子。

sequence

イベントを識別して順序付けるためのメタデータを含む JSON ドキュメント。

origin

イベントのオリジンのメタデータ (クラウドプロバイダー、クラウドプロバイダーリージョン、user_idpipeline_id、パイプラインが作成された場所 (DBSQLまたはWORKSPACE) を示すpipeline_typeなど) を含む JSON ドキュメント。

timestamp

イベントが記録された時刻。

message

イベントを説明する人間が判読できるメッセージ。

level

イベントの種類 (たとえば、 INFOWARNERRORMETRICSなど)。

error

エラーが発生した場合は、エラーを説明する詳細。

details

イベントの構造化された詳細を含む JSON ドキュメント。 これは、イベントの分析に使用されるプライマリ フィールドです。

event_type

イベントの種類。

maturity_level

イベント スキーマの安定性。 可能な値は次のとおりです。

  • STABLE: スキーマは安定しており、変更されません。

  • NULL: スキーマは安定しており、変更されません。 maturity_level 項目が追加される前にレコードが作成された場合、値は NULL になる可能性があります (リリース 2022.37)。

  • EVOLVING: スキーマは安定しておらず、変更される可能性があります。

  • DEPRECATED: スキーマは非推奨であり、Delta Live Tables ランタイムはいつでもこのイベントの生成を停止できます。

イベントログのクエリー

イベント ログの場所とイベント ログをクエリーするインターフェイスは、パイプラインが Hive metastore と Unity Catalogのどちらを使用するように構成されているかによって異なります。

Hive metastore

パイプラインが テーブルを Hive metastoreに発行する場合、イベント ログは storage の場所の下の /system/events に格納されます。たとえば、パイプライン storage 設定を /Users/username/dataとして構成した場合、イベント ログは DBFS の /Users/username/data/system/events パスに格納されます。

storage 設定を構成していない場合、既定のイベント ログの場所は DBFS で /pipelines/<pipeline-id>/system/events されます。たとえば、パイプラインの ID が 91de5e48-35ed-11ec-8d3d-0242ac130003の場合、ストレージの場所は /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/eventsです。

ビュー を作成すると、イベント ログのクエリを簡略化できます。 次の例では、 event_log_rawという一時ビューを作成します。 このビューは、この記事に含まれるイベント ログのクエリーの例で使用されます。

CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;

<event-log-path> をイベント ログの場所に置き換えます。

パイプライン実行の各インスタンスは、 更新と呼ばれます。 多くの場合、最新の更新プログラムの情報を抽出する必要があります。 次のクエリーを実行して、最新の更新プログラムの識別子を検索し、 latest_update_id 一時ビューに保存します。 このビューは、この記事に含まれるイベント ログのクエリーの例で使用されます。

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

イベント ログは、Databricks ノートブックまたは SQL エディターでクエリーできます。 ノートブックまたは SQL エディターを使用して、イベント ログの例クエリーを実行します。

Unity Catalog

パイプラインが Unity Catalogにテーブルを発行する場合は、 event_log テーブル値関数 (TVF) を使用してパイプラインのイベント ログをフェッチする必要があります。パイプラインのイベント ログを取得するには、パイプライン ID またはテーブル名を TVF に渡します。 たとえば、ID 04c78631-3dd7-4856-b2a6-7d84e9b2638bのパイプラインのイベント ログ レコードを取得するには、次のようにします。

SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")

テーブルを作成または所有しているパイプラインのイベントログレコードを取得するには my_catalog.my_schema.table1:

SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))

TVF を呼び出すには、共有クラスターまたは SQLウェアハウスを使用する必要があります。 たとえば、共有クラスターにアタッチされたノートブックを使用したり、SQLウェアハウスに接続された SQL エディター を使用したりできます。

パイプラインのイベントのクエリを簡略化するために、パイプラインの所有者は event_log TVF のビューを作成できます。 次の例では、パイプラインのイベント ログに対するビューを作成します。 このビューは、この記事に含まれているイベント ログのクエリーの例で使用されます。

event_log TVF はパイプライン所有者のみが呼び出すことができ、 event_log TVF 上に作成されたビューはパイプライン所有者のみがクエリーできます。ビューを他のユーザーと共有することはできません。

CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");

<pipeline-ID> を Delta Live Tables パイプラインの一意の識別子に置き換えます。 ID は、 Delta Live Tables UI の [パイプラインの詳細 ] パネルで確認できます。

パイプライン実行の各インスタンスは、 更新と呼ばれます。 多くの場合、最新の更新プログラムの情報を抽出する必要があります。 次のクエリーを実行して、最新の更新プログラムの識別子を検索し、 latest_update_id 一時ビューに保存します。 このビューは、この記事に含まれるイベント ログのクエリーの例で使用されます。

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

イベントログのリネージ情報のクエリ

リネージに関する情報を含むイベントのイベント・タイプは flow_definitionです。 details:flow_definition オブジェクトには、グラフ内の各リレーションシップを定義する output_datasetinput_datasets が含まれています。

次のクエリーを使用して入力データセットと出力データセットを抽出し、系列情報を表示できます。

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id

output_dataset

input_datasets

customers

null

sales_orders_raw

null

sales_orders_cleaned

["customers", "sales_orders_raw"]

sales_order_in_la

["sales_orders_cleaned"]

イベントログからのデータ品質のクエリー

パイプライン内のデータセットに対するエクスペクテーションを定義すると、データ品質メトリクスは details:flow_progress.data_quality.expectations オブジェクトに保存されます。 データ品質に関する情報を含むイベントのイベント・タイプは flow_progressです。 次の例では、最後のパイプライン更新のデータ品質メトリクスをクエリーします。

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name

dataset

expectation

passing_records

failing_records

sales_orders_cleaned

valid_order_number

4083

0

イベントログを照会してデータバックログを監視する

Delta Live Tables は、 details:flow_progress.metrics.backlog_bytes オブジェクトのバックログに存在するデータの量を追跡します。 バックログメトリクスを含むイベントのイベントタイプは flow_progressです。 次の例は、最後のパイプライン更新のクエリー バックログ メトリクスです。

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

バックログ メトリクスは、パイプラインの Databricks Runtime の種類とバージョンによっては使用できない場合があります。

サーバレスが有効になっていないパイプラインのイベントログから拡張オートスケールイベントを監視する

サーバレス コンピュートを使用しない DLT パイプラインの場合、パイプラインで拡張オートスケールが有効になっていると、イベント ログにクラスターのサイズ変更が記録されます。 enhanced オートスケールに関する情報を含むイベントのイベントタイプは autoscaleです。 クラスターのサイズ変更要求情報は、 details:autoscale オブジェクトに格納されます。 次の例では、拡張オートスケール クラスターのサイズ変更要求に対して、最後のパイプライン更新を照会します。

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

コンピュートのリソース使用率の監視

cluster_resources イベントは、クラスター内のタスクスロットの数、それらのタスクスロットの使用率、およびスケジュールを待機しているタスクの数に関するメトリクスを提供します。

拡張オートスケールが有効になっている場合、 cluster_resources イベントには、 latest_requested_num_executorsoptimal_num_executorsなどのオートスケール アルゴリズムのメトリクスも含まれます。 また、イベントでは、アルゴリズムのステータスが CLUSTER_AT_DESIRED_SIZESCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSBLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATIONなどのさまざまな状態として示されます。 この情報は、オートスケールイベントと併せて表示することで、強化されたオートスケールの全体像を把握することができます。

次の例では、最後のパイプライン更新のタスクキューサイズ履歴をクエリーします。

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

次の例では、最後のパイプライン更新の使用履歴をクエリーします。

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

次の例では、エグゼキューターのカウント履歴をクエリし、Enhanced オートスケール パイプラインでのみ使用可能なメトリクス (最新のリクエストでアルゴリズムによって要求されたエグゼキューターの数、最新のメトリクスに基づいてアルゴリズムによって推奨されるエグゼキューターの最適数、オートスケール アルゴリズムの状態など) を照会します。

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

Delta Live Tables パイプラインの監査

Delta Live Tables イベント ログ レコードおよびその他の Databricks 監査ログを使用して、Delta Live テーブルでデータがどのように更新されているかを完全に把握できます。

Delta Live Tables は、パイプライン所有者の資格情報を使用して更新を実行します。 パイプライン所有者を更新することで、使用する認証情報を変更できます。 Delta Live Tables は、パイプラインの作成、構成の編集、更新のトリガーなど、パイプラインでのアクションについてユーザーを記録します。

Unity Catalog 監査イベントのリファレンスについては、「Unity Catalog イベント」を参照してください。

イベントログ内のユーザーアクションのクエリ

イベント ログを使用して、ユーザー アクションなどのイベントを監査できます。 ユーザーアクションに関する情報を含むイベントのイベントタイプは user_actionです。

アクションに関する情報は、 details フィールドの user_action オブジェクトに保存されます。次のクエリーを使用して、ユーザー イベントの監査ログを作成します。 このクエリーで使用する event_log_raw ビューを作成するには、「 イベント ログのクエリ」を参照してください。

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'

timestamp

action

user_name

2021-05-20T19:36:03.517+0000

START

user@company.com

2021-05-20T19:35:59.913+0000

CREATE

user@company.com

2021-05-27T00:35:51.971+0000

START

user@company.com

ランタイム情報

パイプライン更新のランタイム情報 (更新の Databricks Runtime バージョンなど) を表示できます。

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'

dbr_version

11.0