ステートフルクエリー の非同期状態チェックポイント

Databricks Runtime 10.4 LTS 以降で利用できます。

非同期状態チェックポイント処理では、ストリーミング クエリーの保証が一度だけ維持されますが、状態の更新時にボトルネックになっている一部の構造化ストリーミングステートフル ワークロードの全体的な待機時間を短縮できます。 これは、状態のチェックポイント処理が完了するのを待たずに、前のマイクロバッチの計算が完了するとすぐに次のマイクロバッチの処理を開始することによって実現されます。 次の表は、同期チェックポイント処理と非同期チェックポイント処理のトレードオフを比較したものです。

特性

同期チェックポイント

非同期チェックポイント処理

レイテンシー

各マイクロバッチの待ち時間が長くなります。

マイクロバッチがオーバーラップできるため、レイテンシーが短縮されます。

再起動

最後のバッチのみを再実行する必要があるため、高速リカバリ。

micro-バッチよりも再起動遅延が大きいため、再実行が必要になる場合があります。

非同期状態チェックポイント処理の恩恵を受ける可能性のあるストリーミング ジョブの特性を次に示します。

  • ジョブに1つ以上のステートフル操作(集約、 flatMapGroupsWithStatemapGroupsWithState、ストリームストリーム結合など)がある

  • 状態チェックポイントの待機時間は、バッチ実行の全体的な待機時間の主な要因の 1 つです。 この情報は、 ストリーミングクエリ進行状況 イベントで見つけることができます。 これらのイベントは、Spark ドライバーの log4j ログにも表示されます。 ストリーミング クエリの進行状況と、バッチ実行の全体的な待機時間に対する状態チェックポイントの影響を確認する方法の例を次に示します。

    •  {
         "id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19",
         "runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe",
         "...",
         "batchId" : 0,
         "durationMs" : {
           "...",
           "triggerExecution" : 547730,
           "..."
         },
         "stateOperators" : [ {
           "...",
           "commitTimeMs" : 3186626,
           "numShufflePartitions" : 64,
           "..."
         }]
       }
      
    • 上記のクエリー進捗イベントの状態チェックポイント遅延分析

      • バッチ期間 (durationMs.triggerDuration) は約 547 秒です。

      • 状態ストアのコミット待機時間 (stateOperations[0].commitTimeMs) は約 3,186 秒です。 コミット待機時間は、状態ストアを含むタスク間で集計されます。 この場合、そのようなタスクは64個あります(stateOperators[0].numShufflePartitions)。

      • 状態演算子を含む各タスクは、チェックポイントに平均 50 秒 (3,186/64) かかりました。 これは、バッチ期間に寄与する余分な待機時間です。 64 個のタスクすべてが同時に実行されていると仮定すると、チェックポイント ステップはバッチ期間の約 9% (50 秒 / 547 秒) に寄与します。 最大並列タスク数が 64 未満の場合、パーセンテージはさらに高くなります。

非同期状態チェックポイントの 有効化

非同期状態のチェックポイント処理には、 RocksDB ベースの状態ストア を使用する必要があります。 次の構成を設定します。

spark.conf.set(
  "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
  "true"
)

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)

非同期チェックポイント処理 の制約事項と要件

コンピュートの自動スケーリングには、構造化ストリーミング ワークロードのクラスター サイズをスケールダウンする制限があります。 Databricks 、ストリーミング ワークロードに拡張オートスケールを備えたDelta Live Tablesを使用することをお勧めします。 「拡張オートスケールDelta Live Tables Pipeline のクラスター使用率を最適化する」を参照してください。

  • 1 つ以上のストアで非同期チェックポイントに障害が発生すると、クエリーは失敗します。 同期チェックポイント モードでは、チェックポイントはタスクの一部として実行され、Spark はクエリが失敗する前にタスクを複数回再試行します。 このメカニズムは、非同期状態のチェックポイント処理には存在しません。 ただし、Databricks ジョブの再試行を使用すると、このようなエラーを自動的に 再試行できます。

  • 非同期チェックポイントは、マイクロバッチの実行間で状態ストアの場所が変更されない場合に最適です。 クラスターのサイズ変更と非同期状態チェックポイントの組み合わせは、クラスターのサイズ変更イベントの一部としてノードが追加または削除されると状態ストア インスタンスが再配布される可能性があるため、うまく機能しない可能性があります。

  • 非同期状態チェックポイントは、RocksDB 状態ストア プロバイダーの実装でのみサポートされます。 既定のメモリ内状態ストアの実装では、サポートされていません。