Stream processing with Apache Kafka and Databricks

This article describes how you can use Apache Kafka as either a source or a sink when running Structured Streaming workloads on Databricks.

For more information about Kafka, see the Kafka documentation.

Read data from Kafka

The following is an example for reading data from Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Write data to Kafka

The following is an example for writing data to Kafka:

(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .option("checkpointLocation", "<checkpoint_path>")
  .start())

Configure Kafka for Apache Spark on Databricks

Databricks provides the kafka keyword as a data format to configure connections to Kafka 0.10+.

The following are the most common configurations for Kafka:

There are multiple ways of specifying which topics to subscribe to. You should provide only one of these parameters:

Option: subscribe
Value: A comma-separated list of topics.
Description: The topic list to subscribe to.

Option: subscribePattern
Value: Java regex string.
Description: The pattern used to subscribe to topic(s).

Option: assign
Value: JSON string {"topicA":[0,1],"topic":[2,4]}.
Description: Specific topicPartitions to consume.
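
Because only one of subscribe, subscribePattern, and assign should be set, it can help to build the option in one place and reject ambiguous input before the stream starts. The following is a minimal sketch, not part of any Spark or Databricks API: the helper name subscription_option and its arguments are hypothetical, and it only produces the option name and value that you would pass to .option(...).

```python
import json


def subscription_option(topics=None, pattern=None, partitions=None):
    """Return the single (option_name, option_value) pair for a Kafka source.

    Hypothetical helper for illustration; exactly one argument must be given.
    - topics:     list of topic names           -> "subscribe"
    - pattern:    Java regex string             -> "subscribePattern"
    - partitions: dict of topic -> partition ids -> "assign" (JSON string)
    """
    candidates = (("topics", topics), ("pattern", pattern), ("partitions", partitions))
    provided = [name for name, value in candidates if value is not None]
    if len(provided) != 1:
        raise ValueError(
            f"Provide exactly one of topics, pattern, partitions; got {provided}")

    if topics is not None:
        return "subscribe", ",".join(topics)
    if pattern is not None:
        return "subscribePattern", pattern
    # Compact separators produce JSON such as {"topicA":[0,1]}
    return "assign", json.dumps(partitions, separators=(",", ":"))


name, value = subscription_option(partitions={"topicA": [0, 1], "topicB": [2, 4]})
print(name, value)
```

The returned pair can then be passed to the reader, for example .option(name, value).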

Other notable configurations:

Option: kafka.bootstrap.servers
Value: Comma-separated list of host:port.
Default Value: none
Description: [Required] The Kafka bootstrap.servers configuration. If you find there is no data from Kafka, check the broker address list first. If the broker address list is incorrect, there might not be any errors, because the Kafka client assumes the brokers will eventually become available and, in the event of network errors, retries forever.

Option: failOnDataLoss
Value: true or false.
Default Value: true
Description: [Optional] Whether to fail the query when it's possible that data was lost. Queries can permanently fail to read data from Kafka in many scenarios, such as deleted topics or topic truncation before processing. Spark estimates conservatively whether data was possibly lost, which can sometimes cause false alarms. Set this option to false if it does not work as expected, or if you want the query to continue processing despite data loss.

Option: minPartitions
Value: Integer >= 0, 0 = disabled.
Default Value: 0 (disabled)
Description: [Optional] Minimum number of partitions to read from Kafka. Normally Spark has a 1-1 mapping of Kafka topicPartitions to Spark partitions consuming from Kafka. If you set the minPartitions option to a value greater than the number of your Kafka topicPartitions, Spark splits large Kafka partitions into smaller pieces. You can set this option to increase the processing rate at times of peak load, when data is skewed, or when your stream is falling behind. It comes at the cost of initializing Kafka consumers at each trigger, which may impact performance if you use SSL when connecting to Kafka.

Option: kafka.group.id
Value: A Kafka consumer group ID.
Default Value: not set
Description: [Optional] Group ID to use while reading from Kafka. By default, each query generates a unique group ID for reading data. This ensures that each query has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use specific authorized group IDs to read data. You can optionally set the group ID, but do this with extreme caution because it can cause unexpected behavior:

  • Concurrently running queries (both batch and streaming) with the same group ID are likely to interfere with each other, causing each query to read only part of the data.

  • This may also occur when queries are started or restarted in quick succession. To minimize such issues, set the Kafka consumer configuration session.timeout.ms to be very small.

Option: startingOffsets
Value: earliest, latest, or a JSON string.
Default Value: latest for streaming queries, earliest for batch queries
Description: [Optional] The start point when a query is started: "earliest", which starts from the earliest offsets; "latest", which starts from the latest offsets; or a JSON string specifying a starting offset for each TopicPartition. In the JSON, -2 as an offset refers to earliest and -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in JSON) is not allowed. For streaming queries, this applies only when a new query is started; a resumed query always picks up from where it left off. Partitions newly discovered during a query start at earliest.
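
The JSON form of startingOffsets maps each topic to a map from partition number to offset, with -2 and -1 standing for earliest and latest. The following is a minimal sketch of building that string in Python. The helper name starting_offsets_json and the topic names are hypothetical; the topic-to-partition-to-offset shape follows the Structured Streaming Kafka Integration Guide.

```python
import json

EARLIEST = -2  # offset value that refers to the earliest offset
LATEST = -1    # offset value that refers to the latest offset


def starting_offsets_json(offsets):
    """Serialize {topic: {partition: offset}} for the startingOffsets option.

    JSON object keys must be strings, so partition numbers are converted.
    """
    return json.dumps(
        {topic: {str(partition): offset for partition, offset in parts.items()}
         for topic, parts in offsets.items()},
        separators=(",", ":"),
    )


offsets = starting_offsets_json({
    "topicA": {0: 23, 1: EARLIEST},  # partition 0 from offset 23, partition 1 from earliest
    "topicB": {0: LATEST},           # partition 0 from latest
})
print(offsets)  # {"topicA":{"0":23,"1":-2},"topicB":{"0":-1}}
```

The resulting string can be passed as .option("startingOffsets", offsets). Because latest is not allowed for batch queries, avoid LATEST when building offsets for a batch read.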

See Structured Streaming Kafka Integration Guide for other optional configurations.

Schema for Kafka records

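The schema of Kafka records is:

Column         Type
key            binary
value          binary
topic          string
partition      int
offset         long
timestamp      timestamp
timestampType  int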
The key and the value are always deserialized as byte arrays with the ByteArrayDeserializer. Use DataFrame operations (such as cast("string")) to explicitly deserialize the keys and values.
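
As a minimal sketch of the cast described above, the snippet below builds SQL expressions that cast the binary key and value columns to strings. The helper name cast_to_string_exprs is hypothetical. It assumes the payloads are UTF-8 text; other encodings or formats such as Avro need their own decoding step.

```python
def cast_to_string_exprs(columns=("key", "value")):
    """Build SQL expressions that cast binary Kafka columns to strings.

    Hypothetical helper; pass the result to DataFrame.selectExpr.
    """
    return [f"CAST({column} AS STRING) AS {column}" for column in columns]


exprs = cast_to_string_exprs()
print(exprs)

# Usage with a Kafka DataFrame `df` (assumes an active Spark session):
# decoded = df.selectExpr(*exprs)
```

Keeping the expressions as plain strings makes them easy to reuse for both streaming and batch reads.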

Retrieve Kafka metrics


Available in Databricks Runtime 8.1 and above.

You can get the average, min, and max of the number of offsets that the streaming query is behind the latest available offset among all the subscribed topics with the avgOffsetsBehindLatest, maxOffsetsBehindLatest, and minOffsetsBehindLatest metrics. See Reading Metrics Interactively.


Available in Databricks Runtime 9.1 and above.

Get the estimated total number of bytes that the query process has not consumed from the subscribed topics by examining the value of estimatedTotalBytesBehindLatest. This estimate is based on the batches that were processed in the last 300 seconds. The timeframe that the estimate is based on can be changed by setting the option bytesEstimateWindowLength to a different value. For example, to set it to 10 minutes:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
  .load())  # connection and subscription options omitted for brevity

If you are running the stream in a notebook, you can see these metrics under the Raw Data tab in the streaming query progress dashboard:

  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    }
  } ]
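
The same metrics can be read programmatically. The following is a minimal sketch that extracts the lag metrics from a progress dictionary shaped like the output above. The helper name kafka_lag_metrics is hypothetical, and the sketch assumes the progress object is available as a Python dictionary (for example, from the lastProgress property of a PySpark streaming query).

```python
LAG_METRICS = (
    "avgOffsetsBehindLatest",
    "maxOffsetsBehindLatest",
    "minOffsetsBehindLatest",
    "estimatedTotalBytesBehindLatest",
)


def kafka_lag_metrics(progress):
    """Collect lag metrics for every Kafka source in a progress dictionary.

    Metric values are reported as strings, so they are converted to float.
    Metrics that are absent (for example, on older runtimes) are skipped.
    """
    results = []
    for source in progress.get("sources", []):
        description = source.get("description", "")
        if "Kafka" not in description:
            continue  # ignore non-Kafka sources
        metrics = source.get("metrics") or {}
        entry = {"description": description}
        for name in LAG_METRICS:
            if name in metrics:
                entry[name] = float(metrics[name])
        results.append(entry)
    return results


# Sample shaped like the progress output shown above
sample_progress = {
    "sources": [{
        "description": "KafkaV2[Subscribe[topic]]",
        "metrics": {
            "avgOffsetsBehindLatest": "4.0",
            "maxOffsetsBehindLatest": "4",
            "minOffsetsBehindLatest": "4",
            "estimatedTotalBytesBehindLatest": "80.0",
        },
    }]
}

for entry in kafka_lag_metrics(sample_progress):
    print(entry["description"], entry["maxOffsetsBehindLatest"])
```

A value such as maxOffsetsBehindLatest that keeps growing across batches suggests that the stream is falling behind its source.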

Use SSL to connect Databricks to Kafka

To enable SSL connections to Kafka, follow the instructions in the Confluent documentation Encryption and Authentication with SSL. You can provide the configurations described there, prefixed with kafka., as options. For example, you specify the trust store location in the property kafka.ssl.truststore.location.

Databricks recommends that you:

  • Store your certificates in cloud object storage. You can restrict access to the certificates only to clusters that can access Kafka. See Data governance guide.

  • Store your certificate passwords as secrets in a secret scope.

The following example uses object storage locations and Databricks secrets to enable an SSL connection:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", "<truststore-location>")
  .option("kafka.ssl.keystore.location", "<keystore-location>")
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope="<certificate-scope-name>", key="<keystore-password-key-name>"))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope="<certificate-scope-name>", key="<truststore-password-key-name>"))
)
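
When several streams share the same SSL settings, it can be convenient to build the prefixed options once and reuse them. The following is a minimal sketch under that assumption; the helper name kafka_ssl_options and the placeholder values are hypothetical, and the keys are the Kafka client properties used above with the kafka. prefix applied.

```python
def kafka_ssl_options(truststore_location, truststore_password,
                      keystore_location, keystore_password,
                      security_protocol="SASL_SSL"):
    """Return Kafka client SSL settings with the "kafka." prefix applied."""
    client_config = {
        "security.protocol": security_protocol,
        "ssl.truststore.location": truststore_location,
        "ssl.truststore.password": truststore_password,
        "ssl.keystore.location": keystore_location,
        "ssl.keystore.password": keystore_password,
    }
    return {f"kafka.{key}": value for key, value in client_config.items()}


# Placeholder values; in a notebook the passwords would come from
# dbutils.secrets.get(scope=..., key=...) rather than literals.
options = kafka_ssl_options(
    truststore_location="<truststore-location>",
    truststore_password="<truststore-password>",
    keystore_location="<keystore-location>",
    keystore_password="<keystore-password>",
)
for name in sorted(options):
    print(name)  # print option names only, never the secret values

# Usage (assumes an active Spark session):
# reader = spark.readStream.format("kafka").options(**options)
```

Printing only the option names keeps secret values out of notebook output and logs.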