ワークフローで Delta LiveTables パイプラインを実行する
Delta Live Tables パイプラインは、Databricks ジョブ、Apache Airflow、または Azure Data Factory を使用したデータ処理ワークフローの一部として実行できます。
ジョブ
Databricks ジョブで複数のタスクをオーケストレーションして、データ処理ワークフローを実装できます。 Delta Live Tables パイプラインをジョブに含めるには、ジョブの作成時にパイプライン タスクを使用します。ジョブの Delta Live Tables パイプライン タスクを参照してください。
Apache Airflow
Apache Airflowは、データ ワークフローを管理およびスケジュールするためのオープンソース ソリューションです。 Airflow は、ワークフローを操作の有向非巡回グラフ (DAG) として表現します。 Python ファイルでワークフローを定義し、Airflow がスケジュールと実行を管理します。 での インストールと使用については、AirflowDatabricks 「 を使用して Databricksジョブを調整する」 を参照してください。ApacheAirflow
エアフロー ワークフローの一部として Delta Live Tables パイプラインを実行するには、 DatabricksSubmitRunOperator を使用します。
要件
Delta Live Tablesのエアフローサポートを使用するには、次のものが必要です。
エアフローバージョン2.1.0 またはそれ以降。
Databricks プロバイダー パッケージのバージョン 2.1.0またはそれ以降。
例
次の例では、識別子 8279d543-063c-4d63-9926-dae38e35ce8b
を使用して Delta Live Tables パイプラインの更新をトリガーするエアフロー DAG を作成します。
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
CONNECTION_ID
を、ワークスペースへのAirflow 接続の識別子に置き換えます。
この例をairflow/dags
ディレクトリに保存し、Airflow UI を使用して DAG を表示およびトリガーします。 Delta Live Tables UI を使用して、パイプライン更新の詳細を表示します。
Azure Data Factory
注
Delta Live Tables と Azure Data Factory にはそれぞれ、エラー発生時の再試行回数を構成するオプションが含まれています。 再試行値が Delta Live Tables パイプライン と 、パイプラインを呼び出す Azure Data Factory アクティビティで構成されている場合、再試行回数は、Azure Data Factory の再試行値に Delta Live Tables の再試行値を掛けた値になります。
たとえば、パイプラインの更新が失敗した場合、 Delta Live Tables はデフォルトによって最大 5 回更新を再試行します。 Azure Data Factory の再試行が 3 回に設定され、Delta Live Tables パイプラインが 5 回の再試行のデフォルトを使用している場合、失敗した Delta Live Tables パイプラインは最大 15 回再試行される可能性があります。パイプラインの更新が失敗した場合に過剰な再試行を行わないように、Databricks では、Delta Live Tables パイプラインまたはパイプラインを呼び出す Azure Data Factory アクティビティを構成するときに、再試行回数を制限することをお勧めします。
Delta Live Tables パイプラインの再試行構成を変更するには、パイプラインの構成時に pipelines.numUpdateRetryAttempts
設定を使用します。
Azure Data Factory は、データ統合と変換のワークフローを調整できるクラウドベースの ETL サービスです。 Azure Data Factory は、ノートブック、JAR タスク、Python スクリプトなど、ワークフローでの Databricks タスクの実行を直接サポートします。 また、Azure Data Factory Web アクティビティ から Delta Live Tables API を呼び出すことで、パイプラインをワークフローに含めることもできます。たとえば、Azure Data Factory からパイプラインの更新をトリガーするには、次のようにします。
データ ファクトリを作成するか、既存のデータ ファクトリ を開きます。
作成が完了したら、データ ファクトリのページを開き、[ Azure Data Factory Studio を開く ] タイルをクリックします。 Azure データ ファクトリのユーザー インターフェイスが表示されます。
Azure Data Factory Studio ユーザー インターフェイスの [新規] ドロップダウン メニューから [パイプライン] を選択して、新しい Azure Data Factory パイプラインを作成します。
[ アクティビティ ] ツールボックスで、[ 全般 ] を展開し、 Web アクティビティをパイプライン キャンバスにドラッグします。 [ 設定 ] タブをクリックし、次の値を入力します。
注
セキュリティのベスト プラクティスとして、自動化されたツール、システム、スクリプト、アプリを使用して認証する場合、 Databricksでは、ワークスペース ユーザーではなく、サービス プリンシパルに属する個人のアクセス トークンを使用することをお勧めします。 サービスプリンシパルのトークンを作成するには、 「サービスプリンシパルのトークンの管理」を参照してください。
URL:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates
.<get-workspace-instance>
を交換してください。<pipeline-id>
をパイプライン識別子に置き換えます。方法: ドロップダウン メニューから [POST ] を選択します。
ヘッダー: [+ 新規] をクリックします。 名前 テキスト ボックスに、
Authorization
と入力します。値 テキスト ボックスに、Bearer <personal-access-token>
と入力します。<personal-access-token>
を Databricks の個人用アクセストークンに置き換えます。本文: 追加のリクエストパラメーターを渡すには、パラメーターを含むJSONドキュメントを入力します。 たとえば、更新を開始し、パイプラインのすべてのデータを再処理するには、次のようにします。
{"full_refresh": "true"}
. 追加の要求パラメーターがない場合は、空の中括弧 ({}
) を入力します。
Web アクティビティをテストするには、Data Factory UI のパイプライン ツール バーの [ デバッグ ] をクリックします。 実行の出力と状態 (エラーを含む) は、Azure Data Factory パイプラインの [ 出力 ] タブに表示されます。 Delta Live Tables UI を使用して、パイプラインの更新の詳細を表示します。
ヒント
一般的なワークフロー要件は、前のタスクの完了後にタスクを開始することです。 Delta Live Tables updates
要求は非同期であるため (要求は更新の開始後、更新が完了する前に返されます)、Delta Live Tables の更新に依存する Azure Data Factory パイプライン内のタスクは、更新が完了するまで待機する必要があります。 更新の完了を待機するオプションは、 Delta Live Tables 更新をトリガーする Web アクティビティの後に Until アクティビティ を追加することです。Until アクティビティで、次の操作を行います。
更新が完了するまで構成された秒数待機 する待機アクティビティ を追加します。
Wait アクティビティの後に、Delta Live Tables 更新の詳細要求を使用して更新のステータスを取得する Web アクティビティを追加します。 応答の
state
フィールドは、更新が完了したかどうかを含め、更新の現在の状態を返します。state
フィールドの値を使用して、Until アクティビティの終了条件を設定します。[ 変数の設定] アクティビティ を使用して、state
値に基づいてパイプライン変数を追加し、この変数を終了条件に使用することもできます。