Subscribe to Google Pub/Sub

Databricks provides a built-in connector to subscribe to Google Pub/Sub in Databricks Runtime 13.3 LTS and above. This connector provides exactly-once processing semantics for records from the subscriber.

Note

Pub/Sub might publish duplicate records, and records might arrive to the subscriber out of order. You should write Databricks code to handle duplicate and out-of-order records.

Syntax example

If you have a Google Service Account with sufficient privileges attached to the cluster, you can use the following basic syntax for configuring a Structured Streaming read from Pub/Sub. See Google service account.

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "fe-demo-prod-dnd") // required
  .option("projectId", "fe-prod-dbx") // required
  .load()

You can also pass authorization options directly, as in the following example:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

For more configuration options, see Configure options for Pub/Sub streaming read.

Configure access to Pub/Sub

Databricks recommends using a Google Service Account (GSA) to manage connections to Pub/Sub.

When using a GSA, you do not need to provide additional authorization options to the stream directly.

Note

GSAs are not supported on compute configured with shared access mode.

Databricks recommends using secrets when providing authorization options. The following options are required to authorize a connection:

  • clientEmail

  • clientId

  • privateKey

  • privateKeyId

The following table describes the roles required for the configured credentials:

Roles

Required or optional

How it is used

roles/pubsub.viewer or roles/viewer

Required

Check if subscription exists and get subscription

roles/pubsub.subscriber

Required

Fetch data from a subscription

roles/pubsub.editor or roles/editor

Optional

Enables creation of a subscription if one doesn’t exist and also enables use of the deleteSubscriptionOnStreamStop to delete subscriptions on stream termination

Pub/Sub schema

The schema for the stream matches the records that are fetched from Pub/Sub, as described in the following table:

Field

Type

messageId

StringType

payload

ArrayType[ByteType]

attributes

StringType

publishTimestampInMillis

LongType

Configure options for Pub/Sub streaming read

The following table describes the options supported for Pub/Sub. All options are configured as part of a Structured Streaming read using .option("<optionName>", "<optionValue>") syntax.

Note

Some Pub/Sub configuration options use the concept of fetches instead of micro-batches. This reflects internal implementation details, and options work similarly to corollaries in other Structured Streaming connectors, except that records are fetched and then processed.

Option

Default value

Description

numFetchPartitions

Set to one half of the number of executors present at stream initialization.

The number of parallel Spark tasks that fetch records from a subscription.

deleteSubscriptionOnStreamStop

false

If true, the subscription passed to the stream is deleted when the streaming job ends.

maxBytesPerTrigger

none

A soft limit for the batch size to be processed during each triggered micro-batch.

maxRecordsPerFetch

1000

The number of records to fetch per task before processing records.

maxFetchPeriod

10 seconds

The time duration for each task to fetch before processing records. Databricks recommends using the default value.

Incremental batch processing semantics for Pub/Sub

You can use Trigger.AvailableNow to consume available records from the Pub/Sub sources an an incremental batch.

Databricks records the timestamp when you begin a read with the Trigger.AvailableNow setting. Records processed by the batch include all previously fetched data and any newly published records with a timestamp less than the recorded stream start timestamp.

See Configuring incremental batch processing.

Monitoring streaming metrics

Structured Streaming progress metrics report the number of records fetched and ready to process, the size of the records fetched and ready to process, and the number of duplicates seen since stream start. The following is an example of these metrics:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Limitations

Speculative execution (spark.speculation) is not supported with Pub/Sub.