CREATE STREAMING TABLE

Applies to: Databricks Runtime 13.1 and above

Creates a streaming table, a Delta table with extra support for streaming or incremental data processing.

Streaming tables are only supported in Delta Live Tables. Running this command on supported Databricks Runtime compute only parses the syntax. See Implement a Delta Live Tables pipeline with SQL.

Syntax

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( [ column_identifier column_type [ NOT NULL ]
      [ COMMENT column_comment ] [ column_constraint ]
    ] [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause } [...]

Parameters

  • REFRESH

    If specified, refreshes the table with the latest data available from the sources defined in the query. Only new data that arrives before the query starts is processed. New data that gets added to the sources during the execution of the command is ignored until the next refresh.

  • IF NOT EXISTS

    If specified and a table with the same name already exists, the statement is ignored.

    IF NOT EXISTS cannot be used together with REFRESH, which means CREATE OR REFRESH STREAMING TABLE IF NOT EXISTS is not allowed.

  • table_name

    The name of the table to be created. The name must not include a temporal specification. If the name is not qualified, the table is created in the current schema.

  • table_specification

    This optional clause defines the list of columns, their types, properties, descriptions, and column constraints.

    If you do not define columns in the table schema, you must specify AS query.

    • column_identifier

      A unique name for the column.

      • column_type

        Specifies the data type of the column.

      • NOT NULL

        If specified, the column does not accept NULL values.

      • COMMENT column_comment

        A string literal to describe the column.

      • column_constraint

        Preview

        This feature is in Public Preview.

        Adds a primary key or foreign key constraint to the column in a streaming table. Constraints are not supported for tables in the hive_metastore catalog.

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]

        Adds data quality expectations to the table. These expectations can be tracked over time and accessed through the streaming table’s event log. A FAIL UPDATE expectation causes processing to fail both when creating the table and when refreshing it. A DROP ROW expectation causes the entire row to be dropped if the expectation is not met.

        expectation_expr may be composed of literals, column identifiers within the table, and deterministic, built-in SQL functions or operators except:

        Also, expectation_expr must not contain any subquery.

      • table_constraint

        Preview

        This feature is in Public Preview.

        Adds an informational primary key or informational foreign key constraint to a streaming table. Key constraints are not supported for tables in the hive_metastore catalog.

  • table_clauses

    Optionally specify partitioning, comments, user-defined properties, and a refresh schedule for the new table. Each subclause may only be specified once.

    • PARTITIONED BY

      An optional list of columns of the table to partition the table by.

    • COMMENT table_comment

      A STRING literal to describe the table.

    • TBLPROPERTIES

      Optionally sets one or more user-defined properties.

  • AS query

    This clause populates the table using the data from query. The query must be a streaming query, which you get by adding the STREAM keyword to any relation you want to process incrementally. When you specify a query and a table_specification together, the schema in table_specification must contain all the columns returned by the query; otherwise, you get an error. Columns that are specified in table_specification but not returned by query return null values when queried.

    This clause is optional in Delta Live Tables. If you omit it, you must reference this table in an APPLY CHANGES command in your DLT pipeline. See Change data capture with SQL in Delta Live Tables.
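
    As a minimal sketch of that pattern, the statements below declare a streaming table without AS query and then feed it from an APPLY CHANGES command. The table name users, the source cdc_data.users, and the columns userId and sequenceNum are hypothetical, and the live. prefix assumes the statement runs inside a DLT pipeline. See the APPLY CHANGES documentation for the full syntax.

    -- Declares the target table; its contents come from APPLY CHANGES rather than AS query.
    > CREATE OR REFRESH STREAMING TABLE users;

    -- Applies change records from a hypothetical CDC source, keyed and ordered by hypothetical columns.
    > APPLY CHANGES INTO live.users
      FROM STREAM(cdc_data.users)
      KEYS (userId)
      SEQUENCE BY sequenceNum;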

Differences between streaming tables and other tables

Streaming tables are stateful tables, designed to handle each row only once as you process a growing dataset. Because most datasets grow continuously over time, streaming tables are good for most ingestion workloads. Streaming tables are optimal for pipelines that require data freshness and low latency. Streaming tables can also be useful for massive scale transformations, as results can be incrementally calculated as new data arrives, keeping results up to date without needing to fully recompute all source data with each update. Streaming tables are designed for data sources that are append-only.

Streaming tables accept additional commands such as REFRESH, which processes the latest data available in the sources provided in the query. After you change the query, a REFRESH applies the new definition only to new data; previously processed data is not changed. To apply the changes to existing data as well, execute REFRESH TABLE <table_name> FULL to perform a full refresh. A full refresh re-processes all data available in the source with the latest definition. Because a full refresh truncates the existing data, it is not recommended for sources that don’t keep the entire history of the data or that have short retention periods, such as Kafka. You may not be able to recover old data if it is no longer available in the source.
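
The refresh behavior described above can be sketched as follows, reusing the raw_data table from the Examples section. The WHERE clause and the id column are hypothetical changes to the query, and the REFRESH statement uses the form given in the paragraph above.

-- Re-running the statement with a changed query applies the new definition to new data only.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path')
  WHERE id IS NOT NULL;

-- Re-processes all data still available in the source with the latest definition.
-- Existing data is truncated first, so avoid this on sources with short retention.
> REFRESH TABLE raw_data FULL;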

Limitations

  • Only table owners can refresh streaming tables to get the latest data.

  • ALTER TABLE commands are disallowed on streaming tables.

  • Time travel queries are not supported.

  • Evolving the table schema through DML commands like INSERT INTO and MERGE is not supported.

  • The following commands are not supported on streaming tables:

    • CREATE TABLE ... CLONE <streaming_table>

    • COPY INTO

    • ANALYZE TABLE

    • RESTORE

    • TRUNCATE

    • GENERATE MANIFEST

    • [CREATE OR] REPLACE TABLE

  • Delta Sharing is not supported.

  • Renaming the table or changing the owner is not supported.

  • Table constraints such as PRIMARY KEY and FOREIGN KEY are not supported.

  • Generated columns, identity columns, and default columns are not supported.

Examples

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');
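
-- Creates a streaming table that drops rows failing a data quality expectation
-- instead of failing the update.
-- A sketch only: the table name, the path, and the id column are hypothetical.
> CREATE OR REFRESH STREAMING TABLE valid_events (
    CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW
  )
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path');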

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Reads data from another streaming table, scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');
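
-- Creates a partitioned streaming table with column and table comments.
-- A sketch only: the table name, the path, and the derived event_date column are hypothetical.
-- The declared schema contains every column returned by the query.
> CREATE OR REFRESH STREAMING TABLE csv_data_by_day (
    id int COMMENT 'Event identifier',
    ts timestamp,
    event string,
    event_date date
  )
  PARTITIONED BY (event_date)
  COMMENT 'CSV events partitioned by day'
  AS SELECT *, CAST(ts AS DATE) AS event_date
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');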