DatabricksでRocksDB状態ストアを構成する

RocksDB ベースの状態管理を有効にするには、ストリーミング クエリーを開始する前に SparkSession で次の構成を設定します。

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

RocksDB は、Delta Live Tables パイプラインで有効にできます。 「ステートフルな処理のためのパイプライン設定の最適化」を参照してください。

変更ログのチェックポイントを有効にする

Databricks Runtime 13.3 LTS 以降では、変更ログ チェックポイントを有効にして、構造化ストリーミング ワークロードのチェックポイント期間とエンドツーエンドの待機時間を短縮できます。 Databricks では、すべての構造化ストリーミング ステートフル クエリに対して変更ログ チェックポイントを有効にすることをお勧めします。

従来、RocksDB 状態ストア スナップショットを作成し、チェックポイント処理中にデータ ファイルをアップロードします。 このコストを回避するために、changelog チェックポイントは、最後のチェックポイント以降に変更されたレコードのみを永続的なストレージに書き込みます。

変更ログのチェックポイントはデフォルトで無効になっています。 次の構文を使用して、SparkSession レベルで変更ログのチェックポイント処理を有効にすることができます。

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

既存のストリームで変更ログチェックポイントを有効にし、チェックポイントに格納されている状態情報を維持できます。

重要

変更ログ チェックポイントを有効にしたクエリは、Databricks Runtime 13.3 LTS 以降でのみ実行できます。 変更ログのチェックポイントを無効にして従来のチェックポイントの動作に戻すことはできますが、これらのクエリは引き続き Databricks Runtime 13.3 LTS 以降で実行する必要があります。 これらの変更を有効にするには、ジョブを再起動する必要があります。

RocksDB 状態ストア metrics

各状態演算子は、RocksDB インスタンスで実行される状態管理操作に関連するメトリクスを収集して状態ストアを監視し、ジョブの速度低下のデバッグに役立つ可能性があります。 これらのメトリクスは、状態演算子が実行されているすべてのタスクにわたって、ジョブ内の状態演算子ごとに集計 (合計) されます。 これらのメトリクスは、 StreamingQueryProgressstateOperators フィールド内の customMetrics マップの一部です。以下は、JSON 形式の StreamingQueryProgress の例です ( StreamingQueryProgress.json()を使用して取得)。

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

メトリクスの詳細な説明は次のとおりです。

メトリクス name

説明

rocksdbCommitWriteBatchLatency

メモリ内構造 (WriteBatch) のステージングされた書き込みをネイティブ RocksDB に適用するのにかかった時間 (ミリ単位)。

rocksdbCommitFlushLatency

RocksDBのメモリ内の変更をローカルディスクにフラッシュするのにかかった時間(ミリ単位)。

rocksdbCommitCompactLatency

チェックポイントのコミット中に圧縮 (オプション) にかかった時間 (ミリ秒単位)。

rocksdbCommitPauseLatency

チェックポイントコミットの一部としてバックグラウンドワーカースレッド(圧縮など)を停止するのにかかった時間(ミリ単位)。

rocksdbCommitCheckpointLatency

ネイティブRocksDBのスナップショットを取得してローカルディレクトリに書き込むのにかかった時間(ミリ単位)。

rocksdbCommitFileSyncLatencyMs

ネイティブのRocksDBスナップショット関連ファイルを外部ストレージ(チェックポイントの場所)に同期するのにかかった時間(ミリ単位)。

rocksdbGetLatency

基になるネイティブ RocksDB::Get 呼び出しごとにかかった平均時間 (ナノ単位)。

rocksdbPutCount

基になるネイティブ RocksDB::Put 呼び出しごとにかかった平均時間 (ナノ単位)。

rocksdbGetCount

ネイティブ RocksDB::Get 呼び出しの数 (書き込みのステージングに使用されるメモリ内のバッチである WriteBatch からの Gets は含まれません)。

rocksdbPutCount

ネイティブ RocksDB::Put 呼び出しの数 (書き込みのステージングに使用されるメモリ内のバッチである WriteBatch への Puts は含まれません)。

rocksdbTotalBytesReadByGet

ネイティブ RocksDB::Get 呼び出しによって読み取られた非圧縮バイト数。

rocksdbTotalBytesWriteByPut

ネイティブ RocksDB::Put 呼び出しによって書き込まれた非圧縮バイト数。

rocksdbReadBlockCacheHitCount

ローカルディスクからのデータの読み取りを回避するためにネイティブのRocksDBブロックキャッシュが使用される回数。

rocksdbReadBlockCacheMissCount

ネイティブのRocksDBブロックキャッシュが欠落し、ローカルディスクからデータを読み取る必要があった回数。

rocksdbTotalBytesReadByCompaction

ネイティブのRocksDBコンパクションプロセスによってローカルディスクから読み取られたバイト数。

rocksdbTotalBytesWriteByCompaction

ネイティブのRocksDB圧縮プロセスによってローカルディスクに書き込まれたバイト数。

rocksdbTotalCompactionLatencyMs

RocksDBの圧縮にかかった時間(ミリ単位)(バックグラウンドとコミット中に開始されたオプションの圧縮の両方)。

rocksdbWriterStallLatencyMs

バックグラウンドの圧縮またはメモリテーブルのディスクへのフラッシュのためにライターが停止した時間 (ミリ単位)。

rocksdbTotalBytesReadThroughIterator

一部のステートフル操作( flatMapGroupsWithState でのタイムアウト処理やウィンドウ集計での透かしなど)では、イテレータを介してDB内のデータ全体を読み取る必要があります。 反復子を使用して読み取られた圧縮されていないデータの合計サイズ。