Load and process data incrementally with Delta Live Tables flows

This article explains what flows are and how you can use flows in Delta Live Tables pipelines to incrementally process data from a source to a target streaming table. In Delta Live Tables, flows are defined in two ways:

  1. A flow is defined automatically when you create a query that updates a streaming table.

  2. Delta Live Tables also provides functionality to explicitly define flows for more complex processing such as appending to a streaming table from multiple streaming sources.

This article discusses the implicit flows that are created when you define a query to update a streaming table, and then provides details on the syntax to define more complex flows.

What is a flow?

In Delta Live Tables, a flow is a streaming query that processes source data incrementally to update a target streaming table. Most Delta Live Tables datasets you create in a pipeline define the flow as part of the query and do not require explicitly defining the flow. For example, you create a streaming table in Delta Live Tables in a single DDL command instead of using separate table and flow statements to create the streaming table:

Note

This CREATE FLOW example is provided for illustrative purposes only and includes keywords that are not valid Delta Live Tables syntax.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

In addition to the default flow defined by a query, the Delta Live Tables Python and SQL interfaces provide append flow functionality. Append flow supports processing that requires reading data from multiple streaming sources to update a single streaming table. For example, you can use append flow functionality when you have an existing streaming table and flow and want to add a new streaming source that writes to this existing streaming table.

Use append flow to write to a streaming table from multiple source streams

Note

To use append flow processing, your pipeline must be configured to use the preview channel.

Use the @append_flow decorator in the Python interface or the CREATE FLOW clause in the SQL interface to write to a streaming table from multiple streaming sources. Use append flow for processing tasks such as the following:

  • Add streaming sources that append data to an existing streaming table without requiring a full refresh. For example, you might have a table combining regional data from every region you operate in. As new regions are rolled out, you can add the new region data to the table without performing a full refresh. See Example: Write to a streaming table from multiple Kafka topics.

  • Update a streaming table by appending missing historical data (backfilling). For example, you have an existing streaming table that is written to by an Apache Kafka topic. You also have historical data stored in a table that you need inserted exactly once into the streaming table, and you cannot stream the data because your processing includes performing a complex aggregation before inserting the data. See Example: Run a one-time data backfill.

  • Combine data from multiple sources and write to a single streaming table instead of using the UNION clause in a query. Using append flow processing instead of UNION allows you to update the target table incrementally without running a full refresh update. See Example: Use append flow processing instead of UNION.

The target for the records output by the append flow processing can be an existing table or a new table. For Python queries, use the create_streaming_table() function to create a target table.

Important

  • If you need to define data quality constraints with expectations, define the expectations on the target table as part of the create_streaming_table() function or on an existing table definition. You cannot define expectations in the @append_flow definition.

  • Flows are identified by a flow name, and this name is used to identify streaming checkpoints. The use of the flow name to identify the checkpoint means the following:

    • If an existing flow in a pipeline is renamed, the checkpoint does not carry over, and the renamed flow is effectively an entirely new flow.

    • You cannot reuse a flow name in a pipeline, because the existing checkpoint won’t match the new flow definition.

The following is the syntax for @append_flow:

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

Example: Write to a streaming table from multiple Kafka topics

The following examples creates a streaming table named kafka_target and writes to that streaming table from two Kafka topics:

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )
CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

To learn more about the read_kafka() table-valued function used in the SQL queries, see read_kafka in the SQL language reference.

Example: Run a one-time data backfill

The following examples run a query to append historical data to a streaming table:

Note

To ensure a true one-time backfill when the backfill query is part of a pipeline that runs on a scheduled basis or continuously, remove the query after running the pipeline once. To append new data if it arrives in the backfill directory, leave the query in place.

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  cloud_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  cloud_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Example: Use append flow processing instead of UNION

Instead of using a query with a UNION clause, you can use append flow queries to combine multiple sources and write to a single streaming table. Using append flow queries instead of UNION allows you to append to a streaming table from multiple sources without running a full refresh.

The following Python example includes a query that combines multiple data sources with a UNION clause:

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

The following examples replace the UNION query with append flow queries:

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")
CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/apac",
    "csv"
  );