非同期進行状況追跡とは
プレビュー
この機能はパブリックプレビュー段階です。
非同期の進行状況の追跡により、構造化ストリーミング パイプラインは、マイクロバッチ内の実際のデータ処理と非同期的に並行して進行状況のチェックポイントを作成できるため、 offsetLog
と commitLog
の維持に関連する待機時間が短縮されます。
注
非同期の進行状況追跡は、 Trigger.once
トリガーまたは Trigger.availableNow
トリガーでは機能しません。 これらのトリガーでこの機能を有効にしようとすると、クエリーが失敗します。
非同期の進行状況追跡は、待機時間を短縮するためにどのように機能しますか?
構造化ストリーミングは、クエリー処理の進行状況インジケーターとしてオフセットの永続化と管理に依存しています。 オフセット管理操作は、これらの操作が完了するまでデータ処理が発生しないため、処理待機時間に直接影響します。 非同期の進行状況の追跡により、構造化ストリーミング パイプラインは、これらのオフセット管理操作の影響を受けずに進行状況をチェックポイントできます。
チェックポイントの頻度はいつ構成する必要がありますか?
ユーザーは、進行状況にチェックポイントを設定する頻度を構成できます。 チェックポイント頻度のデフォルト設定は、ほとんどのクエリーに対して良好なスループットを提供します。 頻度の構成は、オフセット管理操作が処理可能な速度よりも高いレートで発生し、オフセット管理操作のバックログがますます増加するシナリオに役立ちます。 この増大するバックログを食い止めるために、データ処理はブロックまたは低速化され、本質的に処理動作を元に戻して、非同期の進行状況追跡の利点を排除します。
注
障害回復時間は、チェックポイント間隔時間の増加とともに増加します。 障害が発生した場合、パイプラインは、前回の成功したチェックポイントの前にすべてのデータを再処理する必要があります。 ユーザーは、通常の処理中の待機時間の短縮と障害発生時の復旧時間の間のこのトレードオフを検討できます。
非同期進行状況追跡にはどのような構成が関連付けられていますか。
オプション |
値 |
デフォルト |
説明 |
---|---|---|---|
asyncProgressTrackingEnabled |
正誤確認 |
偽 |
非同期の進行状況追跡を有効または無効にする |
asyncProgressTrackingCheckpointIntervalMs |
ミリ秒 |
1000 |
オフセットと完了コミットをコミットする間隔 |
ユーザーはどのようにして非同期の進行状況追跡を有効にできますか?
ユーザーは、次のコードのようなコードを使用して、この機能を有効にすることができます。
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
非同期進行状況追跡をオフにする
非同期進行状況の追跡が有効になっている場合、フレームワークはすべてのバッチの進行状況をチェックポイントしません。 これに対処するには、非同期の進行状況追跡を無効にする前に、次の設定で少なくとも 2 つのマイクロバッチを処理します。
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", 0)
少なくとも 2 つのマイクロバッチの処理が終了したら、クエリーを停止します。 これで、非同期の進行状況の追跡を安全に無効にして、クエリーを再開できます。
この手順を完了せずに非同期の進行状況の追跡を無効にした場合、次のエラーが発生する可能性があります。
java.lang.IllegalStateException: batch x doesn't exist
ドライバー ログに、次のエラーが表示される場合があります。
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
このセクションの手順に従って非同期進行状況追跡を無効にすると、これらのエラーに対処し、ストリーミングワークロードを修復できます。