ワークフローで Delta LiveTables パイプラインを実行する

Delta Live Tables パイプラインは、Databricks ジョブ、Apache Airflow、または Azure Data Factory を使用したデータ処理ワークフローの一部として実行できます。

ジョブ

Databricks ジョブで複数のタスクを調整して、データ処理ワークフローを実装できます。 Delta Live Tables パイプラインをジョブに含めるには、 ジョブの作成時に パイプライン タスクを使用します。

Apache Airflow

Apache Airflow は、データワークフローを管理およびスケジュールするためのオープンソースソリューションです。 エアフローは、ワークフローを操作の有向非巡回グラフ (DAG) として表します。 Python ファイルでワークフローを定義すると、Airflow によってスケジュールと実行が管理されます。 Databricksでの Airflow のインストールと使用に関する情報については、「 Apache Airflow を使用した Databricks ジョブのオーケストレーション」を参照してください。

エアフロー ワークフローの一部として Delta Live Tables パイプラインを実行するには、 DatabricksSubmitRunOperator を使用します。

要件

Delta Live Tablesのエアフローサポートを使用するには、次のものが必要です。

  • エアフローバージョン2.1.0 またはそれ以降。

  • Databricks プロバイダー パッケージのバージョン 2.1.0またはそれ以降。

次の例では、識別子 8279d543-063c-4d63-9926-dae38e35ce8b を使用して Delta Live Tables パイプラインの更新をトリガーするエアフロー DAG を作成します。

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

CONNECTION_ID をワークスペースへの Airflow 接続 の識別子に置き換えます。

この例を airflow/dags ディレクトリに保存し、エアフローUIを使用してDAG を表示およびトリガー します。 Delta Live Tables UI を使用して、パイプラインの更新の詳細を表示します。

Azure Data Factory

Azure Data Factory は、データ統合と変換のワークフローを調整できるクラウドベースの ETL サービスです。 Azure Data Factory では、ノートブック、JAR タスク、Python スクリプトなど、ワークフローでの Databricks タスクの実行を直接サポートしています。 Azure Data Factory Web アクティビティ から Delta Live Tables API を呼び出して、ワークフローにパイプラインを含めることもできます。たとえば、Azure データ ファクトリからパイプラインの更新をトリガーするには、次のようにします。

  1. データ ファクトリを作成するか、既存のデータ ファクトリ を開きます。

  2. 作成が完了したら、データ ファクトリのページを開き、[ Azure Data Factory Studio を開く ] タイルをクリックします。 Azure データ ファクトリのユーザー インターフェイスが表示されます。

  3. Azure Data Factory Studio ユーザー インターフェイスの [新規] ドロップダウン メニューから [パイプライン] を選択して、新しい Azure Data Factory パイプラインを作成します。

  4. [ アクティビティ ] ツールボックスで、[ 全般 ] を展開し、 Web アクティビティをパイプライン キャンバスにドラッグします。 [ 設定 ] タブをクリックし、次の値を入力します。

    セキュリティのベストプラクティスとして、自動化されたツール、システム、スクリプト、およびアプリで認証する場合、 Databricks では、ワークスペースユーザーではなく、 サービスプリンシパル に属する個人用アクセストークンを使用することをお勧めします。 サービスプリンシパルのトークンを作成するには、「 サービスプリンシパルのトークンを管理する」を参照してください。

    • URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      <get-workspace-instance>を交換してください。

      <pipeline-id> をパイプライン識別子に置き換えます。

    • 方法: ドロップダウン メニューから [POST ] を選択します。

    • ヘッダー: [+ 新規] をクリックします。 名前 テキスト ボックスに、 Authorizationと入力します。 テキスト ボックスに、 Bearer <personal-access-token>と入力します。

      <personal-access-token> を Databricks の個人用アクセストークンに置き換えます。

    • 本文: 追加のリクエストパラメーターを渡すには、パラメーターを含むJSONドキュメントを入力します。 たとえば、更新を開始し、パイプラインのすべてのデータを再処理するには、次のようにします。 {"full_refresh": "true"}. 追加の要求パラメーターがない場合は、空の中括弧 ({}) を入力します。

Web アクティビティをテストするには、Data Factory UI のパイプライン ツール バーの [ デバッグ ] をクリックします。 実行の出力と状態 (エラーを含む) は、Azure Data Factory パイプラインの [ 出力 ] タブに表示されます。 Delta Live Tables UI を使用して、パイプラインの更新の詳細を表示します。

ヒント

一般的なワークフロー要件は、前のタスクの完了後にタスクを開始することです。 Delta Live Tables updates 要求は非同期であるため (要求は更新の開始後、更新が完了する前に返されます)、Delta Live Tables の更新に依存する Azure Data Factory パイプライン内のタスクは、更新が完了するまで待機する必要があります。 更新の完了を待機するオプションは、 Delta Live Tables 更新をトリガーする Web アクティビティの後に Until アクティビティ を追加することです。Until アクティビティで、次の操作を行います。

  1. 更新が完了するまで構成された秒数待機 する待機アクティビティ を追加します。

  2. Delta Live Tables 更新プログラムの詳細の取得 要求を使用して更新プログラムの状態を取得する待機アクティビティの後に Web アクティビティを追加します。応答の state フィールドは、更新が完了しているかどうかなど、更新の現在の状態を返します。

  3. state フィールドの値を使用して、Until アクティビティの終了条件を設定します。[ 変数の設定] アクティビティ を使用して、 state 値に基づいてパイプライン変数を追加し、この変数を終了条件に使用することもできます。