構造化ストリーミングの運用に関する考慮事項

この記事には、Databricks のジョブを使用して構造化ストリーミング ワークロードをスケジュールするための推奨事項が含まれています。

Databricks では、常に次のことを行うことをお勧めします。

  • displaycount などの結果を返す不要なコードをノートブックから削除します。

  • 汎用コンピュートを使用して構造化ストリーミング ワークロードを実行しないでください。 常にジョブ コンピュートを使用してストリームをジョブとしてスケジュールします。

  • Continuousモードを使用してジョブをスケジュールします。

  • コンピュート for 構造化ストリーミング ジョブの自動スケーリングを有効にしないでください。

一部のワークロードには、次のような利点があります。

Databricks では、構造化ストリーミング ワークロードの本番運用インフラストラクチャの管理の複雑さを軽減するための Delta Live Tables を導入しました。 Databricks では、新しい構造化ストリーミング パイプラインに Delta Live Tables を使用することをお勧めします。 「Delta Live Tables とは」を参照してください。

コンピュートのオートスケーリングは、構造化ストリーミングワークロードのクラスターサイズのスケールダウンに対して制限があります。Databricksでは、ストリーミングワークロードの強化オートスケーリングでDelta Live Tablesを使用することを推奨しています。「強化オートスケールを使用してDelta Live Tablesパイプラインのクラスター使用率を最適化する」を参照してください。

障害を想定するようにストリーミング ワークロードを設計する

Databricks では、失敗時に自動的に再起動するようにストリーミング ジョブを常に構成することをお勧めします。 スキーマの進化を含む一部の機能では、構造化ストリーミング ワークロードが自動的に再試行されるように構成されていることを前提としています。 「失敗時にストリーミング クエリを再開するための構造化ストリーミング ジョブの構成」を参照してください。

foreachBatch などの一部の操作では、exactly-once ではなく at-once の保証が提供されます。これらの操作では、処理パイプラインがべき等であることを確認する必要があります。 「foreachBatch を使用して任意のデータ シンクに書き込む」を参照してください。

クエリが再開されると、前回の実行中に計画されたマイクロバッチが処理されます。 メモリ不足エラーが原因でジョブが失敗した場合、またはマイクロバッチが大きすぎるためにジョブを手動でキャンセルした場合は、マイクロバッチを正常に処理するためにコンピュートをスケールアップする必要があります。

実行間で構成を変更した場合、これらの構成は計画された最初の新しいバッチに適用されます。 構造化ストリーミング クエリの変更後の回復を参照してください。

ジョブはいつ再試行されますか?

Databricks ジョブの一部として複数のタスクをスケジュールできます。 連続トリガーを使用してジョブを構成する場合、タスク間の依存関係を設定することはできません。

次のいずれかの方法を使用して、1 つのジョブで複数のストリームをスケジュールすることを選択できます。

  • 複数のタスク: 連続トリガーを使用してストリーミング ワークロードを実行する複数のタスクを持つジョブを定義します。

  • 複数のクエリ: 1 つのタスクのソース コードで複数のストリーミング クエリを定義します。

これらの戦略を組み合わせることもできます。 次の表では、これらのアプローチを比較しています。

複数のタスク

複数のクエリ

コンピュートはどのように共有されますか?

Databricks では、各ストリーミング タスクに適切なサイズでジョブ コンピュートをデプロイすることをお勧めします。 必要に応じて、タスク間でコンピュートを共有できます。

すべてのクエリは同じコンピュートを共有します。 オプションで、 スケジューラー・プールに照会を割り当てることができます。

再試行はどのように処理されますか?

すべてのタスクは、ジョブが再試行される前に失敗する必要があります。

クエリが失敗した場合、タスクは再試行します。

失敗時にストリーミングクエリーを再開するように構造化ストリーミングジョブを構成する

Databricks では、すべてのストリーミング ワークロードを継続的トリガーを使用して構成することをお勧めします。 「ジョブの継続的な実行」を参照してください。

連続トリガーは、デフォルトで次の動作を提供します。

  • ジョブの複数の並列実行を防止します。

  • 前の実行が失敗したときに、新しい実行を開始します。

  • 再試行に指数バックオフを使用します。

Databricks 、ワークフローをスケジュールする際には、常に All-purpose コンピュートではなく、ジョブ コンピュートを使用することをお勧めします。 ジョブの失敗と再試行時に、新しいコンピュート リソースがデプロイされます。

streamingQuery.awaitTermination()spark.streams.awaitAnyTermination()を使用する必要はありません。ジョブは、ストリーミング クエリがアクティブなときに実行が自動的に完了しないようにします。

複数のストリーミング クエリにスケジューラ プールを使用する

同じソース コードから複数のストリーミング クエリを実行するときに、クエリにコンピュート容量を割り当てるようにスケジュール プールを構成できます。

デフォルトでは、ノートブックで開始されたすべてのクエリは、同じ公正なスケジューリングプールで実行されます。 ノートブック内のすべてのストリーミング クエリのトリガーによって生成された Apache Spark ジョブは、"先入れ先出し" (FIFO) の順序で 1 つずつ実行されます。 これにより、クエリがクラスター リソースを効率的に共有しないため、クエリに不必要な遅延が発生する可能性があります。

スケジューラ プールを使用すると、コンピュート リソースを共有する構造化ストリーミング クエリを宣言できます。

次の例では、 query1 を専用プールに割り当て、 query2query3 はスケジューラ プールを共有します。

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

ローカル プロパティの構成は、ストリーミング クエリを開始するのと同じノートブック セルに存在する必要があります。

詳細については Apache 公正なスケジューラのドキュメント を参照してください。