Buffers de protocolo de leitura e gravação
O Databricks oferece suporte nativo para serialização e desserialização entre structs do Apache Spark e buffers de protocolo (protobuf). Protobuf O suporte é implementado como um transformador Apache Spark DataFrame e pode ser usado com transmissão estruturada ou para lotes de operações.
Como desserializar e serializar buffers de protocolo
Em Databricks Runtime 12.2 LTS e acima, o senhor pode usar as funções from_protobuf
e to_protobuf
para serializar e desserializar dados. Protobuf A serialização é comumente usada em cargas de trabalho de transmissão.
A sintaxe básica das funções protobuf é semelhante para as funções de leitura e gravação. Você deve importar essas funções antes de usar.
from_protobuf
converte uma coluna binária em uma estrutura e to_protobuf
converte uma coluna de estrutura em binária. Você deve fornecer um registro de esquema especificado com o argumento options
ou um arquivo descritor identificado pelo argumento descFilePath
.
- Python
- Scala
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
Os exemplos a seguir ilustram o processamento de registros protobuf binários com from_protobuf()
e a conversão da estrutura Spark SQL em protobuf binário com to_protobuf()
.
Use protobuf com o Confluent Schema Registry
O Databricks suporta o uso do Confluent Schema Registry para definir o Protobuf.
- Python
- Scala
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
Autentique-se em um Confluent Schema Registry externo
Para autenticar em um Confluent Schema Registry externo, atualize as opções de registro de esquema para incluir credenciais de autenticação e a chave API.
- Python
- Scala
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Usar arquivos truststore e keystore nos volumes do Unity Catalog
Em Databricks Runtime 14.3 LTS e acima, o senhor pode usar arquivos truststore e keystore em volumes Unity Catalog para autenticar em um Confluent Schema Registry. Atualize suas opções de registro do esquema de acordo com o exemplo a seguir:
- Python
- Scala
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
Usar o Protobuf com um arquivo descritor
O senhor também pode fazer referência a um arquivo descritor protobuf que esteja disponível para o seu clustering compute. Verifique se você tem as permissões adequadas para ler o arquivo, dependendo de sua localização.
- Python
- Scala
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Opções suportadas nas funções Protobuf
As seguintes opções são compatíveis com as funções Protobuf.
-
mode : Determina como os erros durante a desserialização dos registros Protobuf são tratados. Os erros podem ser causados por vários tipos de registros malformados, incluindo uma incompatibilidade entre o esquema real do registro e o esquema esperado fornecido em
from_protobuf()
.- Valores :
FAILFAST
(default): Um erro é lançado quando um registro malformado é encontrado e a tarefa falha.PERMISSIVE
: um NULL é retornado para registros malformados. Use essa opção com cuidado, pois ela pode resultar na perda de muitos registros. Isso é útil quando uma pequena fração dos registros na fonte está incorreta.
- Valores :
-
recursive.fields.max.depth : Adiciona suporte para campos recursivos. Os esquemas Spark SQL não oferecem suporte a campos recursivos. Quando essa opção não é especificada, campos recursivos não são permitidos. Para suportar campos recursivos em Protobufs, eles precisam ser expandidos para uma profundidade especificada.
-
Valores :
-
-1 (default): Campos recursivos não são permitidos.
-
0: Os campos recursivos são eliminados.
-
1: Permite um único nível de recursão.
-
[2-10]: Especifica um limite para recursão múltipla, até 10 níveis.
Definir um valor maior que 0 permite campos recursivos expandindo os campos aninhados até a profundidade configurada. Valores maiores que 10 não são permitidos para evitar a criação inadvertida de esquemas muito grandes. Se uma mensagem Protobuf tiver profundidade além do limite configurado, a estrutura Spark retornada será truncada após o limite de recursão.
-
-
Exemplo : Considere um Protobuf com o seguinte campo recursivo:
message Person { string name = 1; Person friend = 2; }
A seguir, listamos o esquema final com valores diferentes para essa configuração:
- Opção definida como 1:
STRUCT<name: STRING>
- Opção definida como 2:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
- Opção definida como 3:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- Opção definida como 1:
-
-
converter.quaisquer.campos.para.JSON : Essa opção permite a conversão de campos Protobuf Any em JSON. Esse recurso deve ser ativado com cuidado. A conversão e o processamento de JSON são ineficientes. Além disso, o campo JSON strings perde a segurança do esquema Protobuf, tornando o processamento downstream propenso a erros.
-
Valores :
- False (default): Em tempo de execução, esses campos curinga podem conter mensagens Protobuf arbitrárias como dados binários. Em default, esses campos são tratados como uma mensagem normal de Protobuf. Ele tem dois campos com o esquema
(STRUCT<type_url: STRING, value: BINARY>)
. Em default, o campo bináriovalue
não é interpretado de forma alguma. Mas os dados binários podem não ser convenientes na prática para funcionar em alguns aplicativos. - True: a definição desse valor como True permite a conversão de campos
Any
para JSON strings em tempo de execução. Com essa opção, o binário é analisado e a mensagem Protobuf é desserializada em uma cadeia de caracteres JSON.
- False (default): Em tempo de execução, esses campos curinga podem conter mensagens Protobuf arbitrárias como dados binários. Em default, esses campos são tratados como uma mensagem normal de Protobuf. Ele tem dois campos com o esquema
-
Exemplo : Considere dois tipos de Protobuf definidos da seguinte forma:
message ProtoWithAny {
string event_name = 1;
google.protobuf.Any details = 2;
}
message Person {
string name = 1;
int32 id = 2;
}Com essa opção ativada, o esquema para
from_protobuf("col", messageName ="ProtoWithAny")
seria:STRUCT<event_name: STRING, details: STRING>
.Em tempo de execução, se o campo
details
contiver a mensagemPerson
Protobuf, o valor retornado terá a seguinte aparência:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
. -
Requisitos :
- As definições de todos os tipos possíveis de Protobuf usados nos campos
Any
devem estar disponíveis no arquivo descritor de Protobuf passado parafrom_protobuf()
. - Se o
Any
Protobuf não for encontrado, isso resultará em um erro para esse registro. - Atualmente, esse recurso não é compatível com o schema-registry.
- As definições de todos os tipos possíveis de Protobuf usados nos campos
-
-
emit.default.values : Permite a renderização de campos com valores zero ao desserializar o Protobuf para uma estrutura Spark. Essa opção deve ser usada com moderação. Geralmente, não é aconselhável depender de diferenças tão sutis na semântica.
-
Valores
- False (default): Quando um campo está vazio no Protobuf serializado, o campo resultante no Spark struct é default nulo. É mais simples não ativar essa opção e tratar
null
como o valor default. - True: quando essa opção está ativada, esses campos são preenchidos com os valores correspondentes em default.
- False (default): Quando um campo está vazio no Protobuf serializado, o campo resultante no Spark struct é default nulo. É mais simples não ativar essa opção e tratar
-
Exemplo : Considere o seguinte Protobuf com o Protobuf construído como
Person(age=0, middle_name="")
:syntax = "proto3";
message Person {
string name = 1;
int64 age = 2;
optional string middle_name = 3;
optional int64 salary = 4;
}- Com essa opção definida como False, o Spark struct após chamar
from_protobuf()
seria todo nulo:{"name": null, "age": null, "middle_name": "", "salary": null}
. Embora dois campos (age
emiddle_name
) tenham valores definidos, o site Protobuf não os inclui no formato wire, pois são valores default. - Com essa opção definida como True, a estrutura do Spark após chamar
from_protobuf()
seria:{"name": "", "age": 0, "middle_name": "", "salary": null}
. O camposalary
permanece nulo, pois está explicitamente declaradooptional
e não está definido no registro de entrada.
- Com essa opção definida como False, o Spark struct após chamar
-
-
enums.as.ints: Quando ativado, os campos enum no Protobuf são renderizados como campos inteiros no Spark.
-
Valores
- False (default)
- True: quando ativado, os campos enum no Protobuf são renderizados como campos inteiros no Spark.
-
Exemplo : Considere o seguinte Protobuf:
syntax = "proto3";
message Person {
enum Job {
NONE = 0;
ENGINEER = 1;
DOCTOR = 2;
NURSE = 3;
}
Job job = 1;
}Dada uma mensagem Protobuf como
Person(job = ENGINEER)
:- Com essa opção desativada, a estrutura Spark correspondente seria
{"job": "ENGINEER"}
. - Com essa opção ativada, a estrutura Spark correspondente seria
{"job": 1}
.
Observe que o esquema para esses campos é diferente em cada caso (inteiro em vez de default strings). Essa mudança pode afetar o esquema das tabelas downstream.
- Com essa opção desativada, a estrutura Spark correspondente seria
-
Opções de registro do esquema
As seguintes opções de registro de esquema são relevantes ao usar o registro de esquema com as funções Protobuf.
-
schema.registry.subject
- Obrigatório
- Especifica o assunto do esquema no Schema Registry, como “client-event”
-
schema.registry.address
- Obrigatório
- URL para registro do esquema, como
https://schema-registry.example.com:8081
-
schema.registry.protobuf.name
- Opcional
- Padrão:
<NONE>
. - Uma entrada de registro de esquema para um assunto pode conter várias definições de Protobuf, assim como um único arquivo
proto
. Quando essa opção não é especificada, o primeiro Protobuf é usado para o esquema. Especifique o nome da mensagem Protobuf quando ela não for a primeira na entrada. Por exemplo, considere uma entrada com duas definições Protobuf: "Person" e "Location", nessa ordem. Se a transmissão corresponder a "Local" em vez de "Pessoa", defina essa opção como "Local" (ou seu nome completo, incluindo o pacote "com.example.protos.Location").
-
schema.registry.evolução do esquema.mode
-
padrão: "restart".
-
Modos suportados:
- “reiniciar”
- “nenhum”
-
Essa opção define o modo de evolução do esquema para
from_protobuf()
. No início de uma consulta, o site Spark registra o último schema-id para o assunto fornecido. Isso determina o esquema parafrom_protobuf()
. Um novo esquema pode ser publicado no registro de esquemas após a consulta começar. Quando um novo ID de esquema é detectado em um registro de entrada, isso indica uma alteração no esquema. Essa opção determina como essa alteração no esquema é tratada:- restart (default): Aciona um
UnknownFieldException
quando um schema-id mais recente é notado. Isso encerra a consulta. Databricks recomenda configurar o Job para reiniciar em caso de falha na consulta, a fim de captar as alterações no esquema. - none : as alterações no ID do esquema são ignoradas. Os registros com schema-id mais recente são analisados com o mesmo esquema que foi observado no início da consulta. Espera-se que as definições mais recentes do Protobuf sejam compatíveis com as versões anteriores, e os novos campos são ignorados.
- restart (default): Aciona um
-
-
confluent.schema.registry.
<schema-registy-client-option>
- Opcional
- O registro de esquemas se conecta ao registro de esquemas do Confluent usando o cliente do Confluent Schema Registry. Qualquer opção de configuração suportada pelo cliente pode ser especificada com o prefixo “confluent.schema.registry”. Por exemplo, as duas configurações a seguir fornecem credenciais de autenticação “USER_INFO”:
- “confluent.schema.registry.basic.auth.credentials.source”: 'INFORMAÇÕES DO USUÁRIO'
- “confluent.schema.registry.basic.auth.user.info”: “
<KEY>
:<SECRET>
”