ストリーミングとインクリメンタル インジェスト
Databricks では、 Apache Spark 構造化ストリーミングを使用して、インジェスト ワークロードに関連する次のような多数の製品をバックアップします。
Auto Loader
COPY INTO
Delta Liveテーブルパイプライン
Databricks SQL のマテリアライズド ビューとストリーミング テーブル
この記事では、ストリーミング バッチ処理セマンティクスと増分バッチ処理セマンティクスの違いの一部について説明し、Databricks で必要なセマンティクスのインジェスト ワークロードを構成する概要について説明します。
ストリーミングと増分バッチ取り込みの違いは何ですか?
可能なインジェストワークフロー構成は、ほぼリアルタイムの処理から頻度の低い増分バッチ処理まで多岐にわたります。 どちらのパターンも Apache Spark 構造化ストリーミングを使用してインクリメンタル処理を強化しますが、セマンティクスは異なります。 わかりやすくするために、この記事では、ほぼリアルタイムのインジェストを ストリーミング インジェスト と呼び、頻度の低いインクリメンタル処理を インクリメンタル バッチ インジェストと呼んでいます。
ジョブによる取り込み
Databricks ジョブを使用すると、ノートブック、ライブラリ、Delta Live Tables パイプライン、Databricks SQL クエリなどのワークフローを調整し、タスクをスケジュールできます。
注:
すべての Databricks コンピュート タイプとタスク タイプを使用して、増分バッチ インジェストを構成できます。 ストリーミング インジェストは、クラシック ジョブ コンピュート と Delta Live Tablesの本番運用でのみサポートされています。
ジョブには、主に 2 つの操作モードがあります。
連続ジョブ は、障害が発生すると自動的に再試行します。 このモードは、ストリーミング インジェスト用です。
トリガーされたジョブは、 トリガーされたときにタスクを実行します。 トリガーには、次のものが含まれます。
指定したスケジュールでジョブを実行する時間ベースのトリガー。
指定した場所にファイルが配置されたときにジョブを実行するファイルベースのトリガー。
REST API 呼び出し、Databricks CLI コマンドの実行、ワークスペース UI の [ 今すぐ実行 ] ボタンのクリックなど、その他のトリガー。
増分バッチ ワークロードの場合は、次のように AvailableNow
トリガー モードを使用してジョブを構成します。
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("table_name")
)
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("table_name")
ストリーミングワークロードの場合、デフォルトのトリガー間隔は processingTime ="500ms"
です。 次の例は、マイクロバッチを 5 秒ごとに処理する方法を示しています。
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(processingTime="5 seconds")
.toTable("table_name")
)
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.ProcessingTime, "5 seconds")
.toTable("table_name")
重要
サーバレス ジョブは、構造化ストリーミングの Scala、連続モード、または時間ベースのトリガ間隔をサポートしていません。 クラシック ジョブは、ほぼリアルタイムのインジェスト セマンティクスが必要な場合は、使用します。