Process streaming data with Delta Live Tables

Preview

This feature is in Public Preview.

Many applications require that tables be updated based on continually arriving data. However, as data sizes grow, the resources required to reprocess data with each update can become prohibitive. You can define a streaming table or view to incrementally process continually arriving data. Streaming tables and views reduce the cost of ingesting new data and the latency at which new data is made available.

When an update is triggered for a pipeline, a streaming table or view processes only new data that has arrived since the last update. Data already processed is automatically tracked by the Delta Live Tables runtime.

Streaming ingestion from external data sources

To ingest streaming data, you must define a streaming live table from a streaming source; for example, you can read external data as a stream with the following code:

Python

import dlt

# Location of the raw JSON event files to ingest
inputPath = "/databricks-datasets/structured-streaming/events/"

@dlt.table
def streaming_bronze_table():
  return (
    # Auto Loader incrementally picks up newly arrived JSON files
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(inputPath)
  )

SQL

CREATE OR REFRESH STREAMING LIVE TABLE streaming_bronze_table
AS SELECT * FROM cloud_files("/databricks-datasets/structured-streaming/events/", "json")

Streaming from other datasets within a pipeline

You can also stream data from other tables in the same pipeline.

Python

@dlt.table
def streaming_silver_table():
  # Reading the bronze table as a stream processes only newly appended rows
  return dlt.read_stream("streaming_bronze_table").where(...)

SQL

CREATE OR REFRESH STREAMING LIVE TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.streaming_bronze_table)
WHERE ...

Process streaming and batch data in a single pipeline

Because streaming live tables use Apache Spark Structured Streaming, a streaming live table can only process append queries; that is, queries where new rows are inserted into the source table. Processing updates from source tables, for example, merges and deletes, is not supported. To process updates, see the APPLY CHANGES INTO command.
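For context, a minimal sketch of the SQL APPLY CHANGES INTO syntax follows; the dataset and column names (customers_clean, customers_cdc, customer_id, update_timestamp) are hypothetical, and only the required KEYS and SEQUENCE BY clauses are shown:

-- Hypothetical example: the target table must be declared before applying changes.
CREATE OR REFRESH STREAMING LIVE TABLE customers_clean;

APPLY CHANGES INTO LIVE.customers_clean
FROM STREAM(LIVE.customers_cdc)
KEYS (customer_id)
SEQUENCE BY update_timestamp;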

A common streaming pattern includes the ingestion of source data to create the initial datasets in a pipeline. These initial datasets are commonly called bronze tables and often perform simple transformations. Because these transformations are simple and reprocessing inefficient formats like JSON can be prohibitive, these datasets are a perfect fit for streaming live tables.

By contrast, the final tables in a pipeline, commonly referred to as gold tables, often require complicated aggregations or read from sources that are the targets of an APPLY CHANGES INTO operation. Because these operations inherently create updates rather than appends, they are not supported as inputs to streaming live tables. These transformations are better suited for materialization as a live table.

By mixing streaming live tables and live tables in a single pipeline, you can simplify the pipeline, avoid costly re-ingestion or re-processing of raw data, and retain the full power of SQL to compute complex aggregations over an efficiently encoded and filtered dataset. The following example illustrates this type of mixed processing:

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("gs://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING LIVE TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "gs://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING LIVE TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE ...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT user_id, count(*) AS count FROM LIVE.streaming_silver GROUP BY user_id

Learn more about using Auto Loader to efficiently read JSON files from Google Cloud Storage for incremental processing.
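As a rough illustration of tuning that ingestion, additional Auto Loader options can be passed as a map in the third argument of cloud_files(); the table name below and the choice of the cloudFiles.inferColumnTypes option are illustrative assumptions:

-- Illustrative sketch: infers column types instead of loading all columns as strings.
CREATE OR REFRESH STREAMING LIVE TABLE streaming_bronze_typed
AS SELECT * FROM cloud_files(
  "/databricks-datasets/structured-streaming/events/", "json",
  map("cloudFiles.inferColumnTypes", "true")
)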

Streaming joins

Delta Live Tables supports various join strategies for updating tables.

Stream-batch joins

Stream-batch joins are a good choice when denormalizing a continuous stream of append-only data with a primarily static dimension table. Each time the derived dataset is updated, new records from the stream are joined with a static snapshot of the batch table taken when the update started. Records added or updated in the static table are not reflected in the results until a full refresh is performed.

The following are examples of stream-batch joins:

Python

@dlt.table
def customer_sales():
  # Stream new sales records and join each micro-batch against a static
  # snapshot of the customers table.
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING LIVE TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  LEFT JOIN LIVE.customers USING (customer_id)

In continuous pipelines, the batch side of the join is regularly polled for updates with each micro-batch.

Streaming aggregation

Simple distributive aggregates like count, min, max, or sum, and algebraic aggregates like average or standard deviation can also be calculated incrementally with streaming live tables. Databricks recommends incremental aggregation for queries with a limited number of groups, for example, a query with a GROUP BY country clause. Only new input data is read with each update.
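
As a minimal sketch of this pattern, assuming an append-only sales dataset with a country column defined earlier in the pipeline (both names are hypothetical), such an aggregation could be expressed as:

-- Hypothetical example: counts sales per country, reading only new input rows with each update.
CREATE OR REFRESH STREAMING LIVE TABLE sales_by_country
AS SELECT country, count(*) AS sales_count
FROM STREAM(LIVE.sales)
GROUP BY country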