Apply watermarks to control data processing thresholds
This article introduces the basic concepts of watermarking and provides recommendations for using watermarks in common stateful streaming operations. You must apply watermarks to stateful streaming operations to avoid infinitely expanding the amount of data kept in state, which could introduce memory issues and increase processing latencies during long-running streaming operations.
What is a watermark?
Structured Streaming uses watermarks to control the threshold for how long to continue processing updates for a given state entity. Common examples of state entities include:
Aggregations over a time window.
Unique keys in a join between two streams.
When you declare a watermark, you specify a timestamp field and a watermark threshold on a streaming DataFrame. As new data arrives, the state manager tracks the most recent timestamp in the specified field and processes all records within the lateness threshold.
The following example applies a 10 minute watermark threshold to a windowed count:
from pyspark.sql.functions import window (df .withWatermark("event_time", "10 minutes") .groupBy( window("event_time", "5 minutes"), "id") .count() )
In this example:
event_timecolumn is used to define a 10 minute watermark and a 5 minute tumbling window.
A count is collected for each
idobserved for each non-overlapping 5 minute windows.
State information is maintained for each count until the end of window is 10 minutes older than the latest observed
How do watermarks impact processing time and throughput?
Watermarks interact with output modes to control when data is written to the sink. Because watermarks reduce the total amount of state information to be processed, effective use of watermarks is essential for efficient stateful streaming throughput.
Not all output modes are supported for all stateful operations.
Watermarks and output mode for windowed aggregations
The following table details processing for queries with aggregation on a timestamp with a watermark defined:
Rows are written to the target table once the watermark threshold has passed. All writes are delayed based on the lateness threshold. Old aggregation state is dropped once the threshold has passed.
Rows are written to the target table as results are calculated, and can be updated and overwritten as new data arrives. Old aggregation state is dropped once the threshold has passed.
Aggregation state is not dropped. The target table is rewritten with each trigger.
Watermarks and output for stream-stream joins
Joins between multiple streams only support append mode, and matched records are written in each batch they are discovered. For inner joins, Databricks recommends setting a watermark threshold on each streaming data source. This allows state information to be discarded for old records. Without watermarks, Structured Streaming attempts to join every key from both sides of the join with each trigger.
Structured Streaming has special semantics to support outer joins. Watermarking is mandatory for outer joins, as it indicates when a key must be written with a null value after going unmatched. Note that while outer joins can be useful for recording records that are never matched during data processing, because joins only write to tables as append operations, this missing data is not recorded until after the lateness threshold has passed.
Control late data threshold with multiple watermark policy in Structured Streaming
When working with multiple Structured Streaming inputs, you can set multiple watermarks to control tolerance thresholds for late-arriving data. Configuring watermarks allows you to control state information and impacts latency.
A streaming query can have multiple input streams that are unioned or joined together. Each of the input streams can have a different threshold of late data that needs to be tolerated for stateful operations. Specify these thresholds using
withWatermarks("eventTime", delay) on each of the input streams. The following is an example query with stream-stream joins.
val inputStream1 = ... // delays up to 1 hour val inputStream2 = ... // delays up to 2 hours inputStream1.withWatermark("eventTime1", "1 hour") .join( inputStream2.withWatermark("eventTime2", "2 hours"), joinCondition)
While running the query, Structured Streaming individually tracks the maximum event time seen in each input stream, calculates watermarks based on the corresponding delay, and chooses a single global watermark with them to be used for stateful operations. By default, the minimum is chosen as the global watermark because it ensures that no data is accidentally dropped as too late if one of the streams falls behind the others (for example, one of the streams stop receiving data due to upstream failures). In other words, the global watermark safely moves at the pace of the slowest stream and the query output is delayed accordingly.
If you want to get faster results, you can set the multiple watermark policy to choose the maximum value as the global watermark by setting the SQL configuration
max (default is
min). This lets the global watermark move at the pace of the fastest stream. However, this configuration drops data from the slowest streams. Because of this, Databricks recommends that you use this configuration judiciously.