read_pulsar streaming table-valued function

Applies to: Databricks SQL, Databricks Runtime 14.1 and above

Preview

This feature is in Public Preview.

Returns a table with records read from Pulsar.

This table-valued function supports only streaming queries, not batch queries.

Syntax

read_pulsar ( { option_key => option_value } [, ...] )

Arguments

This function requires named parameter invocation for all options.

The options serviceUrl and topic are mandatory.
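As a minimal sketch, a call that passes only the two mandatory options could look like the following. The broker address, the topic name, and the target table testing.pulsar_minimal are placeholders. Because startingOffsets is omitted, its default of latest applies.

```sql
-- Minimal sketch: only the two mandatory options, passed as named parameters.
-- Broker address, topic, and table name are placeholders.
-- startingOffsets is not set, so its default (latest) applies.
CREATE STREAMING TABLE testing.pulsar_minimal AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      topic => 'my-topic');
```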

The argument descriptions here are brief. See the Structured Streaming Pulsar documentation for extended descriptions.

  • serviceUrl STRING (mandatory): The URI of the Pulsar service.

  • topic STRING (mandatory): The topic to read from.

  • predefinedSubscription STRING (default: none): The predefined subscription name that the connector uses to track Spark application progress.

  • subscriptionPrefix STRING (default: none): A prefix that the connector uses to generate a random subscription name to track Spark application progress.

  • pollTimeoutMs LONG (default: 120000): The timeout, in milliseconds, for reading messages from Pulsar.

  • failOnDataLoss BOOLEAN (default: true): Whether to fail the query when data is lost, for example because topics are deleted or messages are deleted by a retention policy.

  • startingOffsets STRING (default: latest): The starting point when a query is started: earliest, latest, or a JSON string that specifies a specific offset. With latest, the reader reads the newest records that arrive after it starts running. With earliest, it reads from the earliest offset.

  • startingTime STRING (default: none): When specified, the Pulsar source reads messages starting from the position of the specified startingTime.
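The following sketch combines several of the optional settings above: it tracks progress with predefinedSubscription rather than subscriptionPrefix, shortens pollTimeoutMs, and sets failOnDataLoss to false so the query keeps running if retention removes messages. The subscription name, timeout value, and table name are placeholders. It assumes numeric and boolean options accept SQL literals of the listed types; if your environment expects strings, pass '60000' and 'false' instead.

```sql
-- Sketch: named subscription, shorter poll timeout, tolerate data loss.
-- Subscription name, timeout, and table name are placeholders.
-- Assumes LONG/BOOLEAN options accept numeric/boolean SQL literals.
CREATE STREAMING TABLE testing.pulsar_tuned AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      topic => 'my-topic',
      startingOffsets => 'earliest',
      predefinedSubscription => 'my-spark-subscription',
      pollTimeoutMs => 60000,
      failOnDataLoss => false);
```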

The following arguments configure authentication for the Pulsar client:

  • pulsarClientAuthPluginClassName STRING (default: none): Name of the authentication plugin.

  • pulsarClientAuthParams STRING (default: none): Parameters for the authentication plugin.

  • pulsarClientUseKeyStoreTls STRING (default: none): Whether to use a KeyStore for TLS authentication.

  • pulsarClientTlsTrustStoreType STRING (default: none): TrustStore file type for TLS authentication.

  • pulsarClientTlsTrustStorePath STRING (default: none): TrustStore file path for TLS authentication.

  • pulsarClientTlsTrustStorePassword STRING (default: none): TrustStore password for TLS authentication.
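A sketch of a TLS connection that verifies the broker with a JKS TrustStore, using the client options above, could look like the following. It assumes the broker exposes TLS on pulsar+ssl:// at port 6651, which is Pulsar's conventional TLS scheme and port; the host, file path, password, and table name are placeholders. Because pulsarClientUseKeyStoreTls is typed STRING, the flag is passed as the string 'true'.

```sql
-- Sketch: TLS connection with a JKS TrustStore for broker verification.
-- pulsar+ssl:// on port 6651 is assumed; host, path, password are placeholders.
CREATE STREAMING TABLE testing.pulsar_tls AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar+ssl://broker.example.com:6651',
      topic => 'my-topic',
      pulsarClientUseKeyStoreTls => 'true',
      pulsarClientTlsTrustStoreType => 'JKS',
      pulsarClientTlsTrustStorePath => '/var/private/tls/client.truststore.jks',
      pulsarClientTlsTrustStorePassword => 'truststorepw');
```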

The following arguments configure and authenticate Pulsar admission control. The Pulsar admin configuration is required only when admission control is enabled, that is, when maxBytesPerTrigger is set:

  • maxBytesPerTrigger BIGINT (default: none): A soft limit on the maximum number of bytes to process per micro-batch. If this is specified, adminUrl must also be specified.

  • adminUrl STRING (default: none): The Pulsar serviceHttpUrl configuration. Required only when maxBytesPerTrigger is specified.

  • pulsarAdminAuthPlugin STRING (default: none): Name of the authentication plugin for the Pulsar admin client.

  • pulsarAdminAuthParams STRING (default: none): Parameters for the authentication plugin for the Pulsar admin client.

  • pulsarClientUseKeyStoreTls STRING (default: none): Whether to use a KeyStore for TLS authentication.

  • pulsarAdminTlsTrustStoreType STRING (default: none): TrustStore file type for TLS authentication.

  • pulsarAdminTlsTrustStorePath STRING (default: none): TrustStore file path for TLS authentication.

  • pulsarAdminTlsTrustStorePassword STRING (default: none): TrustStore password for TLS authentication.
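As a sketch of admission control, the following caps each micro-batch at about 64 MiB (64 × 1024 × 1024 = 67108864 bytes). Because maxBytesPerTrigger is set, adminUrl must also be given; the example assumes the broker's HTTP service listens on port 8080, Pulsar's default web service port. The host, limit, and table name are placeholders, and the limit is soft, so individual micro-batches can exceed it.

```sql
-- Sketch: admission control with a soft per-micro-batch byte limit.
-- 67108864 bytes = 64 MiB; adminUrl is required once maxBytesPerTrigger is set.
-- Host, port 8080, limit, and table name are placeholders/assumptions.
CREATE STREAMING TABLE testing.pulsar_rate_limited AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      topic => 'my-topic',
      maxBytesPerTrigger => 67108864,
      adminUrl => 'http://broker.example.com:8080');
```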

Returns

A table of Pulsar records with the following schema:

  • __key STRING NOT NULL: Pulsar message key.

  • value BINARY NOT NULL: Pulsar message value.

    Note: For topics with an Avro or JSON schema, the content is not loaded into the binary value field. Instead, it is expanded into columns that preserve the field names and field types of the Pulsar topic schema.

  • __topic STRING NOT NULL: Pulsar topic name.

  • __messageId BINARY NOT NULL: Pulsar message ID.

  • __publishTime TIMESTAMP NOT NULL: Pulsar message publish time.

  • __eventTime TIMESTAMP NOT NULL: Pulsar message event time.

  • __messageProperties MAP<STRING, STRING>: Pulsar message properties.
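For a topic without an Avro or JSON schema, where value is BINARY, a sketch that decodes the payload as text and keeps a few metadata columns could look like the following. The property key 'source', the column alias, and the table name are hypothetical. try_element_at is used so that messages without that property yield NULL instead of raising an error.

```sql
-- Sketch for a topic WITHOUT an Avro/JSON schema (value is BINARY).
-- CAST(value AS STRING) decodes the payload bytes as UTF-8 text.
-- 'source' is a hypothetical property key; missing keys return NULL.
CREATE STREAMING TABLE testing.pulsar_decoded AS
  SELECT
    __key,
    CAST(value AS STRING) AS payload,
    __publishTime,
    try_element_at(__messageProperties, 'source') AS source_property
  FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      topic => 'my-topic');
```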

Examples

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming ingestion from Pulsar with TLS authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar+ssl://broker.example.com:6651',
      startingOffsets => 'earliest',
      topic => 'my-topic',
      pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
      pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
      );

The data can now be queried from testing.streaming_table for further analysis.
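As one illustration of further analysis, the following sketch counts messages per key in hourly buckets of publish time. It assumes testing.streaming_table was created with SELECT * as in the examples above, so it still has the __key and __publishTime columns; the aggregation itself is only an example.

```sql
-- Sketch: messages per key per publish hour.
-- Assumes testing.streaming_table kept the __key and __publishTime columns.
SELECT
  __key,
  date_trunc('HOUR', __publishTime) AS publish_hour,
  count(*) AS message_count
FROM testing.streaming_table
GROUP BY __key, date_trunc('HOUR', __publishTime)
ORDER BY publish_hour, __key;
```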