Databricksでの構造化ストリーミングクエリーの監視

Databricks には、 [ストリーミング] タブの Spark UI を使用して、構造化ストリーミング アプリケーション用の組み込みの監視が用意されています。

構造化ストリーミング クエリを Spark UIで区別する

writeStream コードに .queryName(<query-name>) を追加してストリームに一意のクエリー名を付け、Spark UI でどのメトリクスがどのストリームに属しているかを簡単に区別できるようにします。

構造化ストリーミングメトリクスを外部サービスにプッシュする

ストリーミング メトリックはApache Sparkのストリーミング Query Listener インターフェイスを使用して、アラートやダッシュボードのユースケースのために外部サービスにプッシュできます。 Databricks Runtime 11.3 LTS 以降では、ストリーミング クエリ リスナーは Python と Scala で利用できます。

リスナーに関連する処理待機時間は、クエリー処理に悪影響を与える可能性があります。 Databricks では、これらのリスナーの処理ロジックを最小限に抑え、Kafka などの待機時間の短いシンクに書き込むことをお勧めします。

次のコードは、リスナーを実装するための構文の基本的な例を示しています。

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

構造化ストリーミングでの監視可能なメトリクスの定義

Observable メトリクスは、クエリー (DataFrame) で定義できる任意の集計関数という名前です。 DataFrame の実行が完了ポイントに達すると (つまり、バッチ クエリーが終了するか、ストリーミング エポックに達すると)、最後の完了ポイント以降に処理されたデータのメトリクスを含む名前付きイベントが生成されます。

これらのメトリクスは、リスナーを Spark セッションにアタッチすることで確認できます。 リスナーは実行モードによって異なります。

  • バッチ モード: QueryExecutionListenerを使用します。

    QueryExecutionListener は、クエリーの完了時に呼び出されます。 QueryExecution.observedMetrics マップを使用してメトリクスにアクセスします。

  • ストリーミング、またはマイクロバッチ: StreamingQueryListenerを使用します。

    StreamingQueryListener は、ストリーミング クエリがエポックを完了したときに呼び出されます。 StreamingQueryProgress.observedMetrics マップを使用してメトリクスにアクセスします。Databricks は、連続実行ストリーミングをサポートしていません。

例:

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

StreamingQueryListener object メトリクス

メトリクス

説明

id

再起動後も保持される一意のクエリー ID。 StreamingQuery.id() を参照してください。

runId

起動または再起動ごとに一意のクエリー ID。 StreamingQuery.runId() を参照してください。

name

クエリーのユーザー指定の名前。 指定されていない場合は Null。

timestamp

マイクロバッチの実行のタイムスタンプ。

batchId

処理中のデータの現在のバッチの一意の ID。 失敗後に再試行する場合、特定のバッチ ID を複数回実行できることに注意してください。 同様に、処理するデータがない場合、バッチ ID はインクリメントされません。

numInputRows

トリガーで処理されたレコードの数を (すべてのソースにわたって) 集計します。

inputRowsPerSecond

到着データの (すべてのソースにわたる) 集計率。

processedRowsPerSecond

Spark がデータを処理している (すべてのソースにわたる) 集計レート。

期間Ms オブジェクト

マイクロバッチ実行プロセスのさまざまな段階を完了するのにかかる時間に関する情報。

メトリクス

説明

durationMs.addBatch

マイクロバッチの実行にかかった時間。 これには、Spark がマイクロバッチの計画にかかる時間は含まれません。

durationMs.getBatch

ソースからオフセットに関するメタデータを取得するのにかかる時間。

durationMs.latestOffset

マイクロバッチで消費された最新のオフセット。 この進行状況オブジェクトは、ソースから最新のオフセットを取得するのにかかった時間を参照します。

durationMs.queryPlanning

実行プランの生成にかかった時間。

durationMs.triggerExecution

マイクロバッチの計画と実行にかかった時間。

durationMs.walCommit

新しい使用可能なオフセットをコミットするのにかかった時間。

イベント時間オブジェクト

マイクロバッチで処理されているデータ内で見られるイベント時間値に関する情報。 このデータは、構造化ストリーミング ジョブで定義されたステートフル集計を処理するために状態をトリミングする方法を把握するために、ウォーターマークによって使用されます。

メトリクス

説明

eventTime.avg

トリガーに表示される平均イベント時間。

eventTime.max

トリガーに表示される最大イベント時間。

eventTime.min

トリガーに表示される最小イベント時間。

eventTime.watermark

トリガーで使用されるウォーターマークの値。

状態演算子オブジェクト

構造化ストリーミング ジョブで定義されているステートフル操作と、それらから生成される集計に関する情報。

メトリクス

説明

stateOperators.operatorName

メトリクスが関連するステートフル演算子の名前。 たとえば、 symmetricHashJoindedupestateStoreSaveなどです。

stateOperators.numRowsTotal

ステートフルな演算子または集計の結果としての状態の行数。

