read_kafka table-valued function
Applies to: Databricks SQL, Databricks Runtime 13.3 LTS and above
Reads data from an Apache Kafka cluster and returns the data in tabular form.
read_kafka can read data from one or more Kafka topics and supports both batch queries and streaming ingestion.
Note
Streaming can only be used in Delta Live Tables.
Arguments
This function requires named parameter invocation.
option_key
: The name of the option to configure. You must use backticks (`) for options that contain dots (.).
option_value
: A constant expression to set the option. Accepts literals and scalar functions.
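As a rough sketch of the invocation shape described above, each option is passed as option_key => option_value, and a key that contains dots is wrapped in backticks. The server address, topic name, and timeout value are placeholders chosen for illustration, not values from a real deployment.

```sql
-- Sketch only: 'kafka_server:9092', 'events', and the timeout are placeholder values.
SELECT *
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',   -- plain option key, no backticks needed
  subscribe => 'events',
  `kafka.session.timeout.ms` => '30000'      -- dotted key, so it is wrapped in backticks
)
LIMIT 5;
```

Positional arguments are not accepted, so every option has to be named explicitly.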
Returns
Records read from an Apache Kafka cluster with the following schema:
key BINARY
: The key of the Kafka record.
value BINARY NOT NULL
: The value of the Kafka record.
topic STRING NOT NULL
: The name of the Kafka topic the record is read from.
partition INT NOT NULL
: The ID of the Kafka partition the record is read from.
offset BIGINT NOT NULL
: The offset number of the record in the Kafka TopicPartition.
timestamp TIMESTAMP NOT NULL
: A timestamp value for the record. The timestampType column defines what this timestamp corresponds to.
timestampType INTEGER NOT NULL
: The type of the timestamp specified in the timestamp column.
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: Header values provided as part of the record (if enabled).
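Because key and value arrive as BINARY, a query usually casts them before use. The following is a minimal sketch that assumes the records carry UTF-8 text; the server address and topic name are placeholders. The column names partition, offset, and timestamp are quoted with backticks as a precaution, since they can collide with SQL keywords in some configurations.

```sql
-- Sketch only: assumes keys and values are UTF-8 encoded text.
SELECT
  key::string   AS key_text,     -- key is nullable BINARY
  value::string AS value_text,   -- value is BINARY NOT NULL
  topic,
  `partition`,
  `offset`,
  `timestamp`,
  timestampType
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  subscribe => 'events'
)
LIMIT 10;
```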
Examples
-- A batch query to read from a topic.
> SELECT value::string AS value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;
-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";'
  );
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) AS ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );
Options
You can find a detailed list of options in the Apache Spark documentation.
Required options
Provide the option below for connecting to your Kafka cluster.
Option | Type | Description | Default value |
---|---|---|---|
bootstrapServers | String | A comma-separated list of host/port pairs pointing to the Kafka cluster. | None |
Provide only one of the options below to configure which Kafka topics to pull data from.
Option | Type | Description | Default value |
---|---|---|---|
assign | String | A JSON string that contains the specific topic-partitions to consume from. For example, for '{"topicA":[0,1],"topicB":[2,4]}', partitions 0 and 1 of topicA are consumed. | None |
subscribe | String | A comma-separated list of Kafka topics to read from. | None |
subscribePattern | String | A regular expression matching topics to subscribe to. | None |
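The topic-selection options are mutually exclusive, so a query uses exactly one of them. The sketch below shows the two alternatives to a plain comma-separated topic list. It assumes the option names assign and subscribePattern, which follow the Apache Spark Kafka source naming; the server address, the events_ topic prefix, and the partition numbers are placeholders.

```sql
-- Option A (sketch): read every topic whose name matches a regular expression.
SELECT topic, count(*) AS records
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  subscribePattern => 'events_.*'          -- hypothetical topic naming scheme
)
GROUP BY topic;

-- Option B (sketch): read only partitions 0 and 1 of a single topic.
SELECT `partition`, count(*) AS records
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  assign => '{"events":[0,1]}'             -- JSON map of topic name to partition IDs
)
GROUP BY `partition`;
```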
Miscellaneous options
read_kafka can be used in batch queries and in streaming queries. The options below specify which type of query they apply to.
Option | Type | Description | Default value |
---|---|---|---|
endingOffsets (batch only) | String | The offsets to read until for a batch query: either "latest" or a JSON string specifying an ending offset for each TopicPartition. In the JSON, -1 refers to latest; -2 (earliest) is not allowed. | "latest" |
endingOffsetsByTimestamp (batch only) | String | A JSON string specifying an ending timestamp to read until for each TopicPartition. The timestamps need to be provided as a long value of the timestamp in milliseconds since 1970-01-01 00:00:00 UTC. | None |
endingTimestamp (batch only) | String | A string value of the timestamp in milliseconds since 1970-01-01 00:00:00 UTC. | None |
includeHeaders (batch and streaming) | Boolean | Whether to include the Kafka headers in the row. | false |
kafka.<consumer_option> (batch and streaming) | String | Any Kafka consumer-specific options can be passed in with the kafka. prefix. Use backticks around the option name because it contains dots. Note: Do not set options that this function manages itself, such as the bootstrap servers (use bootstrapServers instead) or the key and value deserializers. | None |
maxOffsetsPerTrigger (streaming only) | Long | Rate limit on the maximum number of offsets or rows processed per trigger interval. The specified total number of offsets is split proportionally across TopicPartitions. | None |
startingOffsets (batch and streaming) | String | The start point when a query is started: either "earliest", "latest", or a JSON string specifying a starting offset for each TopicPartition. In the JSON, -2 refers to earliest and -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in JSON) is not allowed. For streaming queries, this only applies when a new query is started. Restarted streaming queries continue from the offsets defined in the query checkpoint. Partitions newly discovered during a query start at earliest. | "latest" for streaming, "earliest" for batch |
startingOffsetsByTimestamp (batch and streaming) | String | A JSON string specifying a starting timestamp for each TopicPartition. The timestamps need to be provided as a long value of the timestamp in milliseconds since 1970-01-01 00:00:00 UTC. Note: For streaming queries, this only applies when a new query is started. Restarted streaming queries continue from the offsets defined in the query checkpoint. Partitions newly discovered during a query start at earliest. | None |
startingOffsetsByTimestampStrategy (batch and streaming) | String | The strategy used when the starting offset specified by timestamp (either global or per partition) doesn't match the offset Kafka returned. The available strategies are "error" (fail the query) and "latest" (assign the latest offset for the affected partitions). | "error" |
startingTimestamp (batch and streaming) | String | A string value of the timestamp in milliseconds since 1970-01-01 00:00:00 UTC. Note: For streaming queries, this only applies when a new query is started. Restarted streaming queries continue from the offsets defined in the query checkpoint. Partitions newly discovered during a query start at earliest. | None |
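To illustrate the JSON form of the offset options, here is a hedged sketch of a bounded batch read. It assumes the option names startingOffsets and endingOffsets and the per-partition JSON layout used by the Apache Spark Kafka source, and it assumes a hypothetical topic events with exactly two partitions (0 and 1), since every partition being read needs an entry. The offsets 100 and 200 are arbitrary.

```sql
-- Sketch only: topic name, partition count, and offsets are assumptions.
SELECT
  `partition`,
  min(`offset`) AS first_offset,
  max(`offset`) AS last_offset,
  count(*)      AS records
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  subscribe => 'events',
  -- partition 0 starts at offset 100; partition 1 starts at earliest (-2)
  startingOffsets => '{"events":{"0":100,"1":-2}}',
  -- partition 0 stops at offset 200; partition 1 stops at latest (-1)
  endingOffsets => '{"events":{"0":200,"1":-1}}'
)
GROUP BY `partition`;
```

Grouping by partition makes it easy to check that each partition was read over the intended offset range.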
Note
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. The behavior varies across options if Kafka doesn't return the matched offset; check the description of each option.
Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes and doesn't interpret or reason about the value. For more details on KafkaConsumer.offsetsForTimes, refer to the Kafka documentation. The meaning of timestamp here can also vary according to the Kafka configuration (log.message.timestamp.type). For details, see the Apache Kafka documentation.
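The timestamp-based lookup described in this note can be sketched as a batch read over a one-hour window. The option names startingTimestamp and endingTimestamp are assumed to follow the Apache Spark Kafka source naming, and the two epoch-millisecond values are illustrative, chosen to be exactly 3,600,000 ms apart.

```sql
-- Sketch only: the timestamps are arbitrary epoch-millisecond values, passed as strings.
SELECT
  value::string AS value_text,
  `timestamp`,
  timestampType
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  subscribe => 'events',
  startingTimestamp => '1686444353000',   -- start of the window
  endingTimestamp   => '1686447953000'    -- one hour after the start
);
```

Selecting timestampType alongside timestamp helps confirm whether the window was matched against record creation time or log append time, which depends on log.message.timestamp.type.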