Processamento de transmissão com o Apache Kafka e o Databricks

Este artigo descreve como você pode usar o Apache Kafka como fonte ou coletor quando executar cargas de trabalho de Structured Streaming no Databricks.

Para mais informações sobre o Kafka, consulte a documentação do Kafka.

Leia os dados da Kafka

Veja a seguir um exemplo de leitura de transmissão do Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

O Databricks também oferece suporte à semântica de leitura em lote para fontes de dados do Kafka, conforme mostrado no exemplo a seguir:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Para carregamento incremental de lotes, a Databricks recomenda usar Kafka com Trigger.AvailableNow. Consulte Configurando o processamento de lotes incrementais.

No Databricks Runtime 13.1e acima, a Databricks fornece uma função SQL para ler dados do Kafka. A transmissão com SQL é compatível somente com o Delta Live Tables ou com tabelas de transmissão no Databricks SQL. Consulte função read_kafka table-valued.

Configurar o leitor de transmissão estruturada do Kafka

O Databricks fornece a palavra-chave kafka como formato de dados para configurar conexões com o Kafka 0.10+.

A seguir estão as configurações mais comuns para o Kafka:

Há várias maneiras de especificar quais tópicos assinar. Você deve fornecer somente um destes parâmetros:

Opção

Valor

Descrição

assinar

Uma lista de tópicos separados por vírgula.

A lista de tópicos para se inscrever.

subscribePattern

String Java regex.

O padrão usado para assinar tópicos.

atribuir

Sequência JSON {"topicA":[0,1],"topic":[2,4]}.

Tópico específico Partições a serem consumidas.

Outras configurações notáveis:

Opção

Valor

Valor padrão

Descrição

kafka.bootstrap.servers

Lista separada por vírgula de host:port.

vazio

[Obrigatório] A configuração do Kafka bootstrap.servers. Se você descobrir que não há dados do Kafka, verifique primeiro a lista de endereços do broker. Se a lista de endereços do despachante estiver incorreta, poderá não haver erros. Isso ocorre porque o cliente Kafka presume que os corretores acabarão ficando disponíveis em algum momento e, no caso de erros de rede, ficará tentando indefinidamente.

failOnDataLoss

true ou false.

true

[Opcional] Se a consulta deve falhar quando houver a possibilidade de os dados terem sido perdidos. As consultas podem falhar definitivamente na leitura de dados do Kafka devido a vários cenários, como tópicos excluídos, truncamento de tópicos antes do processamento e assim por diante. Tentamos estimar de forma conservadora se os dados foram possivelmente perdidos ou não. Às vezes, isso pode causar alarmes falsos. Defina essa opção como false se não funcionar como esperado ou se você quiser que a consulta continue sendo processada apesar da perda de dados.

minPartitions

Inteiro >= 0, 0 = desabilitado.

0 (desativado)

[Opcional] Número mínimo de partições para ler de Kafka. Você pode configurar o Spark para utilizar um mínimo arbitrário de partições para ler a partir do Kafka utilizando a opção minPartitions. Normalmente, o Spark tem um mapeamento 1-1 de Kafka topicPartitions para partições Spark consumindo do Kafka. Se você definir a opção minPartitions com um valor maior do que seu tipicPartitions do Kafka, o Spark dividirá partições grandes do Kafka em partes menores. Essa opção pode ser definida em momentos de pico de carga, distorção de dados e à medida que a transmissão ficar atrasada para aumentar a taxa de processamento. Há o custo de inicializar os consumidores do Kafka em cada gatilho, o que pode afetar o desempenho se você usar SSL na conexão com o Kafka.

kafka.group.id

Uma ID do grupo de consumidores Kafka.

não definido

[Opcional] ID do grupo a ser usado durante a leitura do Kafka. Use com cautela. Por padrão, cada consulta gera uma ID de grupo exclusiva para leitura de dados. Isso garante que cada consulta tenha seu próprio grupo de consumidores que não enfrente interferência de nenhum outro consumidor e, portanto, possa ler todas as partições de seus tópicos assinados. Em alguns cenários (por exemplo, autorização baseada em grupo Kafka), convém usar IDs de grupo autorizados específicos para ler dados. Opcionalmente, você pode definir a ID do grupo. No entanto, faça isso com extrema cautela, pois pode causar um comportamento inesperado.

  • As execuções simultâneas de consultas (em lote e em transmissão) com a mesma ID de grupo provavelmente interferem entre si, fazendo com que cada consulta leia apenas parte dos dados.

  • Isso também pode ocorrer quando as consultas são iniciadas/reiniciadas em rápida sucessão. Para minimizar esses problemas, defina a configuração de consumidor do Kafka session.timeout.ms de modo que seja muito pequena.

