Ler e gravar dados Avro transmitidos

O Apache Avro é um sistema de serialização de dados comumente usado no mundo da transmissão. Uma solução típica é colocar dados no formato Avro no Apache Kafka, metadados no Confluent Schema Registry e, em seguida, executar query com uma estrutura de transmissão que se conecta ao Kafka e ao Schema Registry.

O Databricks dá suporte às funções from_avro e to_avro para criar pipelines de transmissão com dados Avro no Kafka e metadados no Schema Registry. A função to_avro codifica uma coluna como binário no formato Avro e from_avro decodifica dados binários Avro em uma coluna. Ambas as funções transformam uma coluna em outra coluna, e o tipo de dados SQL de entrada/saída pode ser um tipo complexo ou um tipo primitivo.

Observação

As funções from_avro e to_avro :

  • Estão disponíveis em Python, Scala e Java.

  • Pode ser passado para funções SQL tanto em lotes quanto query transmitidas.

Veja também o arquivo Avro fonte de dados.

Exemplo de esquema especificado manualmente

Semelhante a from_json e to_json, você pode usar from_avro e to_avro com qualquer coluna binária. Você pode especificar o esquema Avro manualmente, como no exemplo a seguir:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema exemplo

Você também pode especificar um esquema como strings JSON. Por exemplo, se /tmp/user.avsc for:

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

Você pode criar strings JSON:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Em seguida, use o esquema em from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Exemplo com registro de esquema

Se seus clusters tiverem um serviço Schema Registry, from_avro poderá trabalhar com ele para que você não precise especificar o esquema Avro manualmente.

O exemplo a seguir demonstra a leitura de um tópico Kafka “t”, assumindo que a chave e o valor já estão registrados no Schema Registry como assuntos “t-key” e “t-value” dos tipos STRING e INT:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

Para to_avro, o esquema Avro de saída default pode não corresponder ao esquema do assunto de destino no serviço Schema Registry pelos seguintes motivos:

  • O mapeamento do tipo Spark SQL para o esquema Avro não é um para um. Consulte Tipos suportados para Spark SQL -> conversão Avro.

  • Se o esquema Avro de saída convertido for do tipo de registro, o nome do registro será topLevelRecord e não haverá namespace por default.

Se o esquema de saída default de to_avro corresponder ao esquema do assunto de destino, você poderá fazer o seguinte:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Caso contrário, você deve fornecer o esquema do assunto de destino na função to_avro :

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Autenticar em um Registro de Esquema Confluente externo

Em Databricks Runtime 12.2 LTS e acima, o senhor pode se autenticar em um Confluent Schema Registry externo. Os exemplos a seguir demonstram como configurar as opções de registro do esquema para incluir credenciais de autenticação e a chave API.

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(col("key"), lit("t-key"), schema_registry_address, schema_registry_options).alias("key"),
    to_avro(col("value"), lit("t-value"), schema_registry_address, schema_registry_options, avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Usar arquivos truststore e keystore nos volumes do Unity Catalog

No Databricks Runtime 14.3 LTS e acima, o senhor pode usar arquivos truststore e keystore nos volumes do Unity Catalog para autenticar em um Confluent Schema Registry. Atualize a configuração do exemplo anterior usando a seguinte sintaxe:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

Use o modo evolução do esquema com from_avro

No Databricks Runtime 14.2 e acima, você pode usar o modo de evolução do esquema com from_avro. A ativação do modo de evolução do esquema faz com que o Job gere um UnknownFieldException após detectar a evolução do esquema. A Databricks recomenda configurar o Job com o modo evolução do esquema para reiniciar automaticamente em caso de falha da tarefa. Consulte Configurar Job de transmissão estruturada para reiniciar consultas de transmissão em caso de falha.

A evolução do esquema é útil se você espera que o esquema dos seus dados de origem evolua ao longo do tempo e ingira todos os campos da sua fonte de dados. Se a sua query já especifica explicitamente quais campos query na sua fonte de dados, os campos adicionados serão ignorados independentemente da evolução do esquema.

Use a opção avroSchemaEvolutionMode para ativar a evolução do esquema. A tabela a seguir descreve as opções para o modo de evolução do esquema:

Opção

Comportamento

none

default. Ignora a evolução do esquema e o Job continua.

restart

Lança um UnknownFieldException ao detectar a evolução do esquema. Requer uma reinicialização Job .

Observação

Você pode alterar esta configuração entre Job de transmissão e reutilizar o mesmo checkpoint. Desabilitar a evolução do esquema pode resultar na eliminação de colunas.

Configurar o modo de análise

Você pode configurar o modo de análise para determinar se deseja falhar ou emitir registros nulos quando o modo de evolução do esquema estiver desabilitado e o esquema evoluir de uma forma não compatível com versões anteriores. Com configurações default , from_avro falha quando observa alterações de esquema incompatíveis.

Use a opção mode para especificar o modo de análise. A tabela a seguir descreve a opção do modo de análise:

Opção

Comportamento

FAILFAST

default. Um erro de análise gera um SparkException com um errorClass de MALFORMED_AVRO_MESSAGE.

PERMISSIVE

Um erro de análise é ignorado e um registro nulo é emitido.

Observação

Com a evolução do esquema ativada, FAILFAST só gera exceções se um registro estiver corrompido.

Exemplo usando evolução do esquema e configurando modo de análise

O exemplo a seguir demonstra a ativação da evolução do esquema e a especificação do modo de análise FAILFAST com um registro de esquema confluente:

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)