foreachBatch を使用して任意のデータ シンクに書き込む

この記事では、構造化ストリーミングで foreachBatch を使用して、既存のストリーミング シンクがないデータソースにストリーミング クエリーの出力を書き込む方法について説明します。

コードパターン streamingDF.writeStream.foreachBatch(...) を使用すると、ストリーミングクエリーのすべてのマイクロバッチの出力データにバッチ関数を適用できます。 foreachBatch で使用される関数は、次の 2 つのパラメーターを取ります。

  • マイクロバッチの出力データを持つ DataFrame 。

  • マイクロバッチの一意の ID。

構造化ストリーミングでの Delta Lake マージ操作には foreachBatch を使用する必要があります。 foreachBatch を使用したストリーミングクエリーからのアップサートを参照してください。

追加の DataFrame 操作を適用する

ストリーミング DataFrame sでは、Spark が増分プランの生成をサポートしていないため、多くのDataFrame およびデータセット操作はサポートされていません。 foreachBatch() を使用すると、これらの操作の一部を各マイクロバッチ出力に適用できます。たとえば、 foreachBatch() と SQL MERGE INTO 操作を使用して、ストリーミング集計の出力を更新モードで Delta テーブルに書き込むことができます。 詳細については、「 マージ先」を参照してください。

重要

  • foreachBatch() 少なくとも一度だけ書き込み保証を提供します。 ただし、関数に提供される batchId を使用して、出力を重複排除し、一度だけ保証することができます。 どちらの場合も、エンドツーエンドのセマンティクスについて自分で推論する必要があります。

  • foreachBatch() 連続処理モードでは 、基本的にストリーミングクエリーのマイクロバッチ実行に依存しているため、動作しません。連続モードでデータを書き込む場合は、代わりに foreach() を使用します。

空のデータフレームは foreachBatch() で呼び出すことができ、適切な操作を可能にするためにユーザーコードは回復力が必要です。 次に例を示します。

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Databricks Runtime 14.0 での foreachBatch の動作の変更

共有アクセス モードで構成されたコンピュート上のDatabricks Runtime 14.0 以降では、次の動作の変更が適用されます。

  • print() コマンドは出力をドライバー ログに書き込みます。

  • 関数内の dbutils.widgets サブモジュールにはアクセスできません。

  • 関数内で参照されるファイル、モジュール、またはオブジェクトはシリアル化可能であり、Spark で使用できる必要があります。

既存のバッチデータソースの再利用

foreachBatch()を使用すると、構造化ストリーミングをサポートしていない可能性があるデータ シンクに既存のバッチ データ ライターを使用できます。次に例をいくつか示します。

他の多くのバッチデータソースは、 foreachBatch()から使用できます。 「データソースへの接続」を参照してください。

複数の場所に書き込む

ストリーミング クエリーの出力を複数の場所に書き込む必要がある場合、Databricks では、最適な並列化とスループットを得るために、複数の構造化ストリーミング ライターを使用することをお勧めします。

foreachBatch を使用して複数のシンクに書き込むと、ストリーミング書き込みの実行がシリアル化されるため、各マイクロバッチの待機時間が長くなる可能性があります。

foreachBatch を使用して複数の Delta テーブルに書き込む場合は、「foreachBatch でのべき等テーブルの書き込み」を参照してください。