Otimize queryestruturada transmitida com estado

O gerenciamento da informação de estado intermediário da query estruturada de transmissão de estado pode ajudar a evitar latência inesperada e problemas de produção.

Databricks recomenda:

  • Use instâncias otimizadas computecomo worker.

  • Defina o número de partições aleatórias para 1-2 vezes o número de núcleos nos clusters.

  • Defina a configuração spark.sql.streaming.noDataMicroBatches.enabled como false na SparkSession. Isso evita que o mecanismo de microlote transmitido processe microlotes que não contenham dados. Observe também que definir essa configuração como false pode resultar em operações com estado que aproveitam marcas d'água ou tempos limite de processamento para não obter a saída de dados até que novos dados cheguem em vez de imediatamente.

Databricks recomenda usar RocksDB com checkpoint de changelog para gerenciar o estado para transmissão com estado. Consulte Configure RocksDB armazenamento do estado em Databricks.

Observação

O esquema de gerenciamento de estado não pode ser alterado entre as reinicializações query . Ou seja, se uma query foi iniciada com o gerenciamento default , ela não pode ser alterada sem iniciar a query do zero com um novo local de checkpoint.

Trabalhe com múltiplos operadores stateful em transmissão estruturada

No Databricks Runtime 13.1e acima, o Databricks oferece suporte avançado para operadores com estado em cargas de trabalho estruturadas de transmissão. Agora você pode encadear vários operadores com estado juntos, o que significa que você pode alimentar a saída de uma operação, como uma agregação de janela, para outra operação com estado, como uma join.

Os exemplos a seguir demonstram vários padrões que você pode usar.

Importante

As seguintes limitações existem ao trabalhar com vários operadores com estado:

  • FlatMapGroupWithState não é suportado.

  • Apenas o modo de saída de acréscimo é suportado.

Agregação de janela de tempo encadeada

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Agregação de janela de tempo em duas transmissões diferentes seguida de junção de janela de transmissão-transmissão

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")
val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

junção de intervalo de tempo de transmissão-transmissão seguida de agregação de janela de tempo

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()
val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Rebalanceamento de estado para transmissão estruturada

O rebalanceamento de estado é habilitado por default para todas as cargas de trabalho de transmissão em Delta Live Tables. No Databricks Runtime 11.1e acima, você pode definir a seguinte opção de configuração na configuração clusters Spark para habilitar o reequilíbrio de estado:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Rebalanceamento de estado beneficia pipelines de transmissão stateful estruturados que passam por eventos de redimensionamento clusters . As operações de transmissão sem estado não se beneficiam, independentemente da mudança de tamanho clusters .

Observação

compute O dimensionamento automático tem limitações ao reduzir o tamanho do cluster para cargas de trabalho de transmissão estruturada. Databricks recomenda o uso do site Delta Live Tables com autoscale aprimorado para cargas de trabalho de transmissão. Consulte Otimizar a utilização do pipeline cluster do Delta Live Tables com o Enhanced autoscale.

eventos de redimensionamento clusters fazem com que o rebalanceamento de estado seja acionado. Durante os eventos de rebalanceamento, os microlotes podem ter latência mais alta à medida que o estado é carregado do armazenamento cloud para os novos executores.

Especifique o estado inicial para mapGroupsWithState

Você pode especificar um estado inicial definido pelo usuário para processamento de estado estruturado da transmissão usando flatMapGroupsWithStateou mapGroupsWithState. Isso permite evitar o reprocessamento de dados ao iniciar uma transmissão com estado sem um ponto de verificação válido.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Exemplo de caso de uso que especifica um estado inicial para o operador flatMapGroupsWithState :

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Exemplo de caso de uso que especifica um estado inicial para o operador mapGroupsWithState :

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Teste a função de atualização mapGroupsWithState

A API TestGroupState permite que você teste a função de atualização de estado usada para Dataset.groupByKey(...).mapGroupsWithState(...) e Dataset.groupByKey(...).flatMapGroupsWithState(...).

A função de atualização de estado usa o estado anterior como entrada usando um objeto do tipo GroupState. Consulte a documentação de referência do Apache Spark GroupState. Por exemplo:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}