Structured Streaming in production

It is convenient to attach a notebook to a cluster and run your streaming queries interactively. However, when you run them in production, you are likely to want more robustness and uptime guarantees. This article discusses how to make your streaming application more fault tolerant using Databricks jobs.

Define the timing of streaming data processing

Use a trigger to define the timing of streaming data processing. When you specify a trigger interval that is too small (less than tens of seconds), the system may perform unnecessary checks to see if new data arrives. As a best practice, we recommend that you specify a tailored trigger to minimize the cost.
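As an illustration, the sketch below starts a query with an explicit processing-time trigger. The helper name, its parameters, the Parquet sink, and the one-minute interval are all assumptions made for the example; pick an interval that matches your own latency and cost requirements.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// Hypothetical helper: starts a Parquet-sink query that looks for new data
// once per minute instead of as fast as possible.
def startWithTrigger(df: DataFrame, outputPath: String, checkpointPath: String): StreamingQuery =
  df.writeStream
    .format("parquet")
    .option("path", outputPath)
    .option("checkpointLocation", checkpointPath)
    .trigger(Trigger.ProcessingTime("1 minute")) // illustrative interval only
    .start()
```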

Recover from query failures

A production-grade streaming application must have robust failure handling. In Structured Streaming, if you enable checkpointing for a streaming query, then you can restart the query after a failure and the restarted query will continue where the failed one left off, while ensuring fault tolerance and data consistency guarantees. Hence, to make your queries fault tolerant, you must enable query checkpointing and configure Databricks jobs to restart your queries automatically after a failure.

Enable checkpointing

To enable checkpointing, set the option checkpointLocation to a DBFS or cloud storage path before you start the query. For example:

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "dbfs://outputPath/")
  .option("checkpointLocation", "dbfs://checkpointPath")
  .start()

This checkpoint location preserves all of the essential information that uniquely identifies a query. Hence, each query must have a different checkpoint location, and multiple queries should never have the same location. See the Structured Streaming Programming Guide for more details.
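One way to keep checkpoint locations distinct is to derive each one from the query name. The following is a minimal sketch under that assumption; the helper name, its parameters, and the directory layout are hypothetical, not a Databricks convention.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper: every query gets its own output and checkpoint
// directory, keyed by a query name that must be unique per query.
def startQuery(
    df: DataFrame,
    name: String,
    outputRoot: String,
    checkpointRoot: String): StreamingQuery =
  df.writeStream
    .queryName(name)
    .format("parquet")
    .option("path", s"$outputRoot/$name")
    .option("checkpointLocation", s"$checkpointRoot/$name")
    .start()
```

With this layout, two queries started with different names never share a checkpoint directory, provided that the names themselves are unique.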


While checkpointLocation is a required option for most types of output sinks, some sinks, such as memory sink, may automatically generate a temporary checkpoint location on DBFS when you do not provide checkpointLocation. The temporary checkpoint locations do not ensure any fault tolerance or data consistency guarantees, and may not get cleaned up properly. As a best practice, we recommend that you always specify the checkpointLocation option.

Configure jobs to restart streaming queries on failure

You can create a Databricks job with the notebook or JAR that has your streaming queries and configure it to:

  • Always use a new cluster.

  • Always retry on failure.

Jobs have tight integration with Structured Streaming APIs and can monitor all streaming queries active in a run. This configuration ensures that if any part of the query fails, jobs automatically terminate the run (along with all the other queries) and start a new run in a new cluster. The new run re-executes the notebook or JAR code and restarts all of the queries again. This is the safest way to ensure that you get back into a good state.


Notebook workflows are not supported with long-running jobs. Therefore we don’t recommend using notebook workflows in your streaming jobs.


  • Failure in any of the active streaming queries causes the active run to fail and terminate all the other streaming queries.

  • You do not need to use streamingQuery.awaitTermination() or spark.streams.awaitAnyTermination() at the end of your notebook. Jobs automatically prevent a run from completing when a streaming query is active.

Here are the details of the recommended job configuration.

  • Cluster: Set this always to use a new cluster and use the latest Spark version (or at least version 2.1). Queries started in Spark 2.1 and above are recoverable after query and Spark version upgrades.

  • Alerts: Set this if you want email notification on failures.

  • Schedule: Do not set a schedule.

  • Timeout: Do not set a timeout. Streaming queries run for an indefinitely long time.

  • Maximum concurrent runs: Set to 1. There must be only one instance of each query concurrently active.

  • Retries: Set to Unlimited.

See Jobs to understand these configurations. Here is a screenshot of a good job configuration.

[Screenshot: job configuration]

Recover after changes in a streaming query

There are limitations on what changes in a streaming query are allowed between restarts from the same checkpoint location. Here are a few kinds of changes that are either not allowed, or the effect of the change is not well-defined. For all of them:

  • The term allowed means you can do the specified change but whether the semantics of its effect is well-defined depends on the query and the change.

  • The term not allowed means you should not do the specified change as the restarted query is likely to fail with unpredictable errors.

  • sdf represents a streaming DataFrame/Dataset generated with sparkSession.readStream.

Types of changes

  • Changes in the number or type (that is, different source) of input sources: This is not allowed.

  • Changes in the parameters of input sources: Whether this is allowed and whether the semantics of the change are well-defined depends on the source and the query. Here are a few examples.

    • Addition, deletion, and modification of rate limits is allowed: spark.readStream.format("kafka").option("subscribe", "topic") to spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)

    • Changes to subscribed topics and files are generally not allowed as the results are unpredictable: spark.readStream.format("kafka").option("subscribe", "topic") to spark.readStream.format("kafka").option("subscribe", "newTopic")

  • Changes in the type of output sink: Changes between a few specific combinations of sinks are allowed. This needs to be verified on a case-by-case basis. Here are a few examples.

    • File sink to Kafka sink is allowed. Kafka will see only the new data.

    • Kafka sink to file sink is not allowed.

    • Kafka sink changed to foreach, or vice versa is allowed.

  • Changes in the parameters of output sink: Whether this is allowed and whether the semantics of the change are well-defined depends on the sink and the query. Here are a few examples.

    • Changes to the output directory of a file sink are not allowed: sdf.writeStream.format("parquet").option("path", "/somePath") to sdf.writeStream.format("parquet").option("path", "/anotherPath")

    • Changes to the output topic are allowed: sdf.writeStream.format("kafka").option("topic", "someTopic") to sdf.writeStream.format("kafka").option("topic", "anotherTopic")

    • Changes to the user-defined foreach sink (that is, the ForeachWriter code) are allowed, but the semantics of the change depend on the code.

  • Changes in projection / filter / map-like operations: Some cases are allowed. For example:

    • Addition / deletion of filters is allowed: sdf.selectExpr("a") to sdf.where(...).selectExpr("a").filter(...).

    • Changes in projections with the same output schema are allowed: sdf.selectExpr("stringColumn AS json").writeStream to sdf.select(to_json(...).as("json")).writeStream.

    • Changes in projections with different output schema are conditionally allowed: sdf.selectExpr("a").writeStream to sdf.selectExpr("b").writeStream is allowed only if the output sink allows the schema change from "a" to "b".

  • Changes in stateful operations: Some operations in streaming queries need to maintain state data in order to continuously update the result. Structured Streaming automatically checkpoints the state data to fault-tolerant storage (for example, DBFS, GCS, AWS S3, Azure Blob storage) and restores it after restart. However, this assumes that the schema of the state data remains the same across restarts. This means that any changes (that is, additions, deletions, or schema modifications) to the stateful operations of a streaming query are not allowed between restarts. Here is the list of stateful operations whose schema should not be changed between restarts in order to ensure state recovery:

    • Streaming aggregation: For example, sdf.groupBy("a").agg(...). Any change in number or type of grouping keys or aggregates is not allowed.

    • Streaming deduplication: For example, sdf.dropDuplicates("a"). Any change in number or type of grouping keys or aggregates is not allowed.

    • Stream-stream join: For example, sdf1.join(sdf2, ...) (that is, both inputs are generated with sparkSession.readStream). Changes in the schema or equi-joining columns are not allowed. Changes in join type (outer or inner) are not allowed. Other changes in the join condition are ill-defined.

    • Arbitrary stateful operation: For example, sdf.groupByKey(...).mapGroupsWithState(...) or sdf.groupByKey(...).flatMapGroupsWithState(...). Any change to the schema of the user-defined state or to the type of timeout is not allowed. Any change within the user-defined state-mapping function is allowed, but the semantic effect of the change depends on the user-defined logic. If you really want to support state schema changes, you can explicitly encode and decode your complex state data structures into bytes using an encoding scheme that supports schema migration. For example, if you save your state as Avro-encoded bytes, you are free to change the Avro state schema between query restarts, because the binary state will always be restored successfully.

Monitoring streaming queries

You can monitor your streaming applications through the Spark UI under the Streaming tab. By providing your streams a query name with df.writeStream.queryName(<query_name>), you can easily distinguish which metrics belong to which stream in the Spark UI.

The streaming metrics can be pushed to external services for alerting or dashboarding use cases by using Apache Spark’s Streaming Query Listener interface. The Streaming Query Listener is only available in Scala:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
   * Called when a query is started.
   * @note This is called synchronously with
   *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]],
   *       that is, `onQueryStart` will be called on all listeners before
   *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
   *       Do not block in this method as it will block your query.
   */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
   * Called when there is some status update (ingestion rate updated, etc.)
   * @note This method is asynchronous. The status in [[StreamingQuery]] will always be
   *       latest no matter when this method is called. Therefore, the status of [[StreamingQuery]]
   *       may be changed before/when you process the event. For example, you may find [[StreamingQuery]]
   *       is terminated when you are processing `QueryProgressEvent`.
   */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
   * Called when a query is stopped, with or without error.
   */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

The observe method

Observable metrics are named arbitrary aggregate functions that can be defined on a query (DataFrame). As soon as the execution of a DataFrame reaches a completion point (that is, finishes a batch query or reaches a streaming epoch), a named event is emitted that contains the metrics for the data processed since the last completion point.

You can observe these metrics by attaching a listener to the Spark session. The listener depends on the execution mode:

  • Batch mode: Use QueryExecutionListener.

    QueryExecutionListener is called when the query completes. Access the metrics using the QueryExecution.observedMetrics map.

  • Streaming, or micro-batch: Use StreamingQueryListener.

    StreamingQueryListener is called when the streaming query completes an epoch. Access the metrics using the StreamingQueryProgress.observedMetrics map. Databricks does not support continuous execution streaming.

For example:

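import scala.collection.JavaConverters._
import org.apache.spark.sql.functions.{count, lit}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // observedMetrics is a java.util.Map, so convert it before using Option-style access
    event.progress.observedMetrics.asScala.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})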

Configure Apache Spark scheduler pools for efficiency

By default, all queries started in a notebook run in the same fair scheduling pool. Therefore, jobs generated by triggers from all of the streaming queries in a notebook run one after another in first in, first out (FIFO) order. This can cause unnecessary delays in the queries, because they are not efficiently sharing the cluster resources.

To enable all streaming queries to execute jobs concurrently and to share the cluster efficiently, you can set the queries to execute in separate scheduler pools. For example:

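// df, path1, and path2 are placeholders for your streaming DataFrame and output paths

// Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").format("parquet").start(path1)

// Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").format("orc").start(path2)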


The local property must be set in the same notebook cell where you start your streaming query.

See Apache fair scheduler documentation for more details.

Optimize performance of stateful streaming queries

If you have stateful operations in your streaming query (for example, streaming aggregation, streaming dropDuplicates, stream-stream joins, mapGroupsWithState, or flatMapGroupsWithState) and you want to maintain millions of keys in the state, then you may face issues related to large JVM garbage collection (GC) pauses causing high variations in the micro-batch processing times. This occurs because, by default, the state data is maintained in the JVM memory of the executors, and a large number of state objects puts memory pressure on the JVM, causing long GC pauses.

In such cases, you can choose to use a more optimized state management solution based on RocksDB. This solution is available in Databricks Runtime. Rather than keeping the state in the JVM memory, this solution uses RocksDB to efficiently manage the state in the native memory and the local SSD (for instance types with a local SSD). Furthermore, any changes to this state are automatically saved by Structured Streaming to the checkpoint location you have provided, thus providing full fault-tolerance guarantees (the same as default state management). For instructions for configuring RocksDB as state store, see Configure RocksDB state store.

Other recommended configurations for best performance:

  • Use compute-optimized instances as workers. For example, Google Cloud n1-highcpu-32 instances.

  • Set the number of shuffle partitions to 1-2 times the number of cores in the cluster.

  • Set the spark.sql.streaming.noDataMicroBatches.enabled configuration to false in the SparkSession. This prevents the streaming micro-batch engine from processing micro-batches that do not contain data. Note that with this configuration set to false, stateful operations that rely on watermarks or processing-time timeouts might not emit output until new data arrives, instead of emitting it immediately.
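The two session-level settings in this list can be applied as in the sketch below. The helper name and the totalCores parameter are hypothetical, and the factor of 2 is just one point in the recommended 1-2 times range.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper; call it before starting the stateful streaming query.
def tuneForStatefulStreaming(spark: SparkSession, totalCores: Int): Unit = {
  // 1-2 times the number of cores; 2x is chosen here purely for illustration.
  spark.conf.set("spark.sql.shuffle.partitions", (2 * totalCores).toString)
  // Do not run micro-batches that contain no data.
  spark.conf.set("spark.sql.streaming.noDataMicroBatches.enabled", "false")
}
```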

Regarding performance benefits, RocksDB-based state management can maintain 100 times more state keys than the default one. For example, in a Spark cluster with Google Cloud n1-highcpu-32 instances as workers, the default state management can maintain up to 1-2 million state keys per executor after which the JVM GC starts affecting performance significantly. In contrast, the RocksDB-based state management can easily maintain 100 million state keys per executor without any GC issues.


The state management scheme cannot be changed between query restarts. That is, if a query has been started with the default state management, then it cannot be changed without starting the query from scratch with a new checkpoint location.

Configure RocksDB state store

You can enable RocksDB-based state management by setting the following configuration in the SparkSession before starting the streaming query.
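The configuration snippet itself is not reproduced in the text above. As a sketch, assuming the Databricks-provided state store provider class named below (verify the class name against the documentation for your runtime version; the helper name is hypothetical), the setting would look like this:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper; the provider class name is an assumption to verify.
def enableRocksDBStateStore(spark: SparkSession): Unit =
  spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
```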


RocksDB state store metrics

Each state operator collects metrics related to the state management operations performed on its RocksDB instance to observe the state store and potentially help in debugging job slowness. These metrics are aggregated (summed) per state operator in the job across all tasks where the state operator is running. These metrics are part of the customMetrics map inside the stateOperators fields in StreamingQueryProgress. The following is an example of StreamingQueryProgress in JSON form (obtained using StreamingQueryProgress.json()).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}
Detailed descriptions of the metrics are as follows:

  • rocksdbCommitWriteBatchLatency: Time (in milliseconds) taken to apply the staged writes in the in-memory structure (WriteBatch) to native RocksDB.

  • rocksdbCommitFlushLatency: Time (in milliseconds) taken to flush the RocksDB in-memory changes to local disk.

  • rocksdbCommitCompactLatency: Time (in milliseconds) taken for compaction (optional) during the checkpoint commit.

  • rocksdbCommitPauseLatency: Time (in milliseconds) taken to stop the background worker threads (for compaction and so on) as part of the checkpoint commit.

  • rocksdbCommitCheckpointLatency: Time (in milliseconds) taken to take a snapshot of native RocksDB and write it to a local directory.

  • rocksdbCommitFileSyncLatencyMs: Time (in milliseconds) taken to sync the files related to the native RocksDB snapshot to external storage (the checkpoint location).

  • rocksdbGetLatency: Average time (in nanoseconds) taken per underlying native RocksDB::Get call.

  • rocksdbPutLatency: Average time (in nanoseconds) taken per underlying native RocksDB::Put call.

  • rocksdbGetCount: Number of native RocksDB::Get calls (does not include Gets from WriteBatch, the in-memory batch used for staging writes).

  • rocksdbPutCount: Number of native RocksDB::Put calls (does not include Puts to WriteBatch, the in-memory batch used for staging writes).

  • rocksdbTotalBytesReadByGet: Number of uncompressed bytes read through native RocksDB::Get calls.

  • rocksdbTotalBytesWrittenByPut: Number of uncompressed bytes written through native RocksDB::Put calls.

  • rocksdbReadBlockCacheHitCount: Number of times the native RocksDB block cache is used to avoid reading data from local disk.

  • rocksdbReadBlockCacheMissCount: Number of times the native RocksDB block cache missed and required reading data from local disk.

  • rocksdbTotalBytesReadByCompaction: Number of bytes read from the local disk by the native RocksDB compaction process.

  • rocksdbTotalBytesWrittenByCompaction: Number of bytes written to the local disk by the native RocksDB compaction process.

  • rocksdbTotalCompactionLatencyMs: Time (in milliseconds) taken for RocksDB compactions (both background compactions and the optional compaction initiated during the commit).

  • rocksdbWriterStallLatencyMs: Time (in milliseconds) the writer has stalled due to a background compaction or flushing of the memtables to disk.

  • rocksdbTotalBytesReadThroughIterator: Total size of uncompressed data read using an iterator. Some stateful operations (such as timeout processing in flatMapGroupsWithState or watermarking in windowed aggregations) require reading all the data in the DB through an iterator.
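As one way to read these counters, the sketch below computes a block cache hit ratio from the two cache metrics. The object and method names are hypothetical, and the input is assumed to be a plain Scala Map; the customMetrics field of a state operator's progress is a java.util.Map, so it would need to be converted first.

```scala
object RocksDbMetrics {
  // Fraction of block cache lookups that were served from the cache.
  // Returns None when either counter is missing or there were no lookups.
  def blockCacheHitRatio(customMetrics: Map[String, Long]): Option[Double] =
    for {
      hits   <- customMetrics.get("rocksdbReadBlockCacheHitCount")
      misses <- customMetrics.get("rocksdbReadBlockCacheMissCount")
      if hits + misses > 0
    } yield hits.toDouble / (hits + misses)

  // Counter values copied from the example progress event in this section.
  val sample: Map[String, Long] = Map(
    "rocksdbReadBlockCacheHitCount"  -> 1988L,
    "rocksdbReadBlockCacheMissCount" -> 40341617L)
}
```

For the sample values the ratio is far below one percent, which suggests that nearly every read in that batch went to local disk rather than the block cache.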

Asynchronous state checkpointing


Available in Databricks Runtime 10.3 and above.

Enable asynchronous state checkpointing in stateful streaming queries with large state updates to potentially reduce the end-to-end micro-batch latency.

Structured Streaming currently uses synchronous checkpointing, which means that every micro-batch will ensure that all the state updates made in the batch are backed up in cloud storage (called “checkpoint location”) before starting the next batch. If a stateful streaming query fails, all micro-batches except the last micro-batch are guaranteed to be checkpointed. Hence, on restart, only the last batch may need to be re-executed. However, this fast recovery with synchronous checkpointing comes at the cost of higher latency for each micro-batch.

[Figure: streaming state checkpointing modes]

Asynchronous state checkpointing attempts to perform the checkpointing asynchronously so that the micro-batch execution doesn’t have to wait for the checkpoint to complete. In other words, the next micro-batch can start as soon as the computation of the previous micro-batch has been completed. Internally, however, the offset metadata (also saved in the checkpoint location) tracks whether the state checkpointing has been completed for a micro-batch. On query restart, more than one micro-batch may need to be re-executed - the last micro-batch whose computation was incomplete, as well as the one micro-batch before it whose state checkpointing was incomplete. And you get the same fault-tolerance guarantees (that is, exactly-once guarantees with an idempotent sink) as that of synchronous checkpointing.

To summarize, for stateful streaming queries bottlenecked on state updates, enabling asynchronous state checkpointing can reduce end-to-end latencies without sacrificing any fault-tolerance guarantees, but with a minor cost of higher restart delays.

Identifying target workloads

Following are characteristics of the streaming jobs that can potentially benefit from asynchronous state checkpointing.

  • Job has one or more stateful operations (e.g., aggregation, [flat]MapGroupsWithState, stream-stream joins)

  • State checkpoint latency is one of the major contributors to overall batch execution latency. This information can be found in the StreamingQueryProgress events. These events are found in log4j logs on Spark driver as well. Here is an example of streaming query progress and how to find the state checkpoint impact on the overall batch execution latency.

    •  {
         "id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19",
         "runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe",
         "batchId" : 0,
         "durationMs" : {
           "triggerExecution" : 547730,
           ...
         },
         "stateOperators" : [ {
           "commitTimeMs" : 3186626,
           "numShufflePartitions" : 64,
           ...
         } ]
       }

    • State checkpoint latency analysis of the above query progress event:

      • Batch duration (durationMs.triggerExecution) is around 547 secs.

      • State store commit latency (stateOperators[0].commitTimeMs) is around 3,186 secs. Commit latency is aggregated across tasks containing a state store. In this case there are 64 such tasks (stateOperators[0].numShufflePartitions).

      • Each task containing a state operator took an average of 50 secs (3,186/64) for the checkpoint. This latency adds to the batch duration. Assuming all 64 tasks run concurrently, the checkpoint step contributed around 9% (50 secs / 547 secs) of the batch duration. The percentage is even higher when the maximum number of concurrent tasks is less than 64.
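The arithmetic in this analysis can be written as a small function. This is a rough sketch: the object and method names are hypothetical, and the "waves" model (commit time grows in proportion to how many rounds of tasks must run when fewer than all tasks run concurrently) is a simplifying assumption, not something the progress event reports.

```scala
object CheckpointImpact {
  // Estimated fraction of the batch duration spent committing state.
  // commitTimeMs is summed across tasks, so it is divided by the task count
  // to get a per-task average, then multiplied by the number of task "waves"
  // needed when fewer than numTasks tasks can run at the same time.
  def commitShare(
      triggerExecutionMs: Long,
      commitTimeMs: Long,
      numTasks: Int,
      maxConcurrentTasks: Int): Double = {
    val avgCommitPerTaskMs = commitTimeMs.toDouble / numTasks
    val waves = math.ceil(numTasks.toDouble / maxConcurrentTasks)
    avgCommitPerTaskMs * waves / triggerExecutionMs
  }
}

// Values from the example progress event, with all 64 tasks running concurrently.
val share = CheckpointImpact.commitShare(547730L, 3186626L, 64, 64)
println(f"State commit share of batch duration: ${share * 100}%.1f%%")
```

Calling the same function with maxConcurrentTasks set to 32 doubles the estimated share under this model, which matches the observation that the percentage rises when fewer tasks run concurrently.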

Enabling asynchronous state checkpointing

Set the following configuration in the streaming job. Asynchronous checkpointing needs a state store implementation that supports asynchronous commits. Currently only the RocksDB-based state store implementation supports it.
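The configuration itself is not reproduced in the text above. The sketch below assumes the Databricks configuration key and RocksDB provider class shown in the code; treat both names as assumptions to verify against the documentation for your runtime version. The helper name is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper; call it before starting the stateful streaming query.
def enableAsyncStateCheckpointing(spark: SparkSession): Unit = {
  // Assumed key for turning on asynchronous state checkpointing.
  spark.conf.set(
    "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
    "true")
  // Asynchronous checkpointing requires the RocksDB-based state store.
  spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
}
```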




  • Any failure in an asynchronous checkpoint at any one or more stores fails the query. In synchronous checkpointing mode, the checkpoint is executed as part of the task and Spark retries the task multiple times before failing the query. This mechanism is not present with asynchronous state checkpointing. However, using the Databricks job retries, such failures can be automatically retried.

  • Auto-scaling in combination with asynchronous state checkpointing may not work well: asynchronous checkpointing works best when the state store locations do not change between micro-batch executions. With auto-scaling enabled, the state store instances may be redistributed as nodes are added or removed.

  • Asynchronous state checkpointing is supported only in the RocksDB state store provider implementation. The default in-memory state store implementation does not support it.

Multiple watermark policy

A streaming query can have multiple input streams that are unioned or joined together. Each of the input streams can have a different threshold of late data that needs to be tolerated for stateful operations. You specify these thresholds using withWatermark("eventTime", delay) on each of the input streams. For example, consider a query with a stream-stream join:

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

// joinCondition is a placeholder for your join expression
inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

While executing the query, Structured Streaming individually tracks the maximum event time seen in each input stream, calculates watermarks based on the corresponding delay, and chooses a single global watermark from them to be used for stateful operations. By default, the minimum is chosen as the global watermark because it ensures that no data is accidentally dropped as too late if one of the streams falls behind the others (for example, one of the streams stops receiving data due to upstream failures). In other words, the global watermark will safely move at the pace of the slowest stream and the query output will be delayed accordingly.

In some cases you may want to get faster results even if it means dropping data from the slowest stream. You can set the multiple watermark policy to choose the maximum value as the global watermark by setting the SQL configuration spark.sql.streaming.multipleWatermarkPolicy to max (default is min). This lets the global watermark move at the pace of the fastest stream. However, as a side effect data from the slower streams will be aggressively dropped. Hence, Databricks recommends that you use this configuration judiciously.
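The difference between the two policies can be illustrated with a small model. This is a simplified sketch, not the engine's implementation: the object, case class, and method names are hypothetical, and it ignores details such as when the engine updates the watermark. In a real query you would select the policy with spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max").

```scala
object GlobalWatermark {
  // Per-stream watermark = maximum event time seen minus the allowed delay
  // (both in milliseconds).
  final case class StreamProgress(maxEventTimeMs: Long, delayMs: Long) {
    def watermarkMs: Long = maxEventTimeMs - delayMs
  }

  // policy mirrors the values of spark.sql.streaming.multipleWatermarkPolicy.
  def global(streams: Seq[StreamProgress], policy: String): Long = {
    val marks = streams.map(_.watermarkMs)
    policy match {
      case "min" => marks.min // pace of the slowest stream (default)
      case "max" => marks.max // pace of the fastest stream
      case other => throw new IllegalArgumentException(s"Unknown policy: $other")
    }
  }
}
```

For example, if one stream has seen events up to hour 10 with a 1 hour delay and another up to hour 8 with a 2 hour delay, the per-stream watermarks are hour 9 and hour 6. The min policy keeps the global watermark at hour 6, while the max policy advances it to hour 9 and treats the second stream's data older than that as too late.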

Visualize a Structured Streaming DataFrame

You can use the display function to visualize a Structured Streaming DataFrame in real time. While the trigger and checkpointLocation parameters are optional, as a best practice Databricks recommends that you always specify them in production.

// Scala
import org.apache.spark.sql.streaming.Trigger

val streaming_df = spark.readStream.format("rate").load()
display(streaming_df.groupBy().count(), trigger = Trigger.ProcessingTime("5 seconds"), checkpointLocation = "dbfs:/<checkpoint-path>")

# Python
streaming_df = spark.readStream.format("rate").load()
display(streaming_df.groupBy().count(), processingTime = "5 seconds", checkpointLocation = "dbfs:/<checkpoint-path>")

For more information, see Structured Streaming DataFrames.

State Operator flatMapGroupsWithState Improvements

Specify initial state

You can specify a user-defined initial state for Structured Streaming stateful processing using the [flat]MapGroupsWithState operator. This lets a new query start from existing state instead of reprocessing all of the earlier data to rebuild it.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]

Example use case that specifies an initial state to the flatMapGroupsWithState operator:

// Assumes case class RunningCount(count: Long) and a streaming Dataset[String] named fruitStream
val fruitCountFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  // Add the number of new values to the previous count (0 if there is no state yet)
  val count = state.getOption.map(_.count).getOrElse(0L) + values.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5))
).toDS()

val fruitCountInitial = fruitCountInitialDS.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Example use case that specifies an initial state to the mapGroupsWithState operator:

// Assumes case class RunningCount(count: Long) and a streaming Dataset[String] named fruitStream
val fruitCountFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  // Add the number of new values to the previous count (0 if there is no state yet)
  val count = state.getOption.map(_.count).getOrElse(0L) + values.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5))
).toDS()

val fruitCountInitial = fruitCountInitialDS.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Test state update function

The TestGroupState API enables you to test the state update function used for Dataset.groupByKey(...).mapGroupsWithState(...) and Dataset.groupByKey(...).flatMapGroupsWithState(...).

The state update function takes the previous state as input using an object of type GroupState. See the Apache Spark GroupState reference documentation. For example:

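import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  // The state has not been touched before the function runs
  assert(!prevState.isUpdated)

  updateState(userId, actions, prevState)

  // Assumes updateState calls state.update(...) for this input
  assert(prevState.isUpdated)
}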