ジョブを使用して構造化ストリーミングクエリの失敗から回復する

構造化ストリーミングは、ストリーミング クエリにフォールト トレランスとデータの一貫性を提供します。Databricks ジョブを使用すると、構造化ストリーミング クエリが障害時に自動的に再起動するように簡単に構成できます。 ストリーミング クエリのチェックポイントを有効にすると、失敗後にクエリを再開できます。 再開されたクエリは、失敗したクエリが中断したところから続行されます。

構造化ストリーミングのチェックポイントを有効にするクエリー

Databricks では、クエリーを開始する前に、クラウド ストレージ パスに checkpointLocation オプションを常に指定することをお勧めします。 例えば:

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "/path/to/table")
  .option("checkpointLocation", "/path/to/table/_checkpoint")
  .start()

このチェックポイントの場所には、クエリーを識別する重要な情報がすべて保持されます。 各クエリーには、異なるチェックポイントの場所が必要です。 複数のクエリーが同じ場所を持つことはできません。 詳細については、「 構造化ストリーミング プログラミング ガイド」を参照してください。

ほとんどの種類の出力シンクには checkpointLocation が必要ですが、メモリ シンクなどの一部のシンクでは、 checkpointLocationを指定しない場合、一時的なチェックポイントの場所が自動的に生成される場合があります。 これらの一時的なチェックポイントの場所は、フォールト トレランスやデータの整合性の保証を保証するものではなく、適切にクリーンアップされない可能性があります。 常に checkpointLocationを指定することで、潜在的な落とし穴を回避します。

失敗時にストリーミングクエリーを再開するように構造化ストリーミングジョブを構成する

ストリーミング クエリを含むノートブックまたは JAR を使用して Databricksジョブを作成し、次のように構成できます。

  • 常に新しいクラスターを使用します。

  • 失敗した場合は常に再試行してください。

スキーマ進化を使用してストリーミング ワークロードを構成する場合、ジョブの失敗時に自動的に再起動することが特に重要です。 スキーマ進化は、スキーマの変更が検出されたときに予期されるエラーを発生させ、ジョブの再開時に新しいスキーマを使用してデータを適切に処理することにより、 Databricks上で機能します。 Databricks では、スキーマ進化を伴うクエリを含むストリーミング タスクを常に Databricks ジョブで自動的に再起動するように構成することをお勧めします。

ジョブは構造化ストリーミングAPIsと緊密に統合されており、実行中にアクティブなすべてのストリーミング クエリを監視できます。 この構成により、クエリの一部が失敗した場合、ジョブは(他のすべてのクエリとともに)実行を自動的に終了し、新しいクラスターで新しい実行を開始します。 これにより、ノートブックまたは JAR コードが再実行され、すべてのクエリが再開されます。 これは、良好な状態に戻るための最も安全な方法です。

  • アクティブなストリーミングクエリーのいずれかで障害が発生すると、アクティブな実行が失敗し、他のすべてのストリーミングクエリーが終了します。

  • ノートブックの最後で streamingQuery.awaitTermination()spark.streams.awaitAnyTermination() を使用する必要はありません。 ジョブは、ストリーミング クエリーがアクティブなときに実行が完了しないようにします。

  • Databricks では、構造化ストリーミング ノートブックを調整するときに、 %rundbutils.notebook.run() の代わりにジョブを使用することをお勧めします。 「 別のノートブックから Databricks ノートブックを実行する」を参照してください。

推奨されるジョブ構成の例を次に示します。

  • クラスター: 新しいクラスターを使用し、最新の Spark バージョン (またはバージョン 2.1 以降) を使用するには、常にこれを設定します。 Spark 2.1 以降で開始されたクエリーは、クエリーおよび Spark のバージョンアップ後に復元できます。

  • 通知: 失敗時にEmail通知が必要な場合は、これを設定します。

  • スケジュール: スケジュールを設定しないでください

  • タイムアウト: タイムアウトを設定しません。 ストリーミングクエリーは無期限に実行されます。

  • 最大並列実行数: 1 に設定します。 各クエリーのインスタンスが同時にアクティブになっている必要があります。

  • 再試行: 無制限に設定します。

これらの構成を理解するには、「Databricks ジョブの作成と実行」を参照してください。

構造化ストリーミングクエリーの変更後の回復

同じチェックポイントの場所からの再起動の間に許可されるストリーミングクエリーの変更には制限があります。 ここでは、許可されていない、または変更の影響が明確に定義されていない変更をいくつか紹介します。 それらすべてについて:

  • 許可 という用語は、指定された変更を実行できることを意味しますが、その効果のセマンティクスが明確に定義されているかどうかは、クエリーと変更によって異なります。

  • 許可されない という用語は、再起動されたクエリーが予期しないエラーで失敗する可能性があるため、指定された変更を行わないことを意味します。

  • sdf は、 sparkSession.readStream で生成されたストリーミング DataFrame/データセットを表します。

