DatabricksでRocksDB状態ストアを構成する
RocksDB ベースの状態管理を有効にするには、ストリーミング クエリーを開始する前に SparkSession で次の構成を設定します。
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
RocksDB は、Delta Live Tables パイプラインで有効にできます。 「ステートフルな処理のためのパイプライン設定の最適化」を参照してください。
変更ログのチェックポイントを有効にする
Databricks Runtime 13.3 LTS 以降では、変更ログ チェックポイントを有効にして、構造化ストリーミング ワークロードのチェックポイント期間とエンドツーエンドの待機時間を短縮できます。 Databricks では、すべての構造化ストリーミング ステートフル クエリに対して変更ログ チェックポイントを有効にすることをお勧めします。
従来、RocksDB 状態ストア スナップショットを作成し、チェックポイント処理中にデータ ファイルをアップロードします。 このコストを回避するために、changelog チェックポイントは、最後のチェックポイント以降に変更されたレコードのみを永続的なストレージに書き込みます。
変更ログのチェックポイントはデフォルトで無効になっています。 次の構文を使用して、SparkSession レベルで変更ログのチェックポイント処理を有効にすることができます。
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
既存のストリームで変更ログチェックポイントを有効にし、チェックポイントに格納されている状態情報を維持できます。
重要
変更ログ チェックポイントを有効にしたクエリは、Databricks Runtime 13.3 LTS 以降でのみ実行できます。 変更ログのチェックポイントを無効にして従来のチェックポイントの動作に戻すことはできますが、これらのクエリは引き続き Databricks Runtime 13.3 LTS 以降で実行する必要があります。 これらの変更を有効にするには、ジョブを再起動する必要があります。
RocksDB 状態ストア metrics
各状態演算子は、RocksDB インスタンスで実行される状態管理操作に関連するメトリクスを収集して状態ストアを監視し、ジョブの速度低下のデバッグに役立つ可能性があります。 これらのメトリクスは、状態演算子が実行されているすべてのタスクにわたって、ジョブ内の状態演算子ごとに集計 (合計) されます。 これらのメトリクスは、 StreamingQueryProgress
の stateOperators
フィールド内の customMetrics
マップの一部です。以下は、JSON 形式の StreamingQueryProgress
の例です ( StreamingQueryProgress.json()
を使用して取得)。
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
メトリクスの詳細な説明は次のとおりです。
メトリクス name |
説明 |
---|---|
rocksdbCommitWriteBatchLatency |
メモリ内構造 (WriteBatch) のステージングされた書き込みをネイティブ RocksDB に適用するのにかかった時間 (ミリ単位)。 |
rocksdbCommitFlushLatency |
RocksDBのメモリ内の変更をローカルディスクにフラッシュするのにかかった時間(ミリ単位)。 |
rocksdbCommitCompactLatency |
チェックポイントのコミット中に圧縮 (オプション) にかかった時間 (ミリ秒単位)。 |
rocksdbCommitPauseLatency |
チェックポイントコミットの一部としてバックグラウンドワーカースレッド(圧縮など)を停止するのにかかった時間(ミリ単位)。 |
rocksdbCommitCheckpointLatency |
ネイティブRocksDBのスナップショットを取得してローカルディレクトリに書き込むのにかかった時間(ミリ単位)。 |
rocksdbCommitFileSyncLatencyMs |
ネイティブのRocksDBスナップショット関連ファイルを外部ストレージ(チェックポイントの場所)に同期するのにかかった時間(ミリ単位)。 |
rocksdbGetLatency |
基になるネイティブ |
rocksdbPutCount |
基になるネイティブ |
rocksdbGetCount |
ネイティブ |
rocksdbPutCount |
ネイティブ |
rocksdbTotalBytesReadByGet |
ネイティブ |
rocksdbTotalBytesWriteByPut |
ネイティブ |
rocksdbReadBlockCacheHitCount |
ローカルディスクからのデータの読み取りを回避するためにネイティブのRocksDBブロックキャッシュが使用される回数。 |
rocksdbReadBlockCacheMissCount |
ネイティブのRocksDBブロックキャッシュが欠落し、ローカルディスクからデータを読み取る必要があった回数。 |
rocksdbTotalBytesReadByCompaction |
ネイティブのRocksDBコンパクションプロセスによってローカルディスクから読み取られたバイト数。 |
rocksdbTotalBytesWriteByCompaction |
ネイティブのRocksDB圧縮プロセスによってローカルディスクに書き込まれたバイト数。 |
rocksdbTotalCompactionLatencyMs |
RocksDBの圧縮にかかった時間(ミリ単位)(バックグラウンドとコミット中に開始されたオプションの圧縮の両方)。 |
rocksdbWriterStallLatencyMs |
バックグラウンドの圧縮またはメモリテーブルのディスクへのフラッシュのためにライターが停止した時間 (ミリ単位)。 |
rocksdbTotalBytesReadThroughIterator |
一部のステートフル操作( |