Structured Streaming checkpoints
Checkpoints and write-ahead logs work together to provide processing guarantees for Structured Streaming workloads. The checkpoint tracks the information that identifies the query, including state information and processed records. When you delete the files in a checkpoint directory or change to a new checkpoint location, the next run of the query begins fresh.
Each query must have a different checkpoint location. Multiple queries should never share the same location.
Enable checkpointing for Structured Streaming queries
You must specify the checkpointLocation
option before you run a streaming query, as in the following example:
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
Note
Some sinks, such as the output for display()
in notebooks and the memory
sink, automatically generate a temporary checkpoint location if you omit this option. These temporary checkpoint locations do not ensure any fault tolerance or data consistency guarantees and might not get cleaned up properly. Databricks recommends always specifying a checkpoint location for these sinks.
Recover after changes in a Structured Streaming query
There are limitations on what changes in a streaming query are allowed between restarts from the same checkpoint location. Here are a few changes that are either not allowed or the effect of the change is not well-defined. For all of them:
The term allowed means you can do the specified change but whether the semantics of its effect is well-defined depends on the query and the change.
The term not allowed means you should not do the specified change as the restarted query is likely to fail with unpredictable errors.
sdf
represents a streaming DataFrame/Dataset generated withsparkSession.readStream
.
Types of changes in Structured Streaming queries
Changes in the number or type (that is, different source) of input sources: This is not allowed.
Changes in the parameters of input sources: Whether this is allowed and whether the semantics of the change are well-defined depends on the source and the query. Here are a few examples.
Addition, deletion, and modification of rate limits is allowed:
spark.readStream.format("kafka").option("subscribe", "article")
to
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
Changes to subscribed articles and files are generally not allowed as the results are unpredictable:
spark.readStream.format("kafka").option("subscribe", "article")
tospark.readStream.format("kafka").option("subscribe", "newarticle")
Changes in the trigger interval: You can change triggers between incremental batches and time intervals. See Changing trigger intervals between runs.
Changes in the type of output sink: Changes between a few specific combinations of sinks are allowed. This needs to be verified on a case-by-case basis. Here are a few examples.
File sink to Kafka sink is allowed. Kafka will see only the new data.
Kafka sink to file sink is not allowed.
Kafka sink changed to foreach, or vice versa is allowed.
Changes in the parameters of output sink: Whether this is allowed and whether the semantics of the change are well-defined depends on the sink and the query. Here are a few examples.
Changes to output directory of a file sink is not allowed:
sdf.writeStream.format("parquet").option("path", "/somePath")
tosdf.writeStream.format("parquet").option("path", "/anotherPath")
Changes to output topic is allowed:
sdf.writeStream.format("kafka").option("topic", "topic1")
tosdf.writeStream.format("kafka").option("topic", "topic2")
Changes to the user-defined foreach sink (that is, the
ForeachWriter
code) is allowed, but the semantics of the change depends on the code.
Changes in projection / filter / map-like operations: Some cases are allowed. For example:
Addition / deletion of filters is allowed:
sdf.selectExpr("a")
tosdf.where(...).selectExpr("a").filter(...)
.Changes in projections with same output schema is allowed:
sdf.selectExpr("stringColumn AS json").writeStream
tosdf.select(to_json(...).as("json")).writeStream
.Changes in projections with different output schema are conditionally allowed:
sdf.selectExpr("a").writeStream
tosdf.selectExpr("b").writeStream
is allowed only if the output sink allows the schema change from"a"
to"b"
.
Changes in stateful operations: Some operations in streaming queries need to maintain state data in order to continuously update the result. Structured Streaming automatically checkpoints the state data to fault-tolerant storage (for example, DBFS, GCS, AWS S3, Azure Blob storage) and restores it after restart. However, this assumes that the schema of the state data remains same across restarts. This means that any changes (that is, additions, deletions, or schema modifications) to the stateful operations of a streaming query are not allowed between restarts. Here is the list of stateful operations whose schema should not be changed between restarts in order to ensure state recovery:
Streaming aggregation: For example,
sdf.groupBy("a").agg(...)
. Any change in number or type of grouping keys or aggregates is not allowed.Streaming deduplication: For example,
sdf.dropDuplicates("a")
. Any change in number or type of grouping keys or aggregates is not allowed.Stream-stream join: For example,
sdf1.join(sdf2, ...)
(i.e. both inputs are generated withsparkSession.readStream
). Changes in the schema or equi-joining columns are not allowed. Changes in join type (outer or inner) not allowed. Other changes in the join condition are ill-defined.Arbitrary stateful operation: For example,
sdf.groupByKey(...).mapGroupsWithState(...)
orsdf.groupByKey(...).flatMapGroupsWithState(...)
. Any change to the schema of the user-defined state and the type of timeout is not allowed. Any change within the user-defined state-mapping function are allowed, but the semantic effect of the change depends on the user-defined logic. If you really want to support state schema changes, then you can explicitly encode/decode your complex state data structures into bytes using an encoding/decoding scheme that supports schema migration. For example, if you save your state as Avro-encoded bytes, then you can change the Avro-state-schema between query restarts as this restores the binary state.