Assine o Google Pub/Sub
Databricks fornece um conector integrado para assinar o Google Pub/Sub em Databricks Runtime 13.3 LTS e acima. Esse conector fornece uma semântica de processamento exatamente única para registros do assinante.
Observação
O Pub/Sub pode publicar registros duplicados, e os registros podem chegar ao assinante fora de ordem. Você deve escrever código Databricks para lidar com registros duplicados e fora de ordem.
Exemplo de sintaxe
Se o senhor tiver um serviço do Google account com privilégios suficientes anexados ao cluster, poderá usar a seguinte sintaxe básica para configurar uma transmissão estruturada lida a partir do Pub/Sub. Consulte o serviço do Google account.
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "fe-demo-prod-dnd") // required
.option("projectId", "fe-prod-dbx") // required
.load()
Você também pode passar opções de autorização diretamente, como no exemplo a seguir:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
Para obter mais opções de configuração, consulte Configurar opções para leitura de transmissão do Pub/Sub.
Configurar o acesso ao Pub/Sub
A Databricks recomenda usar uma account de serviço do Google (GSA) para gerenciar conexões com o Pub/Sub.
Ao usar um GSA, você não precisa fornecer opções de autorização adicionais diretamente para a transmissão.
Observação
Os GSAs não são compatíveis com compute configurada com modo de acesso compartilhado.
A Databricks recomenda a utilização de segredos ao fornecer opções de autorização. As seguintes opções são necessárias para autorizar uma conexão:
clientEmail
clientId
privateKey
privateKeyId
A tabela a seguir descreve as funções necessárias para as credenciais configuradas:
Papéis |
Obrigatório ou opcional |
Como é usado |
---|---|---|
|
Obrigatório |
Verifique se existe inscrição e obtenha inscrição |
|
Obrigatório |
Buscar dados de uma inscrição |
|
Opcional |
Permite a criação de uma inscrição se ela não existir e também permite o uso de |
Esquema Pub/Sub
O esquema da transmissão corresponde aos registros buscados no Pub/Sub, conforme descrito na tabela a seguir:
campo |
Tipo |
---|---|
|
|
|
|
|
|
|
|
Configurar opções de leitura de transmissão do Pub/Sub
A tabela a seguir descreve as opções compatíveis com o Pub/Sub. Todas as opções são configuradas como parte de uma leitura de transmissão estruturada usando a sintaxe .option("<optionName>", "<optionValue>")
.
Observação
Algumas opções de configuração do Pub/Sub usam o conceito de buscas em vez de micro-lotes. Isto reflete detalhes de implementação interna, e as opções funcionam de forma semelhante aos corolários em outros conectores de transmissão estruturada, exceto que os registros são buscados e então processados.
Opção |
Valor padrão |
Descrição |
---|---|---|
|
Definido como metade do número de executores presentes na inicialização da transmissão. |
O número de tarefas paralelas do Spark que buscam registros de uma inscrição. |
|
|
Se |
|
nenhum |
Um limite flexível para o tamanho dos lotes a serem processados durante cada microlote acionado. |
|
1000 |
O número de registros a serem buscados por tarefa antes do processamento dos registros. |
|
10 segundos |
A duração de cada tarefa a ser buscada antes do processamento dos registros. Databricks recomenda usar o valor default . |
Semântica de processamento de lotes incrementais para Pub/Sub
Você pode usar Trigger.AvailableNow
para consumir registros disponíveis das fontes do Pub/Sub em lotes incrementais.
O Databricks registra o carimbo de data/hora quando você inicia uma leitura com a configuração Trigger.AvailableNow
. Os registros processados pelos lotes incluem todos os dados obtidos anteriormente e quaisquer registros recém-publicados com um carimbo de data/hora menor que o carimbo de data/hora da transmissão começar registrada.
Consulte Configurando o processamento de lotes incrementais.
métricas de monitoramento transmissão
As métricas de progresso da transmissão estruturada relatam o número de registros buscados e prontos para processamento, o tamanho dos registros buscados e prontos para processamento e o número de duplicatas vistas desde o início da transmissão. A seguir está um exemplo dessas métricas:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}