ウォーターマークを適用してデータ処理のしきい値を制御する

この記事では、透かしの基本的な概念を紹介し、一般的なステートフル ストリーミング操作で透かしを使用するための推奨事項を示します。 ステートフル ストリーミング操作にウォーターマークを適用して、状態に保持されるデータの量が無限に拡大し、実行時間の長いストリーミング操作中にメモリの問題が発生し、処理待機時間が長くならないようにする必要があります。

ウォーターマークとは何ですか?

構造化ストリーミングでは、ウォーターマークを使用して、特定の状態エンティティの更新の処理を続行する時間のしきい値を制御します。 状態エンティティの一般的な例は次のとおりです。

  • 時間枠での集計。

  • 2 つのストリーム間の結合における一意キー。

ウォーターマークを宣言するときは、ストリーミング DataFrame のタイムスタンプ フィールドとウォーターマークしきい値を指定します。 新しいデータが到着すると、ステートマネージャーは指定されたフィールドの最新のタイムスタンプを追跡し、遅延しきい値内のすべてのレコードを処理します。

次の例では、ウィンドウカウントに 10 分のウォーターマークしきい値を適用します。

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

この例では、次のようになります。

  • event_time 列は、10 分のウォーターマークと 5 分のタンブリング ウィンドウを定義するために使用されます。

  • 重複しない 5 分間のウィンドウごとに観測された id ごとにカウントが収集されます。

  • 状態情報は、ウィンドウの終わりが最新の観測 event_timeより 10 分古くなるまで、カウントごとに保持されます。

重要

ウォーターマークしきい値は、指定されたしきい値内に到着したレコードが、定義されたクエリーのセマンティクスに従って処理されることを保証します。 指定されたしきい値を超えて到着した遅延到着レコードは、クエリーメトリクスを使用して処理される可能性がありますが、これは保証されません。

ウォーターマークは処理時間とスループットにどのように影響しますか?

ウォーターマークは出力モードと相互作用して、データがシンクに書き込まれるタイミングを制御します。 ウォーターマークは処理される状態情報の総量を減らすため、ウォーターマークの効果的な使用は、効率的なステートフル ストリーミング スループットに不可欠です。

すべてのステートフル操作ですべての出力モードがサポートされているわけではありません。

ウィンドウ集計のウォーターマークと出力モード

次の表は、ウォーターマークが定義されたタイムスタンプの集計を使用したクエリーの処理の詳細を示しています。

出力モード

振舞い

追加

ウォーターマークのしきい値を超えると、行がターゲット テーブルに書き込まれます。 すべての書き込みは、遅延しきい値に基づいて遅延されます。 古い集計状態は、しきい値が過ぎると削除されます。

更新

行は、結果が計算されるときにターゲット テーブルに書き込まれ、新しいデータが到着したときに更新および上書きできます。 古い集計状態は、しきい値が過ぎると削除されます。

完成

集計状態は削除されません。 ターゲット表は、トリガーごとに書き直されます。

ストリームとストリームの結合のウォーターマークと出力

複数のストリーム間の結合は追加モードのみをサポートし、一致したレコードは検出される各バッチに書き込まれます。 内部結合の場合、Databricks では、各ストリーミング データ パラメーターに透かしのしきい値を設定することをお勧めします。 これにより、古いレコードの状態情報を破棄できます。 ウォーターマークがない場合、構造化ストリーミングは、結合の両側からすべてのキーを各トリガーで結合しようとします。

構造化ストリーミングには、外部結合をサポートするための特別なセマンティクスがあります。 ウォーターマークは、一致しなかった後にキーを null 値で書き込む必要があることを示すため、外部結合には必須です。 外部結合は、データ処理中に一致しないレコードを記録するのに役立ちますが、結合は追加操作としてテーブルに書き込むだけなので、この欠落データは遅延しきい値が経過するまで記録されないことに注意してください。

構造化ストリーミングの複数のウォーターマークポリシーを使用した遅延データのしきい値の制御

複数の構造化ストリーミング入力を操作する場合は、複数のウォーターマークを設定して、遅れて到着するデータの許容しきい値を制御できます。 ウォーターマークを構成すると、状態情報を制御し、待機時間に影響を与えることができます。

ストリーミングクエリーは、ユニオンまたは結合された複数の入力ストリームを持つことができます。 各入力ストリームは、ステートフル操作のために許容する必要がある遅延データの異なるしきい値を持つことができます。 これらのしきい値は、各入力ストリームで withWatermarks("eventTime", delay) を使用して指定します。 ストリームとストリームの結合を使用したクエリーの例を次に示します。

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

クエリーの実行中に、構造化ストリーミングは、各入力ストリームで見られる最大イベント時間を個別に追跡し、対応する遅延に基づいてウォーターマークを計算し、ステートフル操作に使用する単一のグローバルウォーターマークを選択します。 デフォルトでは、ストリームの 1 つが他のストリームより遅れた場合 (たとえば、ストリームの 1 つがアップストリーム障害のためにデータの受信を停止した場合など) に、データが遅すぎるために誤ってドロップされないようにするため、最小値がグローバル ウォーターマークとして選択されます。 つまり、グローバルウォーターマークは最も遅いストリームのペースで安全に移動し、それに応じてクエリー出力が遅延します。

より高速な結果を得るには、SQL 構成 spark.sql.streaming.multipleWatermarkPolicymax に設定することで、グローバル ウォーターマークとして最大値を選択するように複数ウォーターマーク ポリシーを設定できます (デフォルトは min)。 これにより、グローバルウォーターマークが最速ストリームのペースで移動できます。 ただし、この構成では、最も低速なストリームからデータが削除されます。 このため、Databricks では、この構成を慎重に使用することをお勧めします。

ウォーターマーク内の重複をドロップする

Databricks Runtime 13.3 LTS 以降では、一意の識別子を使用して、ウォーターマークしきい値内のレコードの重複を排除できます。

構造化ストリーミングは、一度だけ処理を保証しますが、データソースからのレコードの重複を自動的に排除することはありません。 dropDuplicatesWithinWatermark を使用して、指定したフィールドのレコードの重複排除を行い、一部のフィールド (イベント時間や到着時間など) が異なる場合でも、ストリームから重複を削除できます。

指定されたウォーターマーク内に到着した重複レコードは、確実に削除されます。 この保証は一方向でのみ厳格であり、指定されたしきい値を超えて到着した重複レコードも削除される可能性があります。 すべての重複を削除するには、重複するイベント間の最大タイムスタンプ差よりも長いウォーターマークの遅延しきい値を設定する必要があります。

dropDuplicatesWithinWatermark メソッドを使用するには、次の例のようにウォーターマークを指定する必要があります。

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)
val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])