Monitoring Structured Streaming queries on Databricks

Databricks provides built-in monitoring for Structured Streaming applications through the Spark UI under the Streaming tab.

Distinguish Structured Streaming queries in the Spark UI

Provide your streams a unique query name by adding .queryName(<query-name>) to your writeStream code to easily distinguish which metrics belong to which stream in the Spark UI.

Push Structured Streaming metrics to external services

Streaming metrics can be pushed to external services for alerting or dashboarding use cases by using Apache Spark’s Streaming Query Listener interface. In Databricks Runtime 11.3 LTS and above, the Streaming Query Listener is available in Python and Scala.

Note

Processing latency associated with listeners can adversely impact query processing. Databricks recommends minimizing processing logic in these listeners and writing to low latency sinks such as Kafka.

The following code provides basic examples of the syntax for implementing a listener:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Defining observable metrics in Structured Streaming

Observable metrics are named arbitrary aggregate functions that can be defined on a query (DataFrame). As soon as the execution of a DataFrame reaches a completion point (that is, finishes a batch query or reaches a streaming epoch), a named event is emitted that contains the metrics for the data processed since the last completion point.

You can observe these metrics by attaching a listener to the Spark session. The listener depends on the execution mode:

  • Batch mode: Use QueryExecutionListener.

    QueryExecutionListener is called when the query completes. Access the metrics using the QueryExecution.observedMetrics map.

  • Streaming, or micro-batch: Use StreamingQueryListener.

    StreamingQueryListener is called when the streaming query completes an epoch. Access the metrics using the StreamingQueryProgress.observedMetrics map. Databricks does not support continuous execution streaming.

For example:

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

StreamingQueryListener object metrics

Metric

Description

id

Unique query ID that persists across restarts. See StreamingQuery.id().

runId

Unique query ID for every start or restart. See StreamingQuery.runId().

name

User-specified name of the query. Null if not specified.

timestamp

Timestamp for the execution of the micro-batch.

batchId

Unique ID for the current batch of data being processed. Note that in the case of retries after a failure, a given batch ID can be executed more than once. Similarly, when there is no data to be processed, the batch ID is not incremented.

numInputRows

Aggregate (across all sources) number of records processed in a trigger.

inputRowsPerSecond

Aggregate (across all sources) rate of arriving data.

processedRowsPerSecond

Aggregate (across all sources) rate at which Spark is processing data.

durationMs object

Information about the time it takes to complete various stages of the micro-batch execution process.

Metric

Description

durationMs.addBatch

Time taken to execute the microbatch. This excludes the time Spark takes to plan the microbatch.

durationMs.getBatch

Time it takes to retrieve the metadata about the offsets from the source.

durationMs.latestOffset

Latest offset consumed for the microbatch. This progress object refers to the time taken to retrieve the latest offset from sources.

durationMs.queryPlanning

Time taken to generate the execution plan.

durationMs.triggerExecution

Time taken to plan and execute the microbatch.

durationMs.walCommit

Time taken to commit the new available offsets.

eventTime object

Information about the event time value seen within the data being processed in the micro-batch. This data is used by the watermark to figure out how to trim the state for processing stateful aggregations defined in the Structured Streaming job.

Metric

Description

eventTime.avg

Average event time seen in the trigger.

eventTime.max

Maximum event time seen in the trigger.

eventTime.min

Minimum event time seen in the trigger.

eventTime.watermark

Value of the watermark used in the trigger.

stateOperators object

Information about the stateful operations that are defined in the Structured Streaming job and the aggregations that are produced from them.

Metric

Description

stateOperators.operatorName

Name of the stateful operator that the metrics relate to. For example, symmetricHashJoin, dedupe, stateStoreSave.

stateOperators.numRowsTotal

Number of rows in the state as a result of the stateful operator or aggregation.

stateOperators.numRowsUpdated

Number of rows updated in the state as a result of the stateful operator or aggregation.

stateOperators.numRowsRemoved

Number of rows removed from the state as a result of the stateful operator or aggregation.

stateOperators.commitTimeMs

Time taken to commit all updates (puts and removes) and return a new version.

stateOperators.memoryUsedBytes

Memory used by the state store.

stateOperators.numRowsDroppedByWatermark

Number of rows that are considered too late to be included in the stateful aggregation. Streaming aggregations only: Number of rows dropped post-aggregation, and not raw input rows. The number is not precise, but it can indicate that late data is being dropped.

stateOperators.numShufflePartitions

Number of shuffle partitions for this stateful operator.

stateOperators.numStateStoreInstances

Actual state store instance that the operator has initialized and maintained. In many stateful operators, this is the same as the number of partitions, but stream-stream join initializes four state store instances per partition.

stateOperators.customMetrics object

Information collected from RocksDB that captures metrics about its performance and operations with respect to the stateful values it maintains for the Structured Streaming job. For more information, see Configure RocksDB state store on Databricks.

Metric

Description

customMetrics.rocksdbBytesCopied

Number of bytes copied as tracked by the RocksDB File Manager.

customMetrics.rocksdbCommitCheckpointLatency

Time in milliseconds to take a snapshot of native RocksDB and write it to a local directory.

customMetrics.rocksdbCompactLatency

Time in milliseconds for compaction (optional) during the checkpoint commit.

customMetrics.rocksdbCommitFileSyncLatencyMs

Time in milliseconds to sync the native RocksDB snapshot related files to an external storage (checkpoint location).

customMetrics.rocksdbCommitFlushLatency

Time in milliseconds to flush the RocksDB in-memory changes to your local disk.

customMetrics.rocksdbCommitPauseLatency

Time in milliseconds to stop the background worker threads (for example, for compaction) as part of the checkpoint commit.

