ストリーミングとインクリメンタル インジェスト

Databricks では、 Apache Spark 構造化ストリーミングを使用して、インジェスト ワークロードに関連する次のような多数の製品をバックアップします。

  • Auto Loader

  • COPY INTO

  • Delta Liveテーブルパイプライン

  • Databricks SQL のマテリアライズド ビューとストリーミング テーブル

この記事では、ストリーミング バッチ処理セマンティクスと増分バッチ処理セマンティクスの違いの一部について説明し、Databricks で必要なセマンティクスのインジェスト ワークロードを構成する概要について説明します。

ストリーミングと増分バッチ取り込みの違いは何か

可能なインジェストワークフロー構成は、ほぼリアルタイムの処理から頻度の低い増分バッチ処理まで多岐にわたります。 どちらのパターンも Apache Spark 構造化ストリーミングを使用してインクリメンタル処理を強化しますが、セマンティクスは異なります。 わかりやすくするために、この記事では、ほぼリアルタイムのインジェストを ストリーミング インジェスト と呼び、頻度の低いインクリメンタル処理を インクリメンタル バッチ インジェストと呼んでいます。

ストリーミング インジェスト

ストリーミングは、データ取り込みとテーブル更新のコンテキストでは、 Databricks が常時オンのインフラストラクチャを使用してソースからシンクにレコードをマイクロバッチで取り込む、リアルタイムデータに近い処理を指します。 ストリーミングワークロードは、インジェストを停止する障害が発生しない限り、設定されたデータソースから更新を継続的に取り込みます。

増分バッチ取り込み

増分バッチインジェストとは、すべての新しいレコードが短命なジョブのデータソースから処理されるパターンを指します。 増分バッチ取り込みは、多くの場合、スケジュールに従って行われますが、手動またはファイルの到着に基づいてトリガーすることもできます。

増分バッチ取り込みは 、バッチ 取り込み とは異なり、データソース内の新しいレコードを自動的に検出し、既に取り込まれたレコードを無視します。

ジョブによる取り込み

Databricks ジョブを使用すると、ノートブック、ライブラリ、Delta Live Tables パイプライン、Databricks SQL クエリなどのワークフローを調整し、タスクをスケジュールできます。

注:

すべての Databricks コンピュート タイプとタスク タイプを使用して、増分バッチ インジェストを構成できます。 ストリーミング インジェストは、クラシック ジョブ コンピュート と Delta Live Tablesの本番運用でのみサポートされています。

ジョブには、主に 2 つの操作モードがあります。

  • 連続ジョブ は、障害が発生すると自動的に再試行します。 このモードは、ストリーミング インジェスト用です。

  • トリガーされたジョブは、 トリガーされたときにタスクを実行します。 トリガーには、次のものが含まれます。

    • 指定したスケジュールでジョブを実行する時間ベースのトリガー。

    • 指定した場所にファイルが配置されたときにジョブを実行するファイルベースのトリガー。

    • REST API 呼び出し、Databricks CLI コマンドの実行、ワークスペース UI の [ 今すぐ実行 ] ボタンのクリックなど、その他のトリガー。

増分バッチ ワークロードの場合は、次のように AvailableNow トリガー モードを使用してジョブを構成します。

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("table_name")
)
import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("table_name")

ストリーミングワークロードの場合、デフォルトのトリガー間隔は processingTime ="500ms"です。 次の例は、マイクロバッチを 5 秒ごとに処理する方法を示しています。

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(processingTime="5 seconds")
  .toTable("table_name")
)
import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.ProcessingTime, "5 seconds")
  .toTable("table_name")

重要

サーバレス ジョブは、構造化ストリーミングの Scala、連続モード、または時間ベースのトリガ間隔をサポートしていません。 クラシック ジョブは、ほぼリアルタイムのインジェスト セマンティクスが必要な場合は、使用します。

Delta Live Tablesを使用したインジェスト

ジョブと同様に、Delta Live Tables パイプラインは、トリガー モードまたは連続モードで実行できます。 ストリーミング テーブルでほぼリアルタイムのストリーミング セマンティクスを実現するには、連続モードを使用します。

ストリーミングテーブルを使用して、クラウドオブジェクトストレージ、Apache Kafka、Amazon Kinesis、Google Pub/Sub、または Apache Pulsar からのストリーミングまたは増分バッチ取り込みを設定します。

Databricks SQLを使用したインジェスト

COPY INTO は、クラウド・オブジェクト・ストレージ内のデータ・ファイルの増分バッチ処理のための使い慣れた SQL 構文を提供します。 COPY INTO 動作は、クラウド・オブジェクト・ストレージのストリーミング・テーブルでサポートされるパターンと似ていますが、すべてのデフォルト設定が、サポートされているすべてのファイル・フォーマットで同等であるとは限りません。