Monitoring Structured Streaming queries on Databricks
Databricks provides built-in monitoring for Structured Streaming applications through the Spark UI under the Streaming tab.
Distinguish Structured Streaming queries in the Spark UI
Give each stream a unique query name by adding .queryName(<query-name>) to your writeStream code so that you can easily distinguish which metrics belong to which stream in the Spark UI.
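As a minimal sketch of where the call goes, the hypothetical helper below names a stream before starting it. The helper name, its `checkpoint_path` and `target_table` parameters, and the choice of `toTable` as the sink are assumptions for illustration; `df` is assumed to be a streaming DataFrame.

```python
def start_named_stream(df, query_name, checkpoint_path, target_table):
    """Start a streaming write whose metrics appear under query_name in the Spark UI.

    Hypothetical helper: df is assumed to be a streaming DataFrame, and the
    checkpoint path and target table are placeholders supplied by the caller.
    """
    return (
        df.writeStream
        .queryName(query_name)  # the name shown for this stream in the Spark UI
        .option("checkpointLocation", checkpoint_path)
        .toTable(target_table)  # starts the query and returns a StreamingQuery
    )
```

Keeping the name in one place like this also makes it easy to reuse the same string when filtering listener events by query name.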
Push Structured Streaming metrics to external services
Streaming metrics can be pushed to external services for alerting or dashboarding use cases by using Apache Spark’s Streaming Query Listener interface. In Databricks Runtime 11.3 LTS and above, the Streaming Query Listener is available in Python and Scala.
Note
Processing latency associated with listeners can adversely impact query processing. Databricks recommends minimizing processing logic in these listeners and writing to low latency sinks such as Kafka.
The following code provides basic examples of the syntax for implementing a listener in Scala and Python:
// Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
   * Called when a query is started.
   * @note This is called synchronously with
   *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
   *       `onQueryStarted` is called on all listeners before
   *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
   *       Do not block this method, as it blocks your query.
   */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
   * Called when there is some status update (ingestion rate updated, etc.)
   *
   * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
   *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
   *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
   *       terminates when processing `QueryProgressEvent`.
   */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
   * Called when a query is stopped, with or without error.
   */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
# Python
from pyspark.sql.streaming import StreamingQueryListener


class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are the same as in the Scala API.

        Notes
        -----
        This is called synchronously with
        :meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStarted`` is called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are the same as in the Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are the same as in the Scala API.
        """
        pass


my_listener = MyListener()
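Because slow listeners can delay query processing, one way to keep the callbacks cheap is to hand each event to a background thread and return immediately. The following is a minimal sketch of that idea in plain Python, not a Databricks API: the `ProgressForwarder` class, its `send` callable, and the `max_pending` limit are all hypothetical. In a real listener, `onQueryProgress` would call something like `forwarder.submit(event.progress.json)`, assuming the progress object exposes its JSON representation.

```python
import queue
import threading


class ProgressForwarder:
    """Buffers progress payloads so that listener callbacks return immediately."""

    def __init__(self, send, max_pending=1000):
        self._send = send  # hypothetical callable that writes to the external service
        self._pending = queue.Queue(maxsize=max_pending)
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def submit(self, payload):
        """Queue a payload without blocking; return False if it was dropped."""
        try:
            self._pending.put_nowait(payload)
            return True
        except queue.Full:
            return False  # drop the event rather than delay the streaming query

    def _drain(self):
        while True:
            payload = self._pending.get()
            try:
                if payload is None:  # sentinel queued by close()
                    return
                self._send(payload)
            except Exception:
                pass  # a failed delivery must not stop the worker
            finally:
                self._pending.task_done()

    def close(self):
        """Deliver everything that is already queued, then stop the worker."""
        self._pending.put(None)
        self._pending.join()
```

Dropping events when the buffer is full is a deliberate trade-off in this sketch: losing a metric sample is usually preferable to slowing the stream, but a different policy may suit other workloads.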
Defining observable metrics in Structured Streaming
Observable metrics are named arbitrary aggregate functions that can be defined on a query (DataFrame). As soon as the execution of a DataFrame reaches a completion point (that is, finishes a batch query or reaches a streaming epoch), a named event is emitted that contains the metrics for the data processed since the last completion point.
You can observe these metrics by attaching a listener to the Spark session. The listener depends on the execution mode:
- Batch mode: Use QueryExecutionListener. QueryExecutionListener is called when the query completes. Access the metrics using the QueryExecution.observedMetrics map.
- Streaming, or micro-batch: Use StreamingQueryListener. StreamingQueryListener is called when the streaming query completes an epoch. Access the metrics using the StreamingQueryProgress.observedMetrics map. Databricks does not support continuous execution streaming.
For example:
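// Scala
import org.apache.spark.sql.functions.{col, count, lit}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count(col("error")).as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // observedMetrics is a java.util.Map, so wrap the lookup in Option to handle a missing key
    Option(event.progress.observedMetrics.get("my_event")).foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})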
# Python
from pyspark.sql.functions import col, count, lit
from pyspark.sql.streaming import StreamingQueryListener

# Observe the row count (cnt) and the count of non-null error values (malformed)
observed_df = df.observe("metric", count(lit(1)).alias("cnt"), count(col("error")).alias("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")

    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        # Skip batches without the metric or without rows to avoid dividing by zero
        if row is not None and row.cnt > 0:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")

    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())
StreamingQueryListener object metrics
Metric | Description |
---|---|
id | Unique query ID that persists across restarts. See StreamingQuery.id(). |
runId | Unique query ID for every start or restart. See StreamingQuery.runId(). |
name | User-specified name of the query. Null if not specified. |
timestamp | Timestamp for the execution of the micro-batch. |
batchId | Unique ID for the current batch of data being processed. Note that in the case of retries after a failure, a given batch ID can be executed more than once. Similarly, when there is no data to be processed, the batch ID is not incremented. |
numInputRows | Aggregate (across all sources) number of records processed in a trigger. |
inputRowsPerSecond | Aggregate (across all sources) rate of arriving data. |
processedRowsPerSecond | Aggregate (across all sources) rate at which Spark is processing data. |
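The two aggregate rates can be compared to spot a query that is not keeping up. The following is a minimal sketch, assuming the progress event is available as a JSON string; the function name and the `tolerance` parameter are hypothetical, and the sample values are copied from the Kafka-to-Kafka example event in this article.

```python
import json


def is_falling_behind(progress_json, tolerance=1.0):
    """Return True when data arrives faster than the query processes it.

    tolerance is a hypothetical knob: 1.0 flags any shortfall, 0.9 allows
    processing to run up to 10 percent slower than the arrival rate.
    """
    progress = json.loads(progress_json)
    arriving = progress["inputRowsPerSecond"]
    processing = progress["processedRowsPerSecond"]
    return processing < arriving * tolerance


# Rates taken from the Kafka-to-Kafka example event in this article.
sample = json.dumps({
    "name": "STREAMING_QUERY_NAME_UNIQUE",
    "batchId": 1377,
    "numInputRows": 687,
    "inputRowsPerSecond": 32.13433743393049,
    "processedRowsPerSecond": 34.067241892293964,
})
print(is_falling_behind(sample))  # prints False
```

A single slow batch is rarely a problem, so an alert built on this check would normally require the condition to hold for several consecutive events.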
durationMs object
Information about the time it takes to complete various stages of the micro-batch execution process.
Metric | Description |
---|---|
addBatch | Time taken to execute the microbatch. This excludes the time Spark takes to plan the microbatch. |
getBatch | Time it takes to retrieve the metadata about the offsets from the source. |
latestOffset | Latest offset consumed for the microbatch. This progress object refers to the time taken to retrieve the latest offset from sources. |
queryPlanning | Time taken to generate the execution plan. |
triggerExecution | Time taken to plan and execute the microbatch. |
walCommit | Time taken to commit the new available offsets. |
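To see where a micro-batch spends its time, each entry can be expressed as a share of triggerExecution. The sketch below assumes the other entries are sub-steps of triggerExecution; the function name is hypothetical, and the sample values come from the Kafka-to-Kafka example event in this article.

```python
def duration_shares(duration_ms):
    """Return (stage, share of triggerExecution) pairs, largest share first."""
    total = duration_ms.get("triggerExecution", 0)
    if total <= 0:
        return []
    stages = {k: v for k, v in duration_ms.items() if k != "triggerExecution"}
    ranked = sorted(stages.items(), key=lambda item: item[1], reverse=True)
    return [(name, round(ms / total, 3)) for name, ms in ranked]


# durationMs values from the Kafka-to-Kafka example event in this article.
sample = {
    "addBatch": 18352,
    "getBatch": 0,
    "latestOffset": 31,
    "queryPlanning": 977,
    "triggerExecution": 20165,
    "walCommit": 342,
}
for stage, share in duration_shares(sample):
    print(f"{stage}: {share:.1%}")
```

In this sample, addBatch accounts for about 91 percent of the trigger, which points at batch execution rather than planning or offset handling as the place to look first.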
eventTime object
Information about the event time value seen within the data being processed in the micro-batch. This data is used by the watermark to figure out how to trim the state for processing stateful aggregations defined in the Structured Streaming job.
Metric | Description |
---|---|
avg | Average event time seen in the trigger. |
max | Maximum event time seen in the trigger. |
min | Minimum event time seen in the trigger. |
watermark | Value of the watermark used in the trigger. |
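One quantity that can be derived from these values is the distance between the newest event seen and the watermark. The following sketch assumes timestamps in the format shown in the example events (UTC with a trailing `Z` and fractional seconds); the helper names are hypothetical.

```python
from datetime import datetime, timezone


def parse_event_time(value):
    """Parse a timestamp such as 2022-10-31T20:09:30.125Z as UTC."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
    return parsed.replace(tzinfo=timezone.utc)


def watermark_lag_seconds(event_time):
    """Seconds between the maximum event time and the watermark, or None."""
    if "max" not in event_time or "watermark" not in event_time:
        return None  # for example, a query without a watermark
    newest = parse_event_time(event_time["max"])
    watermark = parse_event_time(event_time["watermark"])
    return (newest - watermark).total_seconds()


# eventTime values from the Kafka-to-Kafka example event in this article.
sample = {
    "avg": "2022-10-31T20:09:18.070Z",
    "max": "2022-10-31T20:09:30.125Z",
    "min": "2022-10-31T20:09:09.793Z",
    "watermark": "2022-10-31T20:08:46.355Z",
}
print(watermark_lag_seconds(sample))
```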
stateOperators object
Information about the stateful operations that are defined in the Structured Streaming job and the aggregations that are produced from them.
Metric | Description |
---|---|
operatorName | Name of the stateful operator that the metrics relate to. For example, stateStoreSave, dedupe, or symmetricHashJoin. |
numRowsTotal | Number of rows in the state as a result of the stateful operator or aggregation. |
numRowsUpdated | Number of rows updated in the state as a result of the stateful operator or aggregation. |
numRowsRemoved | Number of rows removed from the state as a result of the stateful operator or aggregation. |
commitTimeMs | Time taken to commit all updates (puts and removes) and return a new version. |
memoryUsedBytes | Memory used by the state store. |
numRowsDroppedByWatermark | Number of rows that are considered too late to be included in the stateful aggregation. For streaming aggregations only, this is the number of rows dropped post-aggregation, not the number of raw input rows. The number is not precise, but it can indicate that late data is being dropped. |
numShufflePartitions | Number of shuffle partitions for this stateful operator. |
numStateStoreInstances | Actual number of state store instances that the operator has initialized and maintained. In many stateful operators, this is the same as the number of partitions, but stream-stream join initializes four state store instances per partition. |
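The per-operator entries can be condensed into a short summary for dashboards or alerts. This is a minimal sketch with a hypothetical function name and hypothetical output keys; the sample values are a subset of two operators from the Kafka-to-Kafka example event in this article.

```python
def summarize_state_operators(state_operators):
    """Return one summary dict per stateful operator in a progress event."""
    summaries = []
    for op in state_operators:
        partitions = op["numShufflePartitions"]
        instances = op["numStateStoreInstances"]
        summaries.append({
            "operator": op["operatorName"],
            "rows": op["numRowsTotal"],
            "memory_mib": round(op["memoryUsedBytes"] / (1024 * 1024), 1),
            "dropped_by_watermark": op["numRowsDroppedByWatermark"],
            "stores_per_partition": instances // partitions if partitions else None,
        })
    return summaries


# Subset of the stateOperators array from the Kafka-to-Kafka example event.
sample = [
    {"operatorName": "dedupe", "numRowsTotal": 2454744,
     "memoryUsedBytes": 137765341, "numRowsDroppedByWatermark": 34,
     "numShufflePartitions": 20, "numStateStoreInstances": 20},
    {"operatorName": "symmetricHashJoin", "numRowsTotal": 2583,
     "memoryUsedBytes": 668544484, "numRowsDroppedByWatermark": 0,
     "numShufflePartitions": 20, "numStateStoreInstances": 80},
]
for summary in summarize_state_operators(sample):
    print(summary)
```

The stream-stream join in the sample reports 80 instances for 20 partitions, which matches the four state store instances per partition described above.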
stateOperators.customMetrics object
Information collected from RocksDB that captures metrics about its performance and operations with respect to the stateful values it maintains for the Structured Streaming job. For more information, see Configure RocksDB state store on Databricks.
Metric | Description |
---|---|
rocksdbBytesCopied | Number of bytes copied as tracked by the RocksDB File Manager. |
rocksdbCommitCheckpointLatency | Time in milliseconds to take a snapshot of native RocksDB and write it to a local directory. |
rocksdbCommitCompactLatency | Time in milliseconds for compaction (optional) during the checkpoint commit. |
rocksdbCommitFileSyncLatencyMs | Time in milliseconds to sync the native RocksDB snapshot related files to an external storage (checkpoint location). |
rocksdbCommitFlushLatency | Time in milliseconds to flush the RocksDB in-memory changes to your local disk. |
rocksdbCommitPauseLatency | Time in milliseconds to stop the background worker threads (for example, for compaction) as part of the checkpoint commit. |
rocksdbCommitWriteBatchLatency | Time in milliseconds to apply the staged writes in in-memory structure ( |
rocksdbFilesCopied | Number of files copied as tracked by the RocksDB File Manager. |
rocksdbFilesReused | Number of files reused as tracked by the RocksDB File Manager. |
rocksdbGetCount | Number of |
rocksdbGetLatency | Average time in nanoseconds for the underlying native |
rocksdbReadBlockCacheHitCount | How much of the block cache in RocksDB is useful or not and avoiding local disk reads. |
rocksdbReadBlockCacheMissCount | How much of the block cache in RocksDB is useful or not and avoiding local disk reads. |
rocksdbSstFileSize | Size of all SST files. SST stands for Static Sorted Table, which is the tabular structure RocksDB uses to store data. |
rocksdbTotalBytesRead | Number of uncompressed bytes read by |
rocksdbTotalBytesReadByCompaction | Number of bytes that the compaction process reads from the disk. |
rocksdbTotalBytesReadThroughIterator | Some of the stateful operations (for example, timeout processing in |
rocksdbTotalBytesWritten | Number of uncompressed bytes written by |
rocksdbTotalBytesWrittenByCompaction | Number of bytes the compaction process writes to the disk. |
rocksdbTotalCompactionLatencyMs | Time in milliseconds for RocksDB compactions, including background compactions and the optional compaction initiated during the commit. |
rocksdbTotalFlushLatencyMs | Flush time, including background flushing. Flush operations are processes by which the MemTable is flushed to storage once it’s full. MemTables are the first level where data is stored in RocksDB. |
rocksdbZipFileBytesUncompressed | RocksDB File Manager manages the physical SST file disk space utilization and deletion. This metric represents the uncompressed zip files in bytes as reported by the File Manager. |
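The block cache hit and miss counts are easier to interpret as a ratio. The sketch below uses a hypothetical function name and takes the `customMetrics` dictionary of one state operator; the sample counts come from the symmetricHashJoin and dedupe operators in the Kafka-to-Kafka example event in this article.

```python
def block_cache_hit_ratio(custom_metrics):
    """Fraction of block cache lookups served from the cache, or None if there were none."""
    hits = custom_metrics.get("rocksdbReadBlockCacheHitCount", 0)
    misses = custom_metrics.get("rocksdbReadBlockCacheMissCount", 0)
    lookups = hits + misses
    return hits / lookups if lookups else None


# Counts from two operators in the Kafka-to-Kafka example event.
join_metrics = {"rocksdbReadBlockCacheHitCount": 3425, "rocksdbReadBlockCacheMissCount": 149}
dedupe_metrics = {"rocksdbReadBlockCacheHitCount": 3, "rocksdbReadBlockCacheMissCount": 3}

print(block_cache_hit_ratio(join_metrics))
print(block_cache_hit_ratio(dedupe_metrics))  # prints 0.5
```

A ratio computed from only a handful of lookups, as in the dedupe sample, says little on its own, so it is worth checking the lookup count before acting on a low value.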
sources object (Kafka)
Metric | Description |
---|---|
description | Name of the source the streaming query is reading from. For example, KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]. |
startOffset | Starting offset number within the Kafka topic that the streaming job started at. |
endOffset | Latest offset processed by the microbatch. This could be equal to |
latestOffset | Latest offset figured by the microbatch. When there is throttling, the micro-batching process might not process all offsets, causing |
numInputRows | Number of input rows processed from this source. |
inputRowsPerSecond | Rate at which data is arriving for processing for this source. |
processedRowsPerSecond | Rate at which Spark is processing data for this source. |
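Because endOffset is the latest offset processed and latestOffset is the latest offset found, their difference per partition shows how much a throttled micro-batch left unprocessed. The following sketch uses a hypothetical function name, and the sample source entry, including its topic name and offsets, is made up for illustration.

```python
def kafka_partition_lag(source):
    """Map (topic, partition) to the number of offsets found but not yet processed."""
    lag = {}
    for topic, partitions in source["latestOffset"].items():
        for partition, latest in partitions.items():
            processed = source["endOffset"][topic][partition]
            lag[(topic, partition)] = latest - processed
    return lag


# Hypothetical source entry; the offsets are made up to show a throttled batch.
throttled = {
    "description": "KafkaV2[Subscribe[events]]",
    "endOffset": {"events": {"0": 1500, "1": 900}},
    "latestOffset": {"events": {"0": 2000, "1": 900}},
}
print(kafka_partition_lag(throttled))
# prints {('events', '0'): 500, ('events', '1'): 0}
```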
sources.metrics object (Kafka)
Metric | Description |
---|---|
avgOffsetsBehindLatest | Average number of offsets that the streaming query is behind the latest available offset among all the subscribed topics. |
estimatedTotalBytesBehindLatest | Estimated number of bytes that the query process has not consumed from the subscribed topics. |
maxOffsetsBehindLatest | Maximum number of offsets that the streaming query is behind the latest available offset among all the subscribed topics. |
minOffsetsBehindLatest | Minimum number of offsets that the streaming query is behind the latest available offset among all the subscribed topics. |
sink object (Kafka)
Metric | Description |
---|---|
description | Name of the sink the streaming query is writing to. For example, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100. |
numOutputRows | Number of rows that were written to the output table or sink as part of the microbatch. For some situations, this value can be “-1” and generally can be interpreted as “unknown”. |
sources object (Delta Lake)
Metric | Description |
---|---|
description | Name of the source the streaming query is reading from. For example, DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]. |
sourceVersion | Version of serialization that this offset is encoded with. |
reservoirId | ID of the table you are reading from. This is used to detect misconfiguration when restarting a query. |
reservoirVersion | Version of the table that you are currently processing. |
index | Index in the sequence of |
isStartingVersion | Whether this offset denotes a query that is starting rather than processing changes. When starting a new query, all data present in the table at the start is processed, and then new data that has arrived. |
latestOffset | Latest offset processed by the microbatch query. |
numInputRows | Number of input rows processed from this source. |
inputRowsPerSecond | Rate at which data is arriving for processing for this source. |
processedRowsPerSecond | Rate at which Spark is processing data for this source. |
metrics.numBytesOutstanding | Combined size of the outstanding files (files tracked by RocksDB). This is the backlog metric for Delta and Auto Loader as the streaming source. |
metrics.numFilesOutstanding | Number of outstanding files to be processed. This is the backlog metric for Delta and Auto Loader as the streaming source. |
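A backlog that keeps growing across micro-batches is usually more telling than a single large value. The sketch below checks for that trend, assuming you collect the `metrics` object of the source from successive progress events; the function name, the `window` parameter, and the sample history are hypothetical. The values are converted with `int` because the example events report these metrics as strings.

```python
def backlog_is_growing(metrics_history, window=3):
    """Return True if numFilesOutstanding rose in each of the last `window` steps."""
    files = [int(m["numFilesOutstanding"]) for m in metrics_history]
    recent = files[-(window + 1):]
    if len(recent) < window + 1:
        return False  # not enough events to judge a trend
    return all(later > earlier for earlier, later in zip(recent, recent[1:]))


# Hypothetical history of the source metrics object, oldest first.
history = [
    {"numBytesOutstanding": "1024", "numFilesOutstanding": "2"},
    {"numBytesOutstanding": "4096", "numFilesOutstanding": "5"},
    {"numBytesOutstanding": "8192", "numFilesOutstanding": "9"},
    {"numBytesOutstanding": "16384", "numFilesOutstanding": "14"},
]
print(backlog_is_growing(history))  # prints True
```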
sink object (Delta Lake)
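Metric | Description |
---|---|
description | Name of the sink that the streaming query writes to. For example, DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]. |
numOutputRows | Number of output rows. For the Delta Lake sink, this value is “-1” because Spark can’t infer output rows for DSv1 sinks, which is the classification for the Delta Lake sink. |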
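When forwarding sink metrics to a dashboard, it can help to turn the “-1” placeholder into an explicit missing value so that it is not summed or averaged as a real count. This is a small sketch with a hypothetical function name; the sample values match the sinks in the example events in this article.

```python
def output_rows(sink):
    """Return numOutputRows, or None when the sink reports it as unknown (-1)."""
    rows = sink.get("numOutputRows", -1)
    return None if rows < 0 else rows


# numOutputRows as reported by the Kafka and Delta Lake example events.
print(output_rows({"numOutputRows": 76}))  # prints 76
print(output_rows({"numOutputRows": -1}))  # prints None
```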
Examples
Example Kafka-to-Kafka StreamingQueryListener event
{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}
Example Delta Lake-to-Delta Lake StreamingQueryListener event
{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}
Example rate source to Delta Lake StreamingQueryListener event
{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}