monitoramento transmitido query estruturada em Databricks

O Databricks fornece monitoramento integrado para aplicativos estruturados transmitidos por meio da Spark UI na guia transmissão .

Distinguir query estruturada transmitida na Spark UI

Forneça à sua transmissão um nome query exclusivo adicionando .queryName(<query-name>) ao seu código writeStream para distinguir facilmente quais métricas pertencem a qual transmissão na Spark UI.

Enviar métricas estruturadas de transmissão para serviços externos

As transmissões métricas podem ser enviadas para serviços externos para casos de uso de alertas ou painéis, usando a interface Query Listener de transmissão do site Apache Spark. Em Databricks Runtime 11.3 LTS e acima, o Query Listener de transmissão está disponível em Python e Scala.

Observação

A latência de processamento associada aos ouvintes pode afetar negativamente o processamento query . Databricks recomenda minimizar a lógica de processamento nesses ouvintes e gravar em coletores de baixa latência, como Kafka.

O código a seguir fornece exemplos básicos da sintaxe para implementar um ouvinte:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definindo métricas observáveis em transmissão estruturada

Métricas observáveis são funções agregadas nomeadas arbitrárias que podem ser definidas em uma query (DataFrame). Assim que a execução de um DataFrame atinge um ponto de conclusão (ou seja, finaliza uma query de lotes ou atinge uma época de transmissão), é emitido um evento nomeado que contém as métricas dos dados processados desde o último ponto de conclusão.

Você pode observar essas métricas anexando um ouvinte à sessão do Spark. O ouvinte depende do modo de execução:

  • modo de lotes: Use QueryExecutionListener.

    QueryExecutionListener é chamado quando a query é concluída. Acesse as métricas usando o mapa QueryExecution.observedMetrics .

  • transmissão, ou micro-lotes: Use StreamingQueryListener.

    StreamingQueryListener é chamado quando a query transmitida completa uma época. Acesse as métricas usando o mapa StreamingQueryProgress.observedMetrics . Databricks não suporta transmissão de execução contínua.

Por exemplo:

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Análises do objeto StreamingQueryListener

Métrica

Descrição

id

ID query exclusivo que persiste durante as reinicializações. Consulte StreamingQuery.id().

runId

ID query exclusivo para cada início ou reinicialização. Consulte StreamingQuery.runId().

name

Nome da query especificado pelo usuário. Nulo se não for especificado.

timestamp

Carimbo temporal de execução dos microlotes.

batchId

ID exclusivo para os lotes atuais de dados que estão sendo processados. Observe que no caso de novas tentativas após falha, um determinado ID de lote pode ser executado mais de uma vez. Da mesma forma, quando não há dados a serem processados, o ID dos lotes não é incrementado.

numInputRows

Número agregado (em todas as fontes) de registros processados em um gatilho.

inputRowsPerSecond

Taxa agregada (em todas as fontes) de dados recebidos.

processedRowsPerSecond

Taxa agregada (em todas as fontes) na qual o Spark está processando dados.

objeto duraçãoMs

informação sobre o tempo necessário para completar as diversas etapas do processo de execução dos microlotes.

Métrica

Descrição

durationMs.addBatch

Tempo necessário para executar o microlote. Isso exclui o tempo que o Spark leva para planejar o microlote.

durationMs.getBatch

Tempo necessário para recuperar os metadados sobre as compensações da origem.

durationMs.latestOffset

Último deslocamento consumido para o microlote. Este objeto de progresso refere-se ao tempo necessário para recuperar o deslocamento mais recente das fontes.

durationMs.queryPlanning

Tempo necessário para gerar o plano de execução.

durationMs.triggerExecution

Tempo necessário para planejar e executar o microlote.

durationMs.walCommit

Tempo necessário para commit as novas compensações disponíveis.

objeto eventTime

informações sobre o valor do tempo do evento visualizado nos dados em processamento nos microlotes. Esses dados são usados pela marca d'água para descobrir como cortar o estado para processar agregações com estado definidas no Job de transmissão estruturada.

Métrica

Descrição

eventTime.avg

Tempo médio do evento visto no gatilho.

eventTime.max

Tempo máximo do evento visto no gatilho.

eventTime.min

Tempo mínimo do evento visto no gatilho.

eventTime.watermark

Valor da marca d’água utilizada no trigger.

objeto stateOperators

informações sobre as operações stateful que são definidas no Job de transmissão estruturada e as agregações que são produzidas a partir delas.

Métrica

Descrição

stateOperators.operatorName

Nome do operador com estado ao qual as métricas estão relacionadas. Por exemplo, symmetricHashJoin, dedupe, stateStoreSave.

stateOperators.numRowsTotal

Número de linhas no estado como resultado do operador ou agregação com estado.

