read_pubsub streaming table-valued function

Applies to: Databricks SQL, Databricks Runtime 13.3 LTS and above

Returns a table of records read from a Pub/Sub topic. Supports only streaming queries.

Note

Streaming can only be used in Delta Live Tables.

Syntax

read_pubsub( { parameter => value } [, ...])

Arguments

read_pubsub requires named parameter invocation.

The only required arguments are subscriptionId, projectId, and topicId. All other arguments are optional.

For full argument descriptions, see Configure options for Pub/Sub streaming read.

Databricks recommends using secrets when providing authorization options. See secret function.

For details on configuring access to Pub/Sub, see Configure access to Pub/Sub.

Parameter        Type     Description
subscriptionId   STRING   Required. The unique identifier assigned to a Pub/Sub subscription.
projectId        STRING   Required. The Google Cloud project ID associated with the Pub/Sub topic.
topicId          STRING   Required. The ID or name of the Pub/Sub topic to subscribe to.
clientEmail      STRING   The email address associated with a service account for authentication.
clientId         STRING   The client ID associated with the service account for authentication.
privateKeyId     STRING   The ID of the private key associated with the service account.
privateKey       STRING   The private key associated with the service account for authentication.

These arguments are used for further fine-tuning when reading from Pub/Sub:

Parameter                        Type     Description
numFetchPartitions               STRING   Optional. The number of parallel Spark tasks that fetch records from a subscription. Defaults to the number of executors.
deleteSubscriptionOnStreamStop   BOOLEAN  Optional. If true, the subscription passed to the stream is deleted when the streaming job ends. Defaults to false.
maxBytesPerTrigger               STRING   Optional. A soft limit on the batch size processed in each triggered micro-batch. Defaults to 'none'.
maxRecordsPerFetch               STRING   Optional. The number of records each task fetches before processing them. Defaults to '1000'.
maxFetchPeriod                   STRING   Optional. How long each task fetches before processing records. Defaults to '10s'.
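The following sketch shows how the tuning options might be combined with the required arguments. The table name testing.tuned_events and the values '4', '500', and '5s' are illustrative assumptions, not recommendations; choose values that suit your own throughput. Options typed STRING are passed as quoted strings, and deleteSubscriptionOnStreamStop takes a BOOLEAN literal.

```sql
-- Hypothetical example: testing.tuned_events and all tuning values are illustrative.
CREATE STREAMING TABLE testing.tuned_events AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId                 => 'app-events-1234',
    projectId                      => 'app-events-project',
    topicId                        => 'app-events-topic',
    numFetchPartitions             => '4',    -- parallel Spark tasks fetching from the subscription
    maxRecordsPerFetch             => '500',  -- records each task fetches before processing
    maxFetchPeriod                 => '5s',   -- how long each task fetches before processing
    deleteSubscriptionOnStreamStop => false   -- keep the subscription when the stream ends
  );
```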

Returns

A table of Pub/Sub records with the following schema. The attributes column can be null; all other columns are non-nullable.

Name                       Data type   Nullable   Description
messageId                  STRING      No         Unique identifier for the Pub/Sub message.
payload                    BINARY      No         The content of the Pub/Sub message.
attributes                 STRING      Yes        Key-value pairs representing the attributes of the Pub/Sub message, as a JSON-encoded string.
publishTimestampInMillis   BIGINT      No         The time the message was published, in milliseconds.
sequenceNumber             BIGINT      No         The unique identifier of the record within its shard.
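Because payload is BINARY and attributes is a JSON-encoded string, downstream queries usually need to decode them. The sketch below assumes the message payloads are UTF-8 text and uses a hypothetical table name, testing.decoded_events. CAST turns the payload into a string, from_json parses the attributes into a MAP<STRING, STRING> (Pub/Sub attribute values are strings), and timestamp_millis converts the publish time into a TIMESTAMP.

```sql
-- Sketch: assumes UTF-8 text payloads; testing.decoded_events is a hypothetical name.
CREATE STREAMING TABLE testing.decoded_events AS
  SELECT
    messageId,
    CAST(payload AS STRING)                      AS body,          -- BINARY -> STRING
    from_json(attributes, 'MAP<STRING, STRING>') AS attribute_map, -- NULL when attributes is NULL
    timestamp_millis(publishTimestampInMillis)   AS published_at   -- BIGINT milliseconds -> TIMESTAMP
  FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId      => 'app-events-project',
    topicId        => 'app-events-topic'
  );
```

Individual attributes can then be read by key, for example attribute_map['someKey'], where someKey stands in for one of your own attribute names.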

Examples

-- Streaming ingestion from Pub/Sub using service account credentials stored as secrets
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                clientEmail => secret('app-events', 'clientEmail'),
                clientId => secret('app-events', 'clientId'),
                privateKeyId => secret('app-events', 'privateKeyId'),
                privateKey => secret('app-events', 'privateKey')
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic'
);

You can then query testing.streaming_table for further analysis.
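For example, a query like the following could count ingested messages per hour of publish time. It is a sketch that relies only on the returned schema; the hourly grain is an arbitrary choice.

```sql
-- Count messages per publish hour in the ingested table.
SELECT
  date_trunc('HOUR', timestamp_millis(publishTimestampInMillis)) AS publish_hour,
  count(*)                                                        AS message_count
FROM testing.streaming_table
GROUP BY 1
ORDER BY 1;
```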

Erroneous queries:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project'
);

-- Value exceeds the allowed limit for maxRecordsPerFetch: MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                maxRecordsPerFetch => '1000001'
);