非同期進行状況追跡とは

プレビュー

この機能はパブリックプレビュー段階です。

非同期の進行状況の追跡により、構造化ストリーミング パイプラインは、マイクロバッチ内の実際のデータ処理と非同期的に並行して進行状況のチェックポイントを作成できるため、 offsetLogcommitLogの維持に関連する待機時間が短縮されます。

非同期進捗追跡

非同期の進行状況追跡は、 Trigger.once トリガーまたは Trigger.availableNow トリガーでは機能しません。 これらのトリガーでこの機能を有効にしようとすると、クエリーが失敗します。

非同期の進行状況追跡は、待機時間を短縮するためにどのように機能しますか?

構造化ストリーミングは、クエリー処理の進行状況インジケーターとしてオフセットの永続化と管理に依存しています。 オフセット管理操作は、これらの操作が完了するまでデータ処理が発生しないため、処理待機時間に直接影響します。 非同期の進行状況の追跡により、構造化ストリーミング パイプラインは、これらのオフセット管理操作の影響を受けずに進行状況をチェックポイントできます。

チェックポイントの頻度はいつ構成する必要がありますか?

ユーザーは、進行状況にチェックポイントを設定する頻度を構成できます。 チェックポイント頻度のデフォルト設定は、ほとんどのクエリーに対して良好なスループットを提供します。 頻度の構成は、オフセット管理操作が処理可能な速度よりも高いレートで発生し、オフセット管理操作のバックログがますます増加するシナリオに役立ちます。 この増大するバックログを食い止めるために、データ処理はブロックまたは低速化され、本質的に処理動作を元に戻して、非同期の進行状況追跡の利点を排除します。

障害回復時間は、チェックポイント間隔時間の増加とともに増加します。 障害が発生した場合、パイプラインは、前回の成功したチェックポイントの前にすべてのデータを再処理する必要があります。 ユーザーは、通常の処理中の待機時間の短縮と障害発生時の復旧時間の間のこのトレードオフを検討できます。

非同期進行状況追跡にはどのような構成が関連付けられていますか。

オプション

デフォルト

説明

asyncProgressTrackingEnabled

正誤確認

非同期の進行状況追跡を有効または無効にする

asyncProgressTrackingCheckpointIntervalMs

ミリ秒

1000

オフセットと完了コミットをコミットする間隔

ユーザーはどのようにして非同期の進行状況追跡を有効にできますか?

ユーザーは、次のコードのようなコードを使用して、この機能を有効にすることができます。

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

非同期進行状況追跡 をオフにする

非同期進行状況の追跡が有効になっている場合、フレームワークはすべてのバッチの進行状況をチェックポイントしません。 これに対処するには、非同期の進行状況追跡を無効にする前に、次の設定で少なくとも 2 つのマイクロバッチを処理します。

  • .option("asyncProgressTrackingEnabled", "true")

  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

少なくとも 2 つのマイクロバッチの処理が終了したら、クエリーを停止します。 これで、非同期の進行状況の追跡を安全に無効にして、クエリーを再開できます。

この手順を完了せずに非同期の進行状況の追跡を無効にした場合、次のエラーが発生する可能性があります。

java.lang.IllegalStateException: batch x doesn't exist

ドライバー ログに、次のエラーが表示される場合があります。

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

このセクションの手順に従って非同期進行状況追跡を無効にすると、これらのエラーに対処し、ストリーミングワークロードを修復できます。

非同期進捗追跡 の制限事項

この機能には、次の制限があります。

  • 非同期進行状況追跡は、Kafka をシンクとして使用するステートレス パイプラインでのみサポートされます。

  • 非同期の進行状況追跡では、バッチのオフセット範囲が障害発生時に変更できるため、一度だけエンドツーエンドの処理は保証されません。 Kafkaなどの一部のシンクは、一度だけ正確に保証することはありません。