Configure RocksDB armazenamento do estado em Databricks

Você pode ativar o gerenciamento de estado baseado em RocksDB definindo a seguinte configuração no SparkSession antes de iniciar a query transmitida.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

O senhor pode ativar o RocksDB no pipeline Delta Live Tables. Consulte Otimizar a configuração do pipeline para processamento com estado.

Ativar ponto de verificação do changelog

Em Databricks Runtime 13.3 LTS e acima, o senhor pode ativar o checkpointing do changelog para reduzir a duração do checkpoint e a latência de ponta a ponta para cargas de trabalho de transmissão estruturada. Databricks recomenda habilitar o checkpointing do changelog para todas as consultas stateful de transmissão estruturada.

Tradicionalmente, o RocksDB armazena o estado Snapshot e upload arquivos de dados durante o checkpointing. Para evitar esse custo, o checkpoint do changelog grava apenas os registros que foram alterados desde o último checkpoint no armazenamento durável.”

O ponto de verificação do changelog está desabilitado por default. Você pode ativar o ponto de verificação do changelog no nível SparkSession usando a seguinte sintaxe:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Você pode ativar o checkpoint do changelog em uma transmissão existente e manter a informação de estado armazenada no checkpoint.

Importante

As consultas que habilitaram o checkpointing do changelog só podem ser executadas em Databricks Runtime 13.3 LTS e acima. O senhor pode desativar o checkpointing do changelog para reverter para o comportamento do checkpointing legado, mas deve continuar a executar essas consultas em Databricks Runtime 13.3 LTS ou acima. O senhor deve reiniciar o site Job para que essas alterações ocorram.

RocksDB armazenamento do estado de métricas

Cada operador de estado coleta métricas relacionadas às operações de gerenciamento de estado realizadas em sua instância do RocksDB para observar o armazenamento do estado e potencialmente ajudar na depuração da lentidão Job . Essas métricas são agregadas (soma) por operador de estado na Job em todas as tarefas em que o operador de estado está sendo executado. Essas métricas fazem parte do mapa customMetrics dentro dos campos stateOperators em StreamingQueryProgress. Veja a seguir um exemplo de StreamingQueryProgress no formato JSON (obtido usando StreamingQueryProgress.json()).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

As descrições detalhadas das métricas são as seguintes:

nome da métrica

Descrição

rocksdbCommitWriteBatchLatência

O tempo (em milissegundos) levou para aplicar as gravações em estágio na estrutura da memória (WriteBatch) no RocksDB nativo.

rocksdbCommitFlushLatency

O tempo (em milissegundos) levou para liberar as alterações na memória do RocksDB para o disco local.

rocksdbCommitCompactLatency

Tempo (em milissegundos) levado para compactação (opcional) durante a commit do ponto de verificação.

rocksdbCommitPauseLatência

O tempo (em milissegundos) gastou para interromper os threads worker em segundo plano (para compactação, etc.) como parte da commit do ponto de verificação .

rocksdbCommitCheckpointLatency

O tempo (em milissegundos) levou para tirar um Snapshot do RocksDB nativo e gravá-lo em um diretório local.

rocksdbCommitFileSyncLatencyMs

O tempo (em milissegundos) levou para sincronizar os arquivos nativos relacionados Snapshot do RocksDB para um armazenamento externo (localização do ponto de verificação).

rocksdbGetLatency

Tempo médio (em nanos) gasto pela chamada RocksDB::Get nativa subjacente.

rocksdbPutCount

Tempo médio (em nanos) gasto pela chamada RocksDB::Put nativa subjacente.

rocksdbGetCount

Número de chamadas RocksDB::Get nativas (não inclui Gets de WriteBatch - em lotes de memória usados para gravações de preparação).

rocksdbPutCount

Número de chamadas RocksDB::Put nativas (não inclui Puts para WriteBatch - em lotes de memória usados para gravações de preparação).

rocksdbTotalBytesReadByGet

Número de bytes não compactados lidos por meio de chamadas RocksDB::Get nativas.

rocksdbTotalBytesWrittenByPut

Número de bytes não compactados gravados por meio de chamadas RocksDB::Put nativas.

rocksdbReadBlockCacheHitCount

Número de vezes que o cache de bloco RocksDB nativo é usado para evitar a leitura de dados do disco local.

rocksdbReadBlockCacheMissCount

Número de vezes que o cache de bloco RocksDB nativo perdeu e exigiu a leitura de dados do disco local.

rocksdbTotalBytesReadByCompaction

Número de bytes lidos do disco local pelo processo de compactação RocksDB nativo.

rocksdbTotalBytesWrittenByCompaction

Número de bytes gravados no disco local pelo processo de compactação RocksDB nativo.

rocksdbTotalCompactionLatencyMs

O tempo (em milissegundos) gastou para as compactações do RocksDB (em segundo plano e a compactação opcional iniciada durante o commit).

rocksdbWriterStallLatencyMs

Tempo (em milissegundos) que o gravador parou devido a uma compactação de fundo ou liberação das memtables para o disco.

rocksdbTotalBytesReadThroughIterator

Algumas das operações com estado (como processamento de tempo limite em flatMapGroupsWithState ou marca d'água em agregações em janela) requerem a leitura de dados inteiros no banco de dados por meio do iterador. O tamanho total dos dados não compactados lidos usando o iterador.