構造化ストリーミングクエリーの変更の種類

  • 入力ソースの数または種類 (つまり、異なるソース) の変更: これは許可されていません。

  • 入力ソースのパラメーターの変更: これが許可されるかどうか、および変更のセマンティクスが適切に定義されているかどうかは、ソースとクエリーによって異なります。 次に例をいくつか示します。

    • レート制限の追加、削除、および変更は許可されます。

      spark.readStream.format("kafka").option("subscribe", "article")
      

      宛先

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • 購読している記事やファイルへの変更は、結果が予測できないため、通常は許可されません spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")

  • トリガー間隔の変更: 増分バッチと時間間隔の間でトリガーを変更できます。 実行間のトリガー間隔の変更を参照してください。

  • 出力シンクの種類の変更: シンクのいくつかの特定の組み合わせ間の変更は許可されます。 これはケースバイケースで確認する必要があります。 次に例をいくつか示します。

    • Kafka シンクへのファイル シンクが許可されます。 Kafka には新しいデータのみが表示されます。

    • Kafka シンクからファイル シンクへの移動は許可されていません。

    • Kafka シンクが foreach に変更された場合、またはその逆が許可されます。

  • 出力シンクのパラメーターの変更: これが許可されるかどうか、および変更のセマンティクスが適切に定義されているかどうかは、シンクとクエリーによって異なります。 次に例をいくつか示します。

    • ファイル シンクの出力ディレクトリへの変更は許可されていません sdf.writeStream.format("parquet").option("path", "/somePath")sdf.writeStream.format("parquet").option("path", "/anotherPath")

    • 出力トピックへの変更は許可されます: sdf.writeStream.format("kafka").option("topic", "topic1") sdf.writeStream.format("kafka").option("topic", "topic2")

    • ユーザー定義の foreach シンク (つまり、 ForeachWriter コード) への変更は許可されますが、変更のセマンティクスはコードによって異なります。

  • 投影法/フィルター/地図のような操作の変更:場合によっては許可されます。 例えば:

    • フィルターの追加/削除は許可されています: sdf.selectExpr("a") から sdf.where(...).selectExpr("a").filter(...)

    • 同じ出力スキーマでのプロジェクションの変更が許可されます: sdf.selectExpr("stringColumn AS json").writeStream から sdf.select(to_json(...).as("json")).writeStreamまで。

    • 出力スキーマが異なるプロジェクションの変更は条件付きで許可されます: sdf.selectExpr("a").writeStream から sdf.selectExpr("b").writeStream は、出力シンクでスキーマを "a" から "b"に変更できる場合にのみ許可されます。

  • ステートフルな操作の変更: ストリーミングクエリーの一部の操作では、結果を継続的に更新するために状態データを維持する必要があります。 構造化ストリーミングは、フォールト トレラント ストレージ (DBFS、GCS、AWS S3、Azure Blob Storage など) に状態データを自動的にチェックポイントし、再起動後に復元します。 ただし、これは、状態データのスキーマが再起動後も同じままであることを前提としています。 つまり、 ストリーミングクエリーのステートフル操作に対する変更(追加、削除、またはスキーマの変更)は、再起動の合間に許可されません。 状態の回復を確実にするために、再起動の合間にスキーマを変更してはならないステートフル操作の一覧を次に示します。

    • ストリーミング集約: たとえば、 sdf.groupBy("a").agg(...). グループ化キーまたは集計の数またはタイプの変更は許可されません。

    • ストリーミング重複排除: たとえば、 sdf.dropDuplicates("a"). グループ化キーまたは集計の数またはタイプの変更は許可されません。

    • ストリームとストリームの結合:たとえば、sdf1.join(sdf2, ...) (つまり、両方の入力が sparkSession.readStreamで生成されます)。スキーマまたは等結合列の変更は許可されません。 結合タイプ (外部または内部) の変更は許可されません。 結合条件のその他の変更は、明確に定義されていません。

    • 任意のステートフル操作: たとえば、 sdf.groupByKey(...).mapGroupsWithState(...)sdf.groupByKey(...).flatMapGroupsWithState(...). ユーザー定義状態のスキーマおよびタイムアウトのタイプに対する変更は許可されません。 ユーザー定義の状態マッピング関数内の変更は許可されますが、変更のセマンティック効果はユーザー定義ロジックによって異なります。 状態スキーマの変更を本当にサポートする場合は、スキーマの移行をサポートするエンコード/デコード スキームを使用して、複雑な状態データ構造をバイトに明示的にエンコード/デコードできます。 たとえば、状態を Avro エンコード バイトとして保存すると、バイナリ状態が復元されるため、クエリーの再起動の間に Avro-state-schema を変更できます。