Streaming and incremental ingestion
Databricks uses Apache Spark Structured Streaming to back numerous products associated with ingestion workloads, including:
Auto Loader
COPY INTO
Delta Live Tables pipelines
Materialized views and streaming tables in Databricks SQL
This article discusses some of the differences between streaming and incremental batch processing semantics and provides a high-level overview of configuring ingestion workloads for the desired semantics in Databricks.
What is the difference between streaming and incremental batch ingestion?
Possible ingestion workflow configurations range from near real-time processing to infrequent incremental batch processing. Both patterns use Apache Spark Structured Streaming to power incremental processing, but have different semantics. For simplicity, this article refers to near real-time ingestion as streaming ingestion and more infrequent incremental processing as incremental batch ingestion.
Streaming ingestion
Streaming, in the context of data ingestion and table updates, refers to near real-time data processing where Databricks ingests records from source to sink in microbatches using always-on infrastructure. A streaming workload continuously ingests updates from configured data sources, unless a failure occurs that stops ingestion.
Incremental batch ingestion
Incremental batch ingestion refers to a pattern where all new records are processed from a data source in a short-lived job. Incremental batch ingestion often occurs according to a schedule, but it can also be triggered manually or based on file arrival.
Incremental batch ingestion differs from batch ingestion in that it automatically detects new records in the data source and ignores records that have already been ingested.
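The difference from plain batch ingestion can be illustrated with a toy model in pure Python. This is only a sketch of the semantics, not how Structured Streaming implements checkpoints: the `ingest_incrementally` function, the in-memory `checkpoint` set, and the file names are all hypothetical.

```python
def ingest_incrementally(source_files, checkpoint, sink):
    """Append records from files that the checkpoint has not seen yet.

    source_files: dict mapping file name -> list of records (stand-in for a data source)
    checkpoint:   set of file names already ingested (stand-in for checkpoint state)
    sink:         list that receives the records (stand-in for the target table)
    """
    new_files = sorted(name for name in source_files if name not in checkpoint)
    for name in new_files:
        sink.extend(source_files[name])
        checkpoint.add(name)  # remember the file so later runs skip it
    return new_files


source = {"a.json": [1, 2], "b.json": [3]}
checkpoint, sink = set(), []

first_run = ingest_incrementally(source, checkpoint, sink)   # picks up a.json and b.json
source["c.json"] = [4, 5]                                    # a new file arrives
second_run = ingest_incrementally(source, checkpoint, sink)  # picks up only c.json
third_run = ingest_incrementally(source, checkpoint, sink)   # nothing new to ingest
```

A plain batch job in this model would reread every file on each run, which would duplicate records in the sink unless the job overwrote the target.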
Ingestion with Jobs
Databricks Jobs enables you to orchestrate workflows and schedule tasks that include notebooks, libraries, Delta Live Tables pipelines, and Databricks SQL queries.
Note
You can use all Databricks compute types and task types to configure incremental batch ingestion. Streaming ingestion is only supported in production on classic jobs compute and Delta Live Tables.
Jobs have two primary modes of operation:
Continuous jobs automatically retry if they encounter a failure. This mode is intended for streaming ingestion.
Triggered jobs run tasks when triggered. Triggers include:
Time-based triggers that run jobs on a specified schedule.
File-based triggers that run jobs when files land in a specified location.
Other triggers such as REST API calls, execution of Databricks CLI commands, or clicking the Run now button in the workspace UI.
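As a rough illustration, the modes above might appear as follows in job settings submitted through the Jobs REST API. The fragments below are written as Python dictionaries and are an assumption-laden sketch: the job names, bucket URL, and cron expression are hypothetical, task definitions are omitted, and the field names should be checked against the Jobs API reference before use.

```python
# Continuous job: intended for streaming ingestion.
continuous_job = {
    "name": "streaming-ingest",  # hypothetical job name
    "continuous": {"pause_status": "UNPAUSED"},
}

# Triggered job with a time-based trigger (Quartz cron: top of every hour).
scheduled_job = {
    "name": "hourly-ingest",  # hypothetical job name
    "schedule": {
        "quartz_cron_expression": "0 0 * * * ?",
        "timezone_id": "UTC",
        "pause_status": "UNPAUSED",
    },
}

# Triggered job with a file-based trigger on a storage location.
file_arrival_job = {
    "name": "on-arrival-ingest",  # hypothetical job name
    "trigger": {
        "file_arrival": {"url": "s3://example-bucket/landing/"},  # hypothetical location
        "pause_status": "UNPAUSED",
    },
}
```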
For incremental batch workloads, configure your jobs using the AvailableNow trigger mode, as follows:
# Python
(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("table_name")
)
// Scala
import org.apache.spark.sql.streaming.Trigger
df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("table_name")
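For context, an end-to-end incremental batch run might pair this trigger with Auto Loader as the source. The following is a minimal sketch, assuming JSON files in cloud object storage; the function name `ingest_available_now` and its parameters are hypothetical, and reusing the checkpoint path as the schema location is one common choice rather than a requirement.

```python
def ingest_available_now(spark, source_path, checkpoint_path, table_name):
    """Process all files not yet ingested from source_path, then stop.

    `spark` is an existing SparkSession (predefined in Databricks notebooks).
    Returns the StreamingQuery started by toTable().
    """
    df = (spark.readStream
        .format("cloudFiles")                                  # Auto Loader source
        .option("cloudFiles.format", "json")                   # assumed input format
        .option("cloudFiles.schemaLocation", checkpoint_path)  # where the inferred schema is tracked
        .load(source_path)
    )
    return (df.writeStream
        .option("checkpointLocation", checkpoint_path)  # records which files were already ingested
        .trigger(availableNow=True)                     # process available data, then shut down
        .toTable(table_name)
    )
```

Because progress is stored at the checkpoint location, scheduling this function to run repeatedly with the same checkpoint path ingests only files that arrived since the previous run.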
For streaming workloads, the default trigger interval is processingTime="500ms". The following example shows how to process a micro-batch every 5 seconds:
# Python
(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(processingTime="5 seconds")
  .toTable("table_name")
)
// Scala
import org.apache.spark.sql.streaming.Trigger
df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .toTable("table_name")
Important
Serverless jobs do not support Scala, continuous mode, or time-based trigger intervals for Structured Streaming. Use classic jobs if you need near real-time ingestion semantics.
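In Python, the two trigger settings in this section differ only in the keyword argument passed to trigger(), so one code path can serve both semantics. The helper below is a hypothetical sketch of that idea; the name `trigger_options` and the semantics labels are assumptions made for illustration.

```python
def trigger_options(semantics, interval="5 seconds"):
    """Return keyword arguments for DataStreamWriter.trigger().

    semantics: "incremental_batch" for a short-lived run over all new data,
               "streaming" for always-on processing at a fixed interval.
    """
    if semantics == "incremental_batch":
        return {"availableNow": True}
    if semantics == "streaming":
        # Per the note above, time-based intervals are not supported on serverless jobs.
        return {"processingTime": interval}
    raise ValueError(f"unknown ingestion semantics: {semantics!r}")


# Example use with an existing streaming DataFrame `df`:
# df.writeStream.option("checkpointLocation", checkpoint_path) \
#     .trigger(**trigger_options("streaming")) \
#     .toTable("table_name")
```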
Ingestion with Delta Live Tables
Similar to Jobs, Delta Live Tables pipelines can run in either triggered or continuous mode. For near real-time streaming semantics with streaming tables, use continuous mode.
Use streaming tables to configure streaming or incremental batch ingestion from cloud object storage, Apache Kafka, Amazon Kinesis, Google Pub/Sub, or Apache Pulsar.
Ingestion with Databricks SQL
COPY INTO provides familiar SQL syntax for incremental batch processing for data files in cloud object storage. COPY INTO behavior is similar to patterns supported by streaming tables for cloud object storage, but not all default settings are equivalent for all supported file formats.
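As an illustration, a basic COPY INTO statement can be assembled and submitted from Python. This is a minimal sketch: the helper name `copy_into_statement`, the table name, and the source path are hypothetical, and the helper does no quoting or validation of its inputs.

```python
def copy_into_statement(table_name, source_path, file_format):
    """Build a basic COPY INTO statement for files in cloud object storage."""
    return (
        f"COPY INTO {table_name}\n"
        f"FROM '{source_path}'\n"
        f"FILEFORMAT = {file_format}"
    )


statement = copy_into_statement(
    "main.default.events",                   # hypothetical target table
    "/Volumes/main/default/landing/events",  # hypothetical source location
    "JSON",
)

# In a notebook or job with an active SparkSession:
# spark.sql(statement)
```

Running the same statement again skips files that were already loaded, which is what makes COPY INTO suitable for incremental batch ingestion.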