Optimize stateful processing in Delta Live Tables with watermarks

To effectively manage the data kept in state, use watermarks when performing stateful stream processing in Delta Live Tables, including aggregations, joins, and deduplication. This article describes how to use watermarks in your Delta Live Tables queries and includes examples of the recommended operations.

Note

To ensure queries that perform aggregations are processed incrementally and not fully recomputed with each update, you must use watermarks.

What is a watermark?

In stream processing, a watermark is an Apache Spark feature that defines a time-based threshold for processing data when performing stateful operations such as aggregations. Arriving data is processed until the threshold is reached, at which point the time window defined by the threshold is closed. Watermarks help avoid problems during query processing, mainly when processing large datasets or running long-lived queries. These problems can include high latency in producing results and even out-of-memory (OOM) errors because of the amount of data kept in state during processing. Because streaming data is inherently unordered, watermarks also support correctly calculating operations like time-window aggregations.

To learn more about using watermarks in stream processing, see Watermarking in Apache Spark Structured Streaming and Apply watermarks to control data processing thresholds.

How do you define a watermark?

You define a watermark by specifying a timestamp field and a value representing the time threshold for late data to arrive. Data is considered late if it arrives after the defined time threshold. For example, if the threshold is defined as 10 minutes, records arriving after the 10-minute threshold might be dropped.
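The following plain-Python sketch illustrates the idea of a lateness threshold. It is a toy model, not Spark's implementation: the WatermarkTracker class and its accept() method are hypothetical names, and the watermark is updated per record here, whereas Spark advances it between micro-batches.

```python
from datetime import datetime, timedelta


class WatermarkTracker:
    """Toy model of an event-time watermark (hypothetical, not a Spark API)."""

    def __init__(self, delay: timedelta):
        self.delay = delay
        self.max_event_time = None  # latest event time observed so far

    @property
    def watermark(self):
        # The watermark trails the latest observed event time by the delay.
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def accept(self, event_time: datetime) -> bool:
        """Return True if the record is on time, False if it is late."""
        wm = self.watermark
        if wm is not None and event_time < wm:
            return False  # older than the watermark: candidate for dropping
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return True


t0 = datetime(2024, 1, 1, 12, 0, 0)
tracker = WatermarkTracker(timedelta(minutes=10))
tracker.accept(t0)                          # first record, on time
tracker.accept(t0 + timedelta(minutes=30))  # advances the watermark to 12:20
on_time = tracker.accept(t0 + timedelta(minutes=25))  # within the threshold
late = tracker.accept(t0 + timedelta(minutes=15))     # behind the watermark
```

In this sketch, the record stamped 12:25 is still accepted because it is within 10 minutes of the latest event time (12:30), while the record stamped 12:15 is treated as late.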

Because records that arrive after the defined threshold might be dropped, selecting a threshold that meets your latency vs. correctness requirements is important. Choosing a smaller threshold results in records being emitted sooner but also means late records are more likely to be dropped. A larger threshold means a longer wait but possibly more completeness of data. Because of the larger state size, a larger threshold might also require additional computing resources. Because the threshold value depends on your data and processing requirements, testing and monitoring your processing is important to determine an optimal threshold.
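One way to reason about this trade-off before running a pipeline is to replay a sample of event timestamps against several candidate thresholds. The sketch below is a simplified plain-Python model under stated assumptions: count_late is a hypothetical helper, the arrival sequence is made-up sample data, and the watermark is advanced per record rather than per micro-batch.

```python
def count_late(event_times, delay):
    """Count records that a watermark with the given delay would treat as late.

    event_times: event timestamps in seconds, listed in arrival order.
    delay: watermark threshold in seconds.
    """
    max_seen = float("-inf")
    late = 0
    for t in event_times:
        if t < max_seen - delay:
            late += 1  # behind the watermark: candidate for dropping
        else:
            max_seen = max(max_seen, t)
    return late


# Hypothetical arrival sequence (seconds); 40 and 130 arrive out of order.
arrivals = [0, 60, 120, 40, 300, 130, 310]
late_by_delay = {delay: count_late(arrivals, delay) for delay in (30, 120, 600)}
```

For this sample, a 30-second threshold treats two records as late, a 120-second threshold treats one as late, and a 600-second threshold treats none as late, at the cost of holding state for longer.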

You use the withWatermark() function in Python to define a watermark. In SQL, use the WATERMARK clause to define a watermark:

withWatermark("timestamp", "3 minutes")

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Use watermarks with stream-stream joins

For stream-stream joins, you must define a watermark on both sides of the join and a time interval clause. Because each join source has an incomplete view of the data, the time interval clause is required to tell the streaming engine when no further matches can be made. The time interval clause must use the same fields used to define the watermarks.

Because there might be times when each stream requires different thresholds for watermarks, the streams do not need to have the same thresholds. To avoid missing data, the streaming engine maintains one global watermark based on the slowest stream.
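As a rough illustration of a global watermark based on the slowest stream, the following plain-Python sketch takes the minimum of the per-stream watermarks. The function names and the sample values are hypothetical, and the model ignores how Spark schedules watermark updates across micro-batches.

```python
def stream_watermark(max_event_time, delay):
    """Watermark of one stream: latest event time seen minus its threshold."""
    return max_event_time - delay


def global_watermark(streams):
    """Global watermark across streams, following the slowest stream.

    streams: list of (max_event_time, delay) pairs, all in seconds.
    """
    return min(stream_watermark(m, d) for m, d in streams)


# Hypothetical state of two join inputs (seconds since some reference point).
impressions = (1000, 180)  # latest event at 1000 s, 3-minute threshold
clicks = (900, 60)         # latest event at 900 s, 1-minute threshold

combined = global_watermark([impressions, clicks])
```

Here the impressions stream has a watermark of 820 and the clicks stream has a watermark of 840, so the global watermark in this model is 820.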

The following example joins a stream of ad impressions and a stream of user clicks on ads. In this example, a click must occur within 3 minutes of the impression. After the 3-minute time interval passes, rows from the state that can no longer be matched are dropped.

import dlt
from pyspark.sql.functions import expr

dlt.create_streaming_table("adImpressionClicks")

@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  # Watermark each side of the join on its own event-time column.
  clicksDf = (dlt.read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (dlt.read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  # The time interval clause uses the same fields as the watermarks.
  joinDf = impressionsDf.alias("imp").join(
    clicksDf.alias("click"),
    expr("""
      imp.userId = click.userId AND
      clickAdId = impressionAdId AND
      clickTimestamp >= impressionTimestamp AND
      clickTimestamp <= impressionTimestamp + interval 3 minutes
    """),
    "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

A similar example in SQL:

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Perform windowed aggregations with watermarks

A common stateful operation on streaming data is a windowed aggregation. Windowed aggregations are similar to grouped aggregations, except that aggregate values are returned for the set of rows that are part of the defined window.

A window can be defined as a certain length, and an aggregation operation can be performed on all rows that are part of that window. Structured Streaming supports three types of windows:

  • Tumbling (fixed) windows: A series of fixed-sized, non-overlapping, and contiguous time intervals. An input record belongs to only a single window.

  • Sliding windows: Similar to tumbling windows, sliding windows are fixed-sized, but windows can overlap, and a record can fall into multiple windows.

  • Session windows: Windows of dynamic length. A session window starts with an input record and extends as long as further records arrive within a specified gap duration.

When data arrives past the end of the window plus the length of the watermark, no new data is accepted for the window, the result of the aggregation is emitted, and the state for the window is dropped.
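The window assignment and closing rules above can be sketched in plain Python. This is an illustrative model only: the helper names are hypothetical, timestamps are integer seconds, windows are half-open intervals aligned to multiples of the window size or slide, and the closing check is a simplification of how the engine compares the watermark with the window end.

```python
def tumbling_window(ts, size):
    """Return the single [start, end) tumbling window that contains ts."""
    start = (ts // size) * size
    return (start, start + size)


def sliding_windows(ts, size, slide):
    """Return every [start, end) sliding window that contains ts."""
    windows = []
    start = (ts // slide) * slide  # latest window start at or before ts
    while start > ts - size:       # window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def window_is_closed(window_end, max_event_time, delay):
    """True once event time has passed the window end plus the watermark delay."""
    return max_event_time - delay >= window_end


fixed = tumbling_window(430, 300)            # 5-minute tumbling window
overlapping = sliding_windows(430, 600, 300)  # 10-minute windows, 5-minute slide
closed = window_is_closed(600, max_event_time=780, delay=180)
```

A record at second 430 falls into exactly one tumbling window, (300, 600), but into two sliding windows, (0, 600) and (300, 900). With a 3-minute watermark, the window ending at 600 is considered closed in this model once an event at or after second 780 has been seen.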

The following example calculates the sum of impression seconds for each ad over 5-minute fixed windows. In this example, the SELECT clause returns the window under the alias impressions_window, and the window itself is defined as part of the GROUP BY clause. The window must be based on the same timestamp column as the watermark, the clickTimestamp column in this example.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, window(clickTimestamp, "5 minutes") AS impressions_window, sum(impressionSeconds) AS totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

A similar example in Python to calculate profit over hourly fixed windows:

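import dlt
from pyspark.sql.functions import expr, window

@dlt.table()
def profit_by_hour():
  return (
    dlt.read_stream("sales")
      .withWatermark("timestamp", "1 hour")
      # Group into 1-hour tumbling windows on the watermarked column.
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .agg(expr("sum(profit) AS profit"))
  )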

Deduplicate streaming records

Structured Streaming has exactly-once processing guarantees but does not automatically de-duplicate records from data sources. For example, because many message queues have at-least-once guarantees, duplicate records should be expected when reading from one of these message queues. You can use the dropDuplicatesWithinWatermark() function to de-duplicate records on any specified field, removing duplicates from a stream even if some fields differ (such as event time or arrival time). You must specify a watermark to use the dropDuplicatesWithinWatermark() function. All duplicate data that arrives within the time range specified by the watermark is dropped.

Ordered data is important because out-of-order data causes the watermark value to jump ahead incorrectly. Then, when older data arrives, it is considered late and dropped. Use the withEventTimeOrder option to process the initial snapshot in order based on the timestamp specified in the watermark. The withEventTimeOrder option can be declared in the code defining the dataset or in the pipeline settings using spark.databricks.delta.withEventTimeOrder.enabled. For example:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Note

The withEventTimeOrder option is supported only with Python.

In the following example, data is processed ordered by clickTimestamp, and records arriving within 5 seconds of each other that contain duplicate userId and clickAdId columns are dropped.

clicksDedupDf = (
  spark.readStream
    .option("withEventTimeOrder", "true")
    .table("rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
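
To build intuition for how watermark-bounded deduplication behaves, the following plain-Python sketch models it on a list of records. It is a simplified model under stated assumptions, not the engine's implementation: dedup_within_watermark is a hypothetical helper, timestamps are seconds, the watermark advances per record, and the state for a key is assumed to be kept until the watermark passes the first event time for that key plus the delay.

```python
def dedup_within_watermark(records, delay):
    """Drop repeated keys while their state is still retained.

    records: iterable of (key, event_time) pairs in arrival order (seconds).
    delay: watermark threshold in seconds.
    """
    expires_at = {}  # key -> event time at which the key's state is evicted
    max_seen = float("-inf")
    kept = []
    for key, t in records:
        max_seen = max(max_seen, t)
        watermark = max_seen - delay
        # Evict state for keys whose expiry the watermark has reached.
        for expired in [k for k, exp in expires_at.items() if exp <= watermark]:
            del expires_at[expired]
        if key in expires_at:
            continue  # duplicate of a key still held in state
        expires_at[key] = t + delay
        kept.append((key, t))
    return kept


# Hypothetical click keys; the second "u1" arrives 3 seconds after the first.
clicks = [("u1", 0), ("u1", 3), ("u2", 4), ("u1", 100)]
deduplicated = dedup_within_watermark(clicks, delay=5)
```

In this model, the second "u1" record is dropped because it arrives while the first is still in state. The "u1" record at second 100 is kept, because the state for that key was evicted long before, which is why the watermark bounds both the state size and the period over which duplicates are removed.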

Optimize pipeline configuration for stateful processing

To help prevent production issues and excessive latency, Databricks recommends enabling RocksDB-based state management for your stateful stream processing, particularly if your processing requires saving a large amount of intermediate state. To enable the RocksDB state store, see Enable RocksDB state store for Delta Live Tables.