O que é acompanhamento de progresso assíncrono?

Visualização

Este recurso está em visualização pública.

O acompanhamento de progresso assíncrono permite a transmissão estruturada de pipelines para verificar o progresso de forma assíncrona e paralela ao processamento de dados real dentro de um micro-lotes, reduzindo a latência associada à manutenção do offsetLog e commitLog.

Acompanhamento de progresso assíncrono

Observação

O acompanhamento de progresso assíncrono não funciona com gatilhos Trigger.once ou Trigger.availableNow . A tentativa de habilitar esse recurso com esses acionadores resulta em falha query .

Como funciona o acompanhamento de progresso assíncrono para reduzir a latência?

a transmissão estruturada depende da persistência e gerenciamento de compensações como indicadores de progresso para o processamento query . A operação de gerenciamento de compensação afeta diretamente a latência do processamento, porque nenhum processamento de dados pode ocorrer até que essas operações sejam concluídas. O acompanhamento de progresso assíncrono permite pipelines estruturados transmitidos para verificar o progresso sem ser afetado por essas operações de gerenciamento de compensação.

Quando você deve configurar a frequência do ponto de verificação?

Os usuários podem configurar a frequência com que o progresso é verificado. As configurações default para a frequência do ponto de verificação fornecem uma boa Taxa de transferência para a maioria query. A configuração da frequência é útil para cenários nos quais as operações de gerenciamento de compensação ocorrem em uma taxa mais alta do que podem ser processadas, o que cria um acúmulo cada vez maior de operações de gerenciamento de compensação. Para conter esse acúmulo crescente, o processamento de dados é bloqueado ou retardado, revertendo essencialmente o comportamento de processamento para eliminar os benefícios do acompanhamento de progresso assíncrono.

Observação

O tempo de recuperação de falhas aumenta com o aumento do tempo de intervalo do ponto de verificação. Em caso de falha, um pipeline deve reprocessar todos os dados antes do ponto de verificação anterior bem-sucedido. Os usuários podem considerar essa compensação entre menor latência durante o processamento regular e tempo de recuperação em caso de falha.

Quais configurações estão associadas ao acompanhamento de progresso assíncrono?

Opção

Valor

Padrão

Descrição

asyncProgressTrackingEnabled

verdadeiro falso

falso

ativar ou desativar acompanhamento de progresso assíncrono

asyncProgressTrackingCheckpointIntervalMs

milissegundos

1000

o intervalo em que commit compensações e commitconclusão

Como os usuários podem habilitar o acompanhamento assíncrono do progresso?

Os usuários podem usar um código semelhante ao código abaixo para habilitar esse recurso:

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

Desativando o acompanhamento de progresso assíncrono

Quando o acompanhamento de progresso assíncrono está ativado, o framework não verifica o progresso de cada lote. Para resolver isso, antes de desativar o acompanhamento de progresso assíncrono, processe pelo menos dois microlotes com as seguintes configurações:

  • .option("asyncProgressTrackingEnabled", "true")

  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

Interrompa a query depois que pelo menos dois microlotes concluírem o processamento. Agora você pode desativar com segurança o acompanhamento de progresso assíncrono e reiniciar a query.

Se você desativou o acompanhamento de progresso assíncrono sem concluir esta passo, poderá encontrar o seguinte erro:

java.lang.IllegalStateException: batch x doesn't exist

Nos logs do driver, você pode ver o seguinte erro:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Seguir as instruções nesta seção para desabilitar o acompanhamento de progresso assíncrono permite que você resolva esses erros e repare sua carga de trabalho de transmissão.

Limitações com acompanhamento de progresso assíncrono

Este recurso tem as seguintes limitações:

  • O acompanhamento de progresso assíncrono só é suportado em pipelines sem estado ao usar Kafka como um coletor.

  • Exatamente uma vez o processamento ponta a ponta não é garantido com o acompanhamento assíncrono do andamento, pois as faixas de offset dos lotes podem ser alteradas em caso de falha. Alguns coletores, como Kafka, nunca fornecem garantias exatamente uma vez.