stateOperators.numRowsUpdated

Número de linhas atualizadas no estado como resultado do operador ou agregação com estado.

stateOperators.numRowsRemoved

Número de linhas removidas do estado como resultado do operador ou agregação com estado.

stateOperators.commitTimeMs

Tempo necessário para commit todas as atualizações (colocações e remoções) e retornar uma nova versão.

stateOperators.memoryUsedBytes

Memória utilizada pelo armazenamento do estado.

stateOperators.numRowsDroppedByWatermark

Número de linhas que são consideradas tarde demais para serem incluídas na agregação com estado. somente agregações de transmissão: número de linhas eliminadas após a agregação, e não linhas de entrada brutas. O número não é preciso, mas pode indicar que dados atrasados estão sendo descartados.

stateOperators.numShufflePartitions

Número de partições aleatórias para este operador com estado.

stateOperators.numStateStoreInstances

Instância real de armazenamento do estado que o operador inicializou e manteve. Em muitos operadores stateful, este é o mesmo que o número de partições, mas transmissão-transmissão join inicializa quatro instâncias de armazenamento do estado por partição.

stateOperators.customMetrics objeto

informações coletadas do RocksDB que capturam métricas sobre seu desempenho e operações com relação aos valores stateful que mantém para a transmissão estruturada Job. Para obter mais informações, consulte Configurar o armazenamento do estado do RocksDB no Databricks.

Métrica

Descrição

customMetrics.rocksdbBytesCopied

Número de bytes copiados conforme rastreado pelo RocksDB File Manager.

customMetrics.rocksdbCommitCheckpointLatency

Tempo em milissegundos para tirar um Snapshot do RocksDB nativo e gravá-lo em um diretório local.

customMetrics.rocksdbCompactLatency

Tempo em milissegundos para compactação (opcional) durante o commit do ponto de verificação.

customMetrics.rocksdbCommitFileSyncLatencyMs

Tempo em milissegundos para sincronizar os arquivos nativos relacionados Snapshot do RocksDB com um armazenamento externo (local do ponto de verificação).

customMetrics.rocksdbCommitFlushLatency

Tempo em milissegundos para liberar as alterações na memória do RocksDB em seu disco local.

customMetrics.rocksdbCommitPauseLatency

Tempo em milissegundos para interromper os threads worker em segundo plano (por exemplo, para compactação) como parte do commit do ponto de verificação.

customMetrics.rocksdbCommitWriteBatchLatency

Tempo em milissegundos para aplicar as gravações preparadas na estrutura da memória (WriteBatch) ao RocksDB nativo.

customMetrics.rocksdbFilesCopied

Número de arquivos copiados rastreados pelo RocksDB File Manager.

customMetrics.rocksdbFilesReused

Número de arquivos reutilizados conforme rastreado pelo RocksDB File Manager.

customMetrics.rocksdbGetCount

Número de chamadas get para o banco de dados (isso não inclui gets de WriteBatch: lotes na memória usados para gravações temporárias).

customMetrics.rocksdbGetLatency

Tempo médio em nanossegundos para a chamada RocksDB::Get nativa subjacente.

customMetrics.rocksdbReadBlockCacheHitCount

Quanto do cache de blocos no RocksDB é útil ou não e evita leituras de disco local.

customMetrics.rocksdbReadBlockCacheMissCount

Quanto do cache de blocos no RocksDB é útil ou não e evita leituras de disco local.

customMetrics.rocksdbSstFileSize

Tamanho de todos os arquivos SST. SST significa Static Sorted Table, que é a estrutura tabular que o RocksDB usa para armazenar dados.

customMetrics.rocksdbTotalBytesRead

Número de bytes descompactados lidos por operações get .

customMetrics.rocksdbTotalBytesReadByCompaction

Número de bytes que o processo de compactação lê do disco.

customMetrics.rocksdbTotalBytesReadThroughIterator