stateOperators.numRowsUpdated

ステートフルな演算子または集計の結果として状態で更新された行の数。

stateOperators.numRowsRemoved

ステートフルな演算子または集計の結果として状態から削除された行の数。

stateOperators.commitTimeMs

すべての更新 (書き込みと削除) をコミットし、新しいバージョンを返すのにかかった時間。

stateOperators.memoryUsedBytes

状態ストアによって使用されるメモリ。

stateOperators.numRowsDroppedByWatermark

ステートフル集計に含めるには遅すぎると見なされる行の数。 ストリーミング集計のみ: 集計後に削除された行数であり、生の入力行は削除されません。 この数値は正確ではありませんが、遅延データが削除されていることを示している可能性があります。

stateOperators.numShufflePartitions

このステートフル演算子のシャッフルパーティションの数。

stateOperators.numStateStoreInstances

オペレーターが初期化および保守した実際の状態ストア インスタンス。 多くのステートフル オペレーターでは、これはパーティションの数と同じですが、ストリームとストリームの結合によって、パーティションごとに 4 つの状態ストア インスタンスが初期化されます。

stateOperators.customMetrics オブジェクト

RocksDB から収集された情報で、構造化ストリーミング ジョブに対して維持するステートフルな値に関するパフォーマンスと操作に関するメトリクスをキャプチャします。 詳細については、「 Databricks での RocksDB 状態ストアの構成」を参照してください。

メトリクス

説明

customMetrics.rocksdbBytesCopied

RocksDBファイルマネージャによって追跡されるようにコピーされたバイト数。

customMetrics.rocksdbCommitCheckpointLatency

ネイティブ RocksDB のスナップショットを取得し、ローカル ディレクトリに書き込む時間 (ミリ秒単位)。

customMetrics.rocksdbCompactLatency

チェックポイントのコミット中の圧縮 (オプション) の時間 (ミリ秒単位)。

customMetrics.rocksdbCommitFileSyncLatencyMs

ネイティブの RocksDB スナップショット関連ファイルを外部ストレージ (チェックポイントの場所) に同期する時間 (ミリ秒単位)。

customMetrics.rocksdbCommitFlushLatency

RocksDBのメモリ内の変更をローカルディスクにフラッシュするミリ秒単位の時間。

customMetrics.rocksdbCommitPauseLatency

チェックポイント コミットの一部としてバックグラウンド ワーカー スレッド (圧縮など) を停止する時間 (ミリ秒単位)。

customMetrics.rocksdbCommitWriteBatchLatency

メモリ内構造 (WriteBatch) のステージングされた書き込みをネイティブ RocksDB に適用する時間 (ミリ秒単位)。

customMetrics.rocksdbFilesCopied

RocksDBファイルマネージャによって追跡されるようにコピーされたファイルの数。

customMetrics.rocksdbFilesReused

RocksDBファイルマネージャによって追跡されるように再利用されたファイルの数。

customMetrics.rocksdbGetCount

DB へのget呼び出しの数 (これには、WriteBatchからのgetsは含まれません。 書き込みのステージングに使用されるメモリ内バッチ)。

customMetrics.rocksdbGetLatency

基になるネイティブ RocksDB::Get 呼び出しの平均時間 (ナノ秒単位)。

customMetrics.rocksdbReadBlockCacheHitCount

RocksDBのブロックキャッシュのどれだけが有用であるかどうか、およびローカルディスクの読み取りを回避します。

customMetrics.rocksdbReadBlockCacheMissCount

RocksDBのブロックキャッシュのどれだけが有用であるかどうか、およびローカルディスクの読み取りを回避します。

customMetrics.rocksdbSstFileSize

すべての SST ファイルのサイズ。 SSTは静的ソートテーブルの略で、RocksDBがデータを格納するために使用する表形式構造です。

customMetrics.rocksdbTotalBytesRead

get操作によって読み取られた非圧縮バイト数。

customMetrics.rocksdbTotalBytesReadByCompaction

圧縮プロセスがディスクから読み取るバイト数。

customMetrics.rocksdbTotalBytesReadThroughIterator

一部のステートフル操作 ( FlatMapGroupsWithState でのタイムアウト処理やウォーターマークなど) では、反復子を介して DB 内のデータを読み取る必要があります。 このメトリクスは、反復子を使用して読み取られた非圧縮データのサイズを表します。

customMetrics.rocksdbTotalBytesWritten

put操作によって書き込まれた非圧縮バイト数。

customMetrics.rocksdbTotalBytesWrittenByCompaction

圧縮プロセスがディスクに書き込むバイト数。

customMetrics.rocksdbTotalCompactionLatencyMs

RocksDB 圧縮の時間ミリ秒 (バックグラウンド圧縮と、コミット中に開始されたオプションの圧縮を含む)。

customMetrics.rocksdbTotalFlushLatencyMs

