Aplique marcas d'água para controlar os limites de processamento de dados

Este artigo apresenta os conceitos básicos de marcas d'água e fornece recomendações para o uso de marcas d'água em operações comuns de transmissão de estado. Você deve aplicar marcas d'água a operações de transmissão de estado para evitar expandir infinitamente a quantidade de dados mantidos no estado, o que pode introduzir problemas de memória e aumentar as latências de processamento durante operações de transmissão de execução longa.

O que é uma marca d'água?

a transmissão estruturada usa marcas d'água para controlar o limite de quanto tempo continuar processando atualizações para uma determinada entidade de estado. Exemplos comuns de entidades estatais incluem:

  • Agregações em uma janela de tempo.

  • key única em um join entre duas transmissões.

Ao declarar uma marca d'água, você especifica um campo de carimbo de data/hora e um limite de marca d'água em um DataFrame transmitido. À medida que novos dados chegam, o gerenciador de estado rastreia o registro de data e hora mais recente no campo especificado e processa todos os registros dentro do limite de atraso.

O exemplo a seguir aplica um limite de marca d'água de 10 minutos a uma contagem em janela:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

Neste exemplo:

  • A coluna event_time é usada para definir uma marca d'água de 10 minutos e uma janela em cascata de 5 minutos.

  • Uma contagem é coletada para cada id observado para cada janela de 5 minutos não sobreposta.

  • A informação de estado é mantida para cada contagem até que o final da janela seja 10 minutos mais antigo que o último event_time observado.

Importante

Os limites de marca d'água garantem que os registros que chegam dentro do limite especificado sejam processados de acordo com a semântica da query definida. Registros atrasados que chegam fora do limite especificado ainda podem ser processados usando métricas query , mas isso não é garantido.

Como as marcas d'água afetam o tempo de processamento e a taxa de transferência?

As marcas d'água interagem com os modos de saída para controlar quando os dados são gravados no coletor. Como as marcas d'água reduzem a quantidade total de informações de estado a serem processadas, o uso eficaz de marcas d'água é essencial para uma transmissão eficiente de taxa de download.

Observação

Nem todos os modos de saída são suportados para todas as operações com monitoramento de estado.

Marcas d'água e modo de saída para agregações em janelas

A tabela a seguir detalha o processamento para query com agregação em um carimbo de data/hora com uma marca d'água definida:

Modo de saída

Comportamento

Acrescentar

As linhas são gravadas na tabela de destino assim que o limite da marca d'água é ultrapassado. Todas as gravações são atrasadas com base no limite de atraso. O antigo estado de agregação é descartado assim que o limite é ultrapassado.

Atualizar

As linhas são gravadas na tabela de destino à medida que os resultados são calculados e podem ser atualizadas e substituídas à medida que novos dados chegam. O antigo estado de agregação é descartado assim que o limite é ultrapassado.

Completo

O estado de agregação não é descartado. A tabela de destino é reescrita com cada gatilho.

Marcas d'água e saída para junções de transmissão-transmissão

join entre transmissão múltipla suporta apenas o modo de anexação, e os registros correspondentes são gravados em cada lote que são descobertos. Para join interna, Databricks recomenda definir um limite de marca d'água em cada fonte de dados transmitida. Isso permite que informações de estado sejam descartadas para registros antigos. Sem marcas d'água, a transmissão estruturada tenta join todas as key de ambos os lados da join a cada gatilho.

transmissão estruturada tem semântica especial para suportar join externa. A marca d'água é obrigatória para join externa , pois indica quando uma key deve ser gravada com um valor nulo depois de ser incomparável. Observe que, embora join externa possa ser útil para gravar registros que nunca são correspondidos durante o processamento de dados, porque join grava apenas em tabelas como operações de acréscimo, esses dados ausentes não são registrados até que o limite de atraso tenha passado.

Controle o limite de dados atrasados com política de múltiplas marcas d'água na transmissão estruturada

Ao trabalhar com várias entradas estruturadas transmitidas, você pode definir várias marcas d'água para controlar limites de tolerância para dados atrasados. A configuração de marcas d'água permite controlar as informações de estado e afeta a latência.

Uma query transmitida pode ter várias transmissões de entrada que são unidas ou unidas. Cada uma das entradas transmitidas pode ter um limite diferente de dados atrasados que precisam ser tolerados para operações com estado. Especifique esses limites usando withWatermarks("eventTime", delay) em cada transmissão de entrada. Segue um exemplo query com joins transmissão-transmissão.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Ao executar a query, a transmissão estruturada rastreia individualmente o tempo máximo do evento visto em cada transmissão de entrada, calcula as marcas d'água com base no atraso correspondente e escolhe uma única marca d'água global com elas para ser usada para operações de estado. Por default, o mínimo é escolhido como marca d'água global porque garante que nenhum dado seja descartado acidentalmente como tarde demais se uma das transmissões ficar para trás das outras (por exemplo, uma das transmissões parar de receber dados devido a falhas no upstream). Em outras palavras, a marca d'água global se move com segurança no ritmo da transmissão mais lenta e a saída query é atrasada de acordo.

Se você deseja obter resultados mais rápidos, pode definir a política de várias marcas d'água para escolher o valor máximo como marca d'água global definindo a configuração SQL spark.sql.streaming.multipleWatermarkPolicy como max (default é min). Isso permite que a marca d'água global se mova no ritmo da transmissão mais rápida. No entanto, esta configuração descarta os dados da transmissão mais lenta. Por isso, Databricks recomenda que você use essa configuração criteriosamente.

Solte duplicatas dentro da marca d'água

No Databricks Runtime 13.1e acima, você pode desduplicar registros dentro de um limite de marca d'água usando um identificador exclusivo.

a transmissão estruturada fornece garantias de processamento exatamente uma vez, mas não desduplica automaticamente os registros da fonte de dados. Você pode usar dropDuplicatesWithinWatermark para desduplicar registros em qualquer campo especificado, permitindo remover duplicatas de uma transmissão mesmo que alguns campos sejam diferentes (como horário do evento ou horário de chegada).

Registros duplicados que chegam dentro da marca d'água especificada são descartados com certeza. Essa garantia é rigorosa em apenas uma direção e os registros duplicados que chegam fora do limite especificado também podem ser descartados. Você deve definir o limite de atraso da marca d'água maior que as diferenças máximas de carimbo de data/hora entre eventos duplicados para remover todas as duplicatas.

Você deve especificar uma marca d'água para usar o método dropDuplicatesWithinWatermark , como no exemplo a seguir:

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
)
val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")