Algumas das operações com estado (por exemplo, processamento de tempo limite em FlatMapGroupsWithState e marca d'água) exigem a leitura de dados no banco de dados por meio de um iterador. Estas métricas representam o tamanho dos dados não compactados lidos usando o iterador.

customMetrics.rocksdbTotalBytesWritten

Número de bytes descompactados gravados por operações put .

customMetrics.rocksdbTotalBytesWrittenByCompaction

Número de bytes que o processo de compactação grava no disco.

customMetrics.rocksdbTotalCompactionLatencyMs

Milissegundos de tempo para compactações RocksDB, incluindo compactações em segundo plano e a compactação opcional iniciada durante o commit.

customMetrics.rocksdbTotalFlushLatencyMs

Tempo de descarga, incluindo descarga de fundo. As operações de liberação são processos pelos quais o MemTable é liberado para armazenamento quando estiver cheio. MemTables são o primeiro nível onde os dados são armazenados no RocksDB.

customMetrics.rocksdbZipFileBytesUncompressed

RocksDB File Manager gerencia a utilização e exclusão do espaço em disco do arquivo SST físico. Estas métricas representam os arquivos zip descompactados em bytes conforme relatado pelo Gerenciador de Arquivos.

objeto de fontes (Kafka)

Métrica

Descrição

sources.description

Nome da fonte de onde a query de transmissão está lendo. Por exemplo, “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.

sources.startOffset objeto

Número de deslocamento inicial dentro do tópico Kafka em que o Job de transmissão começa.

sources.endOffset objeto

Último deslocamento processado pelo microlote. Isso pode ser igual a latestOffset para uma execução contínua de microlote.

sources.latestOffset objeto

Último deslocamento calculado pelo microlote. Quando há limitação, o processo de microlotes pode não processar todas as compensações, fazendo com que endOffset e latestOffset sejam diferentes.

sources.numInputRows

Número de linhas de entrada processadas desta origem.

sources.inputRowsPerSecond

Taxa na qual os dados chegam para processamento para esta origem.

sources.processedRowsPerSecond

Taxa na qual o Spark está processando dados para esta fonte.

objeto fontes.métricas (Kafka)

Métrica

Descrição

sources.metrics.avgOffsetsBehindLatest

Número médio de deslocamentos que a query de transmissão está atrás do último deslocamento disponível entre todos os tópicos inscritos.

sources.metrics.estimatedTotalBytesBehindLatest

Número estimado de bytes que o processo query não consumiu dos tópicos inscritos.

sources.metrics.maxOffsetsBehindLatest

Número máximo de deslocamentos que a query de transmissão está atrás do último deslocamento disponível entre todos os tópicos inscritos.

sources.metrics.minOffsetsBehindLatest

Número mínimo de deslocamentos que a query de transmissão está atrás do último deslocamento disponível entre todos os tópicos inscritos.

objeto coletor (Kafka)

Métrica

Descrição

sink.description

Nome do coletor no qual a query de transmissão está sendo gravada. Por exemplo, “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.

sink.numOutputRows

Número de linhas que foram gravadas na tabela de saída ou coletor como parte do microlote. Para algumas situações, este valor pode ser “-1” e geralmente pode ser interpretado como “desconhecido”.

objeto de fontes (Delta Lake)

Métrica

Descrição

sources.description

Nome da fonte de onde a query de transmissão está lendo. Por exemplo, “DeltaSource[table]”.

sources.[startOffset/endOffset].sourceVersion

Versão da serialização com a qual este deslocamento é codificado.

sources.[startOffset/endOffset].reservoirId

ID da tabela da qual você está lendo. Isso é usado para detectar configurações incorretas ao reiniciar uma query.

sources.[startOffset/endOffset].reservoirVersion

Versão da tabela que você está processando atualmente.

sources.[startOffset/endOffset].index

Indexe na sequência de AddFiles nesta versão. Isso é usado para dividir grandes commit em vários lotes. Este índice é criado classificando modificationTimestamp e path.

sources.[startOffset/endOffset].isStartingVersion

Se este deslocamento denota uma query que está iniciando em vez de processar alterações. Ao iniciar uma nova query, todos os dados presentes na tabela no início são processados, e em seguida os novos dados que chegaram.

sources.latestOffset

Último deslocamento processado pela query de microlote.

sources.numInputRows

Número de linhas de entrada processadas desta origem.

sources.inputRowsPerSecond

Taxa na qual os dados chegam para processamento para esta origem.

sources.processedRowsPerSecond

Taxa na qual o Spark está processando dados para esta fonte.

sources.metrics.numBytesOutstanding

Tamanho dos arquivos pendentes (arquivos rastreados pelo RocksDB) combinados. Esta é a documentação do backlog para Delta e Auto Loader como fonte de transmissão.

sources.metrics.numFilesOutstanding

Número de arquivos pendentes a serem processados. Esta é a documentação do backlog para Delta e Auto Loader como fonte de transmissão.

objeto coletor (Delta Lake)

Métrica

Descrição

sink.description

Nome do coletor no qual a query de transmissão grava. Por exemplo, “DeltaSink[table]”.

sink.numOutputRows

O número de linhas nestas análises é “-1” porque o Spark não pode inferir linhas de saída para coletores DSv1, que é a classificação para o coletor Delta Lake.

Exemplos

Exemplo de evento StreamingQueryListener Kafka para Kafka

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Exemplo de evento Delta Lake-to-Delta Lake StreamingQueryListener

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Exemplo de fonte de taxa para evento Delta Lake StreamingQueryListener

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}