startingOffsets

earliest , latest

latest

[Opcional] O ponto inicial quando uma consulta é iniciada, seja "earliest", que é a partir dos primeiros deslocamentos, ou uma cadeia de caracteres json especificando um deslocamento inicial para cada TopicPartition. No json, -2 como deslocamento pode ser usado para fazer referência ao mais antigo, -1 ao mais recente. Observação: para consultas em lote, a opção latest (implicitamente ou com -1 em json) não é permitida. Para consultas de transmissão, isso só se aplica quando uma nova consulta for iniciada, e essa retomada sempre continuará de onde a consulta parou. As partições recém-descobertas durante uma consulta começarão o mais cedo possível.

Consulte o Guia de integração de transmissão estruturada do Kafka para obter outras configurações opcionais.

Esquema de registros do Kafka

O esquema dos registros do Kafka é:

Coluna

Tipo

chave

binário

valor

binário

tópico

string

partição

int

deslocamento

long

timestamp

long

timestampType

int

O key e o value são sempre desserializados como matrizes de bytes com o ByteArrayDeserializer. Utilize operações do DataFrame (como cast("string")) para desserializar explicitamente as chaves e valores.

Gravar dados no Kafka

Veja a seguir um exemplo de uma gravação em transmissão para o Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

O Databricks também oferece suporte à semântica de gravação em lote nos coletores de dados do Kafka, conforme mostrado no exemplo a seguir:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Configurar o gravador de transmissão estruturada do Kafka

Importante

O Databricks Runtime 13.1 e o acima incluem uma versão mais recente da biblioteca kafka-clients que permite gravações idempotentes pelo site default. Se um sink do Kafka usar a versão 2.8.0 ou abaixo com ACLs configuradas, mas sem IDEMPOTENT_WRITE ativado, a gravação falhará com a mensagem de erro org.apache.kafka.common.KafkaException:. Cannot execute transactional method because we are in an error state.

Para solucionar esse erro, atualize para a versão 2.8.0 ou superior do Kafka ou defina .option(“kafka.enable.idempotence”, “false”) ao configurar o gravador de transmissão estruturada.

O esquema fornecido ao DataStreamWriter interage com o sink do Kafka. Você pode usar os seguintes campos:

Nome da coluna

Obrigatório ou opcional

Tipo

key

opcional

STRING ou BINARY

value

obrigatório

STRING ou BINARY

headers

opcional

ARRAY

topic

Opcional (ignorado se topic estiver definido como opção de writer)

STRING

partition

opcional

INT

Veja a seguir as opções comuns definidas ao gravar no Kafka:

Opção

Valor

Valor padrão

Descrição

kafka.boostrap.servers

Uma lista delimitada por vírgulas de <host:port>

nenhum

[Obrigatório] A configuração do Kafka bootstrap.servers.

topic

STRING

não definido

[Opcional] Define o tópico para todas as linhas a serem gravadas. Essa opção substitui todas as colunas de tópico existentes nos dados.

includeHeaders

BOOLEAN

false

[Opcional] Se os cabeçalhos do Kafka devem ser incluídos na linha.

Consulte o Guia de integração de transmissão estruturada do Kafka para obter outras configurações opcionais.

Recuperar métricas de Kafka

Você pode obter a média, o mínimo e o máximo do número de deslocamentos em que a consulta de transmissão estiver atrás do último deslocamento disponível entre todos os tópicos inscritos com as métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest. Consulte Leitura interativa de métricas.

Observação

Disponível no Databricks Runtime 9.1e acima.

Obtenha o número total estimado de bytes que o processo de consulta não consumiu dos tópicos inscritos examinando o valor de estimatedTotalBytesBehindLatest. Essa estimativa baseia-se nos lotes que foram processados nos últimos 300 segundos. O período de tempo em que a estimativa se baseia pode ser alterado definindo-se a opção bytesEstimateWindowLength com um valor diferente. Por exemplo, para defini-lo com 10 minutos:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Se você estiver executando a transmissão em um notebook, poderá ver essas métricas na aba Dados brutos no painel de progresso da consulta de transmissão:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

Use SSL para conectar o Databricks ao Kafka

Para ativar as conexões SSL com o Kafka, siga as instruções da documentação do Confluent Encryption and Authentication with SSL. Você pode fornecer as configurações descritas aqui, prefixadas com kafka., como opções. Por exemplo, você especifica o local do armazenamento de confiança na propriedade kafka.ssl.truststore.location.

