Databricksでの構造化ストリーミングクエリーの監視
Databricks には、 [ストリーミング] タブの Spark UI を使用して、構造化ストリーミング アプリケーション用の組み込みの監視が用意されています。
構造化ストリーミング クエリを Spark UIで区別する
writeStream
コードに .queryName(<query-name>)
を追加してストリームに一意のクエリー名を付け、Spark UI でどのメトリクスがどのストリームに属しているかを簡単に区別できるようにします。
構造化ストリーミングメトリクスを外部サービスにプッシュする
ストリーミング メトリックはApache Sparkのストリーミング Query Listener インターフェイスを使用して、アラートやダッシュボードのユースケースのために外部サービスにプッシュできます。 Databricks Runtime 11.3 LTS 以降では、ストリーミング クエリ リスナーは Python と Scala で利用できます。
注
リスナーに関連する処理待機時間は、クエリー処理に悪影響を与える可能性があります。 Databricks では、これらのリスナーの処理ロジックを最小限に抑え、Kafka などの待機時間の短いシンクに書き込むことをお勧めします。
次のコードは、リスナーを実装するための構文の基本的な例を示しています。
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
構造化ストリーミングでの監視可能なメトリクスの定義
Observable メトリクスは、クエリー (DataFrame) で定義できる任意の集計関数という名前です。 DataFrame の実行が完了ポイントに達すると (つまり、バッチ クエリーが終了するか、ストリーミング エポックに達すると)、最後の完了ポイント以降に処理されたデータのメトリクスを含む名前付きイベントが生成されます。
これらのメトリクスは、リスナーを Spark セッションにアタッチすることで確認できます。 リスナーは実行モードによって異なります。
バッチ モード:
QueryExecutionListener
を使用します。QueryExecutionListener
は、クエリーの完了時に呼び出されます。QueryExecution.observedMetrics
マップを使用してメトリクスにアクセスします。ストリーミング、またはマイクロバッチ:
StreamingQueryListener
を使用します。StreamingQueryListener
は、ストリーミング クエリがエポックを完了したときに呼び出されます。StreamingQueryProgress.observedMetrics
マップを使用してメトリクスにアクセスします。Databricks は、連続実行ストリーミングをサポートしていません。
例:
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
StreamingQueryListener object メトリクス
メトリクス |
説明 |
---|---|
|
再起動後も保持される一意のクエリー ID。 StreamingQuery.id() を参照してください。 |
|
起動または再起動ごとに一意のクエリー ID。 StreamingQuery.runId() を参照してください。 |
|
クエリーのユーザー指定の名前。 指定されていない場合は Null。 |
|
マイクロバッチの実行のタイムスタンプ。 |
|
処理中のデータの現在のバッチの一意の ID。 失敗後に再試行する場合、特定のバッチ ID を複数回実行できることに注意してください。 同様に、処理するデータがない場合、バッチ ID はインクリメントされません。 |
|
トリガーで処理されたレコードの数を (すべてのソースにわたって) 集計します。 |
|
到着データの (すべてのソースにわたる) 集計率。 |
|
Spark がデータを処理している (すべてのソースにわたる) 集計レート。 |
期間Ms オブジェクト
マイクロバッチ実行プロセスのさまざまな段階を完了するのにかかる時間に関する情報。
メトリクス |
説明 |
---|---|
|
マイクロバッチの実行にかかった時間。 これには、Spark がマイクロバッチの計画にかかる時間は含まれません。 |
|
ソースからオフセットに関するメタデータを取得するのにかかる時間。 |
|
マイクロバッチで消費された最新のオフセット。 この進行状況オブジェクトは、ソースから最新のオフセットを取得するのにかかった時間を参照します。 |
|
実行プランの生成にかかった時間。 |
|
マイクロバッチの計画と実行にかかった時間。 |
|
新しい使用可能なオフセットをコミットするのにかかった時間。 |
イベント時間オブジェクト
マイクロバッチで処理されているデータ内で見られるイベント時間値に関する情報。 このデータは、構造化ストリーミング ジョブで定義されたステートフル集計を処理するために状態をトリミングする方法を把握するために、ウォーターマークによって使用されます。
メトリクス |
説明 |
---|---|
|
トリガーに表示される平均イベント時間。 |
|
トリガーに表示される最大イベント時間。 |
|
トリガーに表示される最小イベント時間。 |
|
トリガーで使用されるウォーターマークの値。 |
状態演算子オブジェクト
構造化ストリーミング ジョブで定義されているステートフル操作と、それらから生成される集計に関する情報。
メトリクス |
説明 |
---|---|
|
メトリクスが関連するステートフル演算子の名前。 たとえば、 |
|
ステートフルな演算子または集計の結果としての状態の行数。 |
|
ステートフルな演算子または集計の結果として状態で更新された行の数。 |
|
ステートフルな演算子または集計の結果として状態から削除された行の数。 |
|
すべての更新 (書き込みと削除) をコミットし、新しいバージョンを返すのにかかった時間。 |
|
状態ストアによって使用されるメモリ。 |
|
ステートフル集計に含めるには遅すぎると見なされる行の数。 ストリーミング集計のみ: 集計後に削除された行数であり、生の入力行は削除されません。 この数値は正確ではありませんが、遅延データが削除されていることを示している可能性があります。 |
|
このステートフル演算子のシャッフルパーティションの数。 |
|
オペレーターが初期化および保守した実際の状態ストア インスタンス。 多くのステートフル オペレーターでは、これはパーティションの数と同じですが、ストリームとストリームの結合によって、パーティションごとに 4 つの状態ストア インスタンスが初期化されます。 |
stateOperators.customMetrics オブジェクト
RocksDB から収集された情報で、構造化ストリーミング ジョブに対して維持するステートフルな値に関するパフォーマンスと操作に関するメトリクスをキャプチャします。 詳細については、「 Databricks での RocksDB 状態ストアの構成」を参照してください。
メトリクス |
説明 |
---|---|
|
RocksDBファイルマネージャによって追跡されるようにコピーされたバイト数。 |
|
ネイティブ RocksDB のスナップショットを取得し、ローカル ディレクトリに書き込む時間 (ミリ秒単位)。 |
|
チェックポイントのコミット中の圧縮 (オプション) の時間 (ミリ秒単位)。 |
|
ネイティブの RocksDB スナップショット関連ファイルを外部ストレージ (チェックポイントの場所) に同期する時間 (ミリ秒単位)。 |
|
RocksDBのメモリ内の変更をローカルディスクにフラッシュするミリ秒単位の時間。 |
|
チェックポイント コミットの一部としてバックグラウンド ワーカー スレッド (圧縮など) を停止する時間 (ミリ秒単位)。 |
|
メモリ内構造 ( |
|
RocksDBファイルマネージャによって追跡されるようにコピーされたファイルの数。 |
|
RocksDBファイルマネージャによって追跡されるように再利用されたファイルの数。 |
|
DB への |
|
基になるネイティブ |
|
RocksDBのブロックキャッシュのどれだけが有用であるかどうか、およびローカルディスクの読み取りを回避します。 |
|
RocksDBのブロックキャッシュのどれだけが有用であるかどうか、およびローカルディスクの読み取りを回避します。 |
|
すべての SST ファイルのサイズ。 SSTは静的ソートテーブルの略で、RocksDBがデータを格納するために使用する表形式構造です。 |
|
|
|
圧縮プロセスがディスクから読み取るバイト数。 |
|
一部のステートフル操作 ( |
|
|
|
圧縮プロセスがディスクに書き込むバイト数。 |
|
RocksDB 圧縮の時間ミリ秒 (バックグラウンド圧縮と、コミット中に開始されたオプションの圧縮を含む)。 |
|
バックグラウンドフラッシュを含むフラッシュ時間。 フラッシュ操作は、MemTable がいっぱいになったときにストレージにフラッシュされるプロセスです。 MemTablesは、データがRocksDBに保存される最初のレベルです。 |
|
RocksDBファイルマネージャは、物理SSTファイルのディスクスペースの使用率と削除を管理します。 このメトリクスは、ファイルマネージャーによって報告された非圧縮 zip ファイルをバイト単位で表します。 |
ソースオブジェクト (Kafka)
メトリクス |
説明 |
---|---|
|
ストリーミングクエリーが読み取っているソースの名前。 たとえば、 |
|
ストリーミングジョブが開始された Kafka トピック内の開始オフセット番号。 |
|
マイクロバッチによって処理された最新のオフセット。 これは、進行中のマイクロバッチ実行の |
|
マイクロバッチによって計算された最新のオフセット。 調整がある場合、マイクロバッチ処理プロセスですべてのオフセットが処理されず、 |
|
このソースから処理された入力行の数。 |
|
このソースの処理のためにデータが到着する速度。 |
|
Spark がこのソースのデータを処理するレート。 |
ソース.メトリクス object (Kafka)
メトリクス |
説明 |
---|---|
|
ストリーミングクエリーが、サブスクライブされたすべてのトピックの中で使用可能な最新のオフセットの背後にあるオフセットの平均数。 |
|
クエリープロセスがサブスクライブされたトピックから消費していない推定バイト数。 |
|
ストリーミングクエリーが、サブスクライブされたすべてのトピックの中で使用可能な最新のオフセットの背後にあるオフセットの最大数。 |
|
ストリーミングクエリーが、サブスクライブされたすべてのトピックの中で使用可能な最新のオフセットの背後にあるオフセットの最小数。 |
シンクオブジェクト (Kafka)
メトリクス |
説明 |
---|---|
|
ストリーミング クエリーが書き込んでいるシンクの名前。 たとえば、 |
|
マイクロバッチの一部として出力テーブルまたはシンクに書き込まれた行数。 状況によっては、この値は "-1" になり、通常は "不明" と解釈できます。 |
ソースオブジェクト (Delta Lake)
メトリクス |
説明 |
---|---|
|
ストリーミングクエリーが読み取っているソースの名前。 たとえば、 |
|
このオフセットがエンコードされるシリアル化のバージョン。 |
|
読み取り元のテーブルの ID。 これは、クエリーの再起動時に設定ミスを検出するために使用されます。 |
|
現在処理しているテーブルのバージョン。 |
|
このバージョンの |
|
このオフセットが、変更の処理ではなく開始中のクエリーを示すかどうか。 新しいクエリーを開始すると、開始時にテーブルに存在するすべてのデータが処理され、次に到着した新しいデータが処理されます。 |
|
マイクロバッチクエリーによって処理された最新のオフセット。 |
|
このソースから処理された入力行の数。 |
|
このソースの処理のためにデータが到着する速度。 |
|
Spark がこのソースのデータを処理するレート。 |
|
未処理のファイル(RocksDBによって追跡されるファイル)の合計サイズ。 これは、ストリーミングソースとしての Delta と Auto Loader のバックログメトリクスです。 |
|
処理する未処理ファイルの数。 これは、ストリーミングソースとしての Delta と Auto Loader のバックログメトリクスです。 |
例:
Kafka 間ストリーミングクエリリスナーイベントの例
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
例 Delta レイクからDelta レイクへのストリーミングクエリリスナーイベント
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Delta Lakeストリーミングクエリリスナーイベント レートソースの例
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}