customMetrics.rocksdbCommitWriteBatchLatency

Time in milliseconds to apply the staged writes in in-memory structure (WriteBatch) to native RocksDB.

customMetrics.rocksdbFilesCopied

Number of files copied as tracked by the RocksDB File Manager.

customMetrics.rocksdbFilesReused

Number of files reused as tracked by the RocksDB File Manager.

customMetrics.rocksdbGetCount

Number of get calls to the DB (This doesn’t include gets from WriteBatch: In-memory batch used for staging writes).

customMetrics.rocksdbGetLatency

Average time in nanoseconds for the underlying native RocksDB::Get call.

customMetrics.rocksdbReadBlockCacheHitCount

How much of the block cache in RocksDB is useful or not and avoiding local disk reads.

customMetrics.rocksdbReadBlockCacheMissCount

How much of the block cache in RocksDB is useful or not and avoiding local disk reads.

customMetrics.rocksdbSstFileSize

Size of all SST files. SST stands for Static Sorted Table, which is the tabular structure RocksDB uses to store data.

customMetrics.rocksdbTotalBytesRead

Number of uncompressed bytes read by get operations.

customMetrics.rocksdbTotalBytesReadByCompaction

Number of bytes that the compaction process reads from the disk.

customMetrics.rocksdbTotalBytesReadThroughIterator

Some of the stateful operations (for example, timeout processing in FlatMapGroupsWithState and watermarking) require reading data in DB through an iterator. This metric represents the size of uncompressed data read using the iterator.

customMetrics.rocksdbTotalBytesWritten

Number of uncompressed bytes written by put operations.

customMetrics.rocksdbTotalBytesWrittenByCompaction

Number of bytes the compaction process writes to the disk.

customMetrics.rocksdbTotalCompactionLatencyMs

Time milliseconds for RocksDB compactions, including background compactions and the optional compaction initiated during the commit.

customMetrics.rocksdbTotalFlushLatencyMs

Flush time, including background flushing. Flush operations are processes by which the MemTable is flushed to storage once it’s full. MemTables are the first level where data is stored in RocksDB.

customMetrics.rocksdbZipFileBytesUncompressed

RocksDB File Manager manages the physical SST file disk space utilization and deletion. This metric represents the uncompressed zip files in bytes as reported by the File Manager.

sources object (Kafka)

Metric

Description

sources.description

Name of the source the streaming query is reading from. For example, “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.

sources.startOffset object

Starting offset number within the Kafka topic that the streaming job started at.

sources.endOffset object

Latest offset processed by the microbatch. This could be equal to latestOffset for an ongoing microbatch execution.

sources.latestOffset object

Latest offset figured by the microbatch. When there is throttling, the micro-batching process might not process all offsets, causing endOffset and latestOffset to differ.

sources.numInputRows

Number of input rows processed from this source.

sources.inputRowsPerSecond

Rate at which data is arriving for processing for this source.

sources.processedRowsPerSecond

Rate at which Spark is processing data for this source.

sources.metrics object (Kafka)

Metric

Description

sources.metrics.avgOffsetsBehindLatest

Average number of offsets that the streaming query is behind the latest available offset among all the subscribed topics.

sources.metrics.estimatedTotalBytesBehindLatest

Estimated number of bytes that the query process has not consumed from the subscribed topics.

sources.metrics.maxOffsetsBehindLatest

Maximum number of offsets that the streaming query is behind the latest available offset among all the subscribed topics.

sources.metrics.minOffsetsBehindLatest

Minimum number of offsets that the streaming query is behind the latest available offset among all the subscribed topics.

sink object (Kafka)

Metric

Description

sink.description

Name of the sink the streaming query is writing to. For example, “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.

sink.numOutputRows

Number of rows that were written to the output table or sink as part of the microbatch. For some situations, this value can be “-1” and generally can be interpreted as “unknown”.

sources object (Delta Lake)

Metric

Description

sources.description

Name of the source the streaming query is reading from. For example, “DeltaSource[table]”.

sources.[startOffset/endOffset].sourceVersion

Version of serialization that this offset is encoded with.

sources.[startOffset/endOffset].reservoirId

ID of the table you are reading from. This is used to detect misconfiguration when restarting a query.

sources.[startOffset/endOffset].reservoirVersion

Version of the table that you are currently processing.

sources.[startOffset/endOffset].index

Index in the sequence of AddFiles in this version. This is used to break large commits into multiple batches. This index is created by sorting on modificationTimestamp and path.

sources.[startOffset/endOffset].isStartingVersion

Whether this offset denotes a query that is starting rather than processing changes. When starting a new query, all data present in the table at the start is processed, and then new data that has arrived.

sources.latestOffset

Latest offset processed by the microbatch query.

sources.numInputRows

Number of input rows processed from this source.

sources.inputRowsPerSecond

Rate at which data is arriving for processing for this source.

sources.processedRowsPerSecond

Rate at which Spark is processing data for this source.

sources.metrics.numBytesOutstanding

Size of the outstanding files (files tracked by RocksDB) combined. This is the backlog metric for Delta and Auto Loader as the streaming source.

sources.metrics.numFilesOutstanding

Number of outstanding files to be processed. This is the backlog metric for Delta and Auto Loader as the streaming source.

sink object (Delta Lake)

Metric

Description

sink.description

Name of the sink that the streaming query writes to. For example, “DeltaSink[table]”.

sink.numOutputRows

Number of rows in this metric is “-1” because Spark can’t infer output rows for DSv1 sinks, which is the classification for the Delta Lake sink.

Examples

Example Kafka-to-Kafka StreamingQueryListener event

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Example Delta Lake-to-Delta Lake StreamingQueryListener event

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Example rate source to Delta Lake StreamingQueryListener event

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}