A Databricks recomenda que você:

O exemplo a seguir usa locais de armazenamento de objetos e segredos de Databricks para habilitar uma conexão SSL:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

autenticação de entidade de serviço com Microsoft Entra ID (anteriormente Azure Active Directory) e Azure Event Hubs

Databricks suporta a autenticação do serviço Spark Job com Event Hubs. Essa autenticação é feita via OAuth com Microsoft Entra ID (anteriormente Azure Active Directory).

Diagrama de autenticação AAD

Databricks dá suporte à autenticação Microsoft Entra ID com ID de cliente e segredo nos seguintes ambientes compute :

  • Databricks Runtime 12.2 LTS e acima em compute configurada com modo de acesso de usuário único.

  • Databricks Runtime 14.3 LTS e acima em compute configurada com modo de acesso compartilhado.

  • Pipeline Delta Live Tables configurado sem o Unity Catalog.

O Databricks não suporta a autenticação Microsoft Entra ID com um certificado em qualquer ambiente compute ou no pipeline Delta Live Tables configurado com o Unity Catalog.

Essa autenticação não funciona em clusters compartilhados ou em Unity Catalog Delta Live Tables.

Configurando o Conector Kafka de transmissão estruturada

Para realizar a autenticação com o Microsoft Entra ID, você precisará dos seguintes valores:

  • Um ID tenant . Você pode encontrar isso na Serviço Microsoft Entra ID tab.

  • Um clientID (também conhecido como ID do aplicativo).

  • Um segredo do cliente. Depois de fazer isso, você deve adicioná-lo como um segredo ao seu Databricks Workspace. Para adicionar este segredo, consulte Gerenciamento de segredos.

  • Um tópico do EventHubs. Você pode encontrar uma lista de tópicos na seção Event Hubs na seção Entidades em uma página específica do Event Hubs Namespace . Para trabalhar com vários tópicos, pode definir a IAM role ao nível dos Event Hubs.

  • Um servidor EventHubs. Você pode encontrar isso na página de visão geral do seu namespace específico do Event Hubs:

    Namespace dos Hubs de Eventos

Além disso, para usar o Entra ID, precisamos dizer ao Kafka para usar o mecanismo OAuth SASL (SASL é um protocolo genérico e OAuth é um tipo de “mecanismo” SASL):

  • kafka.security.protocol deveria estar SASL_SSL

  • kafka.sasl.mechanism deveria estar OAUTHBEARER

  • kafka.sasl.login.callback.handler.class deve ser um nome totalmente qualificado da classe Java com um valor kafkashaded para o manipulador de retorno de chamada de login de nossa classe Kafka sombreada. Veja o exemplo a seguir para a classe exata.

Exemplo

A seguir, vejamos um exemplo em execução:

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

Lidando com possíveis erros

  • opções de transmissão não são suportadas.

    Se você tentar usar esse mecanismo de autenticação em um pipeline Delta Live Tables configurado com o Unity Catalog, poderá receber o seguinte erro:

    Erro de transmissão não suportado

    Para resolver esse erro, use uma configuração compute compatível. Consulte autenticação de entidade de serviço com Microsoft Entra ID (anteriormente Azure Active Directory) e Azure Event Hubs.

  • Falha ao criar um novo KafkaAdminClient.

    Este é um erro interno que Kafka lança se alguma das seguintes opções de autenticação estiver incorreta:

    • ID do cliente (também conhecido como ID do aplicativo)

    • ID tenant

    • Servidor EventHubs

    Para resolver o erro, verifique se os valores estão corretos para essas opções.

    Além disso, você poderá ver esse erro se modificar as opções de configuração fornecidas por default no exemplo (que foram solicitadas a não modificar), como kafka.security.protocol.

  • Não há registros sendo retornados

    Se você estiver tentando exibir ou processar seu DataFrame, mas não estiver obtendo resultados, verá o seguinte na IU.

    Nenhuma mensagem de resultados

    Esta mensagem significa que a autenticação foi bem sucedida, mas os EventHubs não devolveram quaisquer dados. Algumas razões possíveis (embora de forma alguma exaustivas) são:

    • Você especificou o tópico EventHubs errado.

    • A opção de configuração default do Kafka para startingOffsets é latest e você ainda não está recebendo nenhum dado por meio do tópico. Você pode definir startingOffsetstoearliest para começar a ler os dados a partir dos primeiros deslocamentos do Kafka.