バックグラウンドフラッシュを含むフラッシュ時間。 フラッシュ操作は、MemTable がいっぱいになったときにストレージにフラッシュされるプロセスです。 MemTablesは、データがRocksDBに保存される最初のレベルです。

customMetrics.rocksdbZipFileBytesUncompressed

RocksDBファイルマネージャは、物理SSTファイルのディスクスペースの使用率と削除を管理します。 このメトリクスは、ファイルマネージャーによって報告された非圧縮 zip ファイルをバイト単位で表します。

ソースオブジェクト (Kafka)

メトリクス

説明

sources.description

ストリーミングクエリーが読み取っているソースの名前。 たとえば、 “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.

sources.startOffset オブジェクト

ストリーミングジョブが開始された Kafka トピック内の開始オフセット番号。

sources.endOffset オブジェクト

マイクロバッチによって処理された最新のオフセット。 これは、進行中のマイクロバッチ実行の latestOffset に等しい可能性があります。

sources.latestOffset オブジェクト

マイクロバッチによって計算された最新のオフセット。 調整がある場合、マイクロバッチ処理プロセスですべてのオフセットが処理されず、 endOffsetlatestOffset が異なる可能性があります。

sources.numInputRows

このソースから処理された入力行の数。

sources.inputRowsPerSecond

このソースの処理のためにデータが到着する速度。

sources.processedRowsPerSecond

Spark がこのソースのデータを処理するレート。

ソース.メトリクス object (Kafka)

メトリクス

説明

sources.metrics.avgOffsetsBehindLatest

ストリーミングクエリーが、サブスクライブされたすべてのトピックの中で使用可能な最新のオフセットの背後にあるオフセットの平均数。

sources.metrics.estimatedTotalBytesBehindLatest

クエリープロセスがサブスクライブされたトピックから消費していない推定バイト数。

sources.metrics.maxOffsetsBehindLatest

ストリーミングクエリーが、サブスクライブされたすべてのトピックの中で使用可能な最新のオフセットの背後にあるオフセットの最大数。

sources.metrics.minOffsetsBehindLatest

ストリーミングクエリーが、サブスクライブされたすべてのトピックの中で使用可能な最新のオフセットの背後にあるオフセットの最小数。

シンクオブジェクト (Kafka)

メトリクス

説明

sink.description

ストリーミング クエリーが書き込んでいるシンクの名前。 たとえば、 “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.

sink.numOutputRows

マイクロバッチの一部として出力テーブルまたはシンクに書き込まれた行数。 状況によっては、この値は "-1" になり、通常は "不明" と解釈できます。

ソースオブジェクト (Delta Lake)

メトリクス

説明

sources.description

ストリーミングクエリーが読み取っているソースの名前。 たとえば、 “DeltaSource[table]”.

sources.[startOffset/endOffset].sourceVersion

このオフセットがエンコードされるシリアル化のバージョン。

sources.[startOffset/endOffset].reservoirId

読み取り元のテーブルの ID。 これは、クエリーの再起動時に設定ミスを検出するために使用されます。

sources.[startOffset/endOffset].reservoirVersion

現在処理しているテーブルのバージョン。

sources.[startOffset/endOffset].index

このバージョンの AddFiles シーケンスのインデックス。 これは、大きなコミットを複数のバッチに分割するために使用されます。 このインデックスは、 modificationTimestamppathでソートすることによって作成されます。

sources.[startOffset/endOffset].isStartingVersion

このオフセットが、変更の処理ではなく開始中のクエリーを示すかどうか。 新しいクエリーを開始すると、開始時にテーブルに存在するすべてのデータが処理され、次に到着した新しいデータが処理されます。

sources.latestOffset

マイクロバッチクエリーによって処理された最新のオフセット。

sources.numInputRows

このソースから処理された入力行の数。

sources.inputRowsPerSecond

このソースの処理のためにデータが到着する速度。

sources.processedRowsPerSecond

Spark がこのソースのデータを処理するレート。

sources.metrics.numBytesOutstanding

未処理のファイル(RocksDBによって追跡されるファイル)の合計サイズ。 これは、ストリーミングソースとしての Delta と Auto Loader のバックログメトリクスです。

sources.metrics.numFilesOutstanding

処理する未処理ファイルの数。 これは、ストリーミングソースとしての Delta と Auto Loader のバックログメトリクスです。

シンクオブジェクト (Delta Lake)

メトリクス

説明

sink.description

ストリーミング クエリーが書き込むシンクの名前。 たとえば、 “DeltaSink[table]”.

sink.numOutputRows

このメトリクスの行数は、Spark が Delta Lake シンクの分類である DSv1 シンクの出力行を推論できないため、"-1" です。

例:

Kafka 間ストリーミングクエリリスナーイベントの

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

例 Delta レイクからDelta レイクへのストリーミングクエリリスナーイベント

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Delta Lakeストリーミングクエリリスナーイベント レートソースの

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}