Apache Airflow で Databricksジョブをオーケストレーション

この記事では、ApacheAirflow を使用してデータ パイプラインをオーケストレーションするためのDatabricks Airflowサポートについて説明し、 ローカルにインストールして構成する手順を示し、 を使用してDatabricks ワークフローをデプロイおよび実行する例を示します。Airflow

データパイプラインでのジョブオーケストレーション

データ処理パイプラインの開発と展開では、多くの場合、タスク間の複雑な依存関係の管理が必要になります。 たとえば、パイプラインはソースからデータを読み取り、データをクリーンアップし、クリーンアップされたデータを変換し、変換されたデータをターゲットに書き込みます。 パイプラインを運用化するときに、テスト、スケジュール設定、エラーのトラブルシューティングのサポートも必要です。

ワークフロー システムは、タスク間の依存関係を定義し、パイプラインを実行するタイミングをスケジュールし、ワークフローを監視できるようにすることで、これらの課題に対処します。 Apache Airflow は、データパイプラインを管理およびスケジューリングするためのオープンソースソリューションです。 Airflowは、データパイプラインを操作の有向非巡回グラフ (DAG) として表します。 Python ファイルでワークフローを定義すると、Airflow がスケジュールと実行を管理します。 Airflow Databricks 接続を使用すると、Databricks が提供する最適化された Spark エンジンを Airflow のスケジュール機能で活用できます。

要件

  • Airflow と Databricks の統合には、Airflow バージョン 2.5.0 以降が必要です。 この記事の例は、Airflow バージョン 2.6.1 でテストされています。

  • Airflow には Python 3.8、3.9、3.10、または 3.11 が必要です。 この記事の例は Python 3.8 でテストされています。

  • この記事の Airflow をインストールして実行する手順では、 Python 仮想環境 を作成するために pipenv が 必要です。

DatabricksのAirflowオペレータ

Airflow DAG はタスクで構成され、各タスクはAirflow Operator を実行します。 Databricks への統合をサポートする Airflow オペレーターは、 Databricks プロバイダーに実装されています。

Databricks プロバイダーには、 テーブルへのデータのインポートSQL クエリの実行Databricks Git フォルダーの操作など、Databricks ワークスペースに対してさまざまなタスクを実行するための演算子が含まれています。

Databricks プロバイダーは、ジョブをトリガーするための 2 つの演算子を実装します。

新しい Databricks ジョブを作成したり、既存のジョブをリセットしたりするために、Databricks プロバイダーはDatabricksCreateJobsOperatorを実装します。 DatabricksCreateJobsOperatorPOST /api/2.1/Job/createを使用します。 そしてPOST /api/2.1/Job/reset API リクエスト。 DatabricksCreateJobsOperatorDatabricksRunNowOperatorを組み合わせてジョブを作成し、実行することができます。

注:

Databricks オペレーターを使用してジョブをトリガーするには、Databricks 接続構成で資格情報を提供する必要があります。 「 Databricks用の パーソナル アクセスの作成」を 参照してください。Airflow

Databricks Airflow オペレーターは、ジョブ実行ページの URL をpolling_period_secondsごとに Airflow ログに書き込みます (デフォルトは 30 秒)。 詳細については、 Web サイトの apache-Airflow providers-databricks パッケージ ページを参照してください。Airflow

Airflow Databricksインテグレーションをローカルにインストールする

テストと開発のためにAirflowとDatabricksプロバイダーをローカルにインストールするには、次のステップを使用します。 インストールのその他のオプションAirflow (本番運用インストールの作成を含む) については、Airflow ドキュメントの インストール を参照してください。

ターミナルを開き、次のコマンドを実行します。

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

<firstname><lastname><email>をユーザー名とEメールで置き換えます。 管理者ユーザーのパスワードを入力するように求められます。 このパスワードは Airflow UI にログインするために必要なので、必ず保存してください。

このスクリプトは次のステップを実行します。

  1. airflow という名前のディレクトリを作成し、そのディレクトリに変更します。

  2. pipenvを使用して、Python 仮想環境を作成して生成します。 Databricks では、パッケージ バージョンとコードの依存関係をその環境に分離するために、Python 仮想環境を使用することをお勧めします。 この分離により、予期しないパッケージ バージョンの不一致やコード依存関係の競合を減らすことができます。

  3. airflowディレクトリのパスに設定されたAIRFLOW_HOMEという名前の環境変数を初期化します。

  4. Airflow および Airflow Databricks プロバイダー パッケージをインストールします。

  5. airflow/dagsディレクトリを作成します。Airflow はdagsディレクトリを使用して DAG 定義を保存します。

  6. Airflow がメタデータを追跡するために使用する SQLite データベースを初期化します。 本番運用Airflowデプロイメントでは、標準データベースを使用してAirflowを構成します。 Airflow デプロイメントの SQLite データベースとデフォルト構成は、 airflowディレクトリで初期化されます。

  7. Airflow の管理者ユーザーを作成します。

ヒント

Databricks プロバイダーのインストールを確認するには、Airflow インストール ディレクトリで次のコマンドを実行します。

airflow providers list

Airflowウェブサーバーとスケジューラを起動します

Airflow UI を表示するには、Airflow Web サーバーが必要です。 Web サーバーを起動するには、Airflow インストール ディレクトリでターミナルを開き、次のコマンドを実行します。

注:

ポートの競合により Airflow Web サーバーの起動に失敗した場合は、 Airflow 構成でデフォルト ポートを変更できます。

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

スケジューラは、DAG をスケジュールする Airflow コンポーネントです。 スケジューラを起動するには、Airflow インストール ディレクトリで新しいターミナルを開き、次のコマンドを実行します。

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Airflowのインストールをテストします

Airflowのインストールを確認するには、Airflowに含まれている DAG の例のいずれかを実行します。

  1. ブラウザ ウィンドウで、 http://localhost:8080/homeを開きます。 Airflow のインストール時に作成したユーザー名とパスワードを使用して、Airflow UI にログインします。 Airflow DAGページが表示されます。

  2. DAG の停止/一時停止解除トグルをクリックして、サンプル DAG の 1 つ (例: example_python_operatorを一時停止解除します。

  3. DAG のトリガー 」ボタンをクリックして、サンプルの DAG をトリガーします。

  4. DAG 名をクリックすると、DAG の実行ステータスなどの詳細が表示されます。

Airflow 用の Databricks 個人用アクセストークンを作成する

Airflow 、 Databricks個人アクセス権 (PAT) を使用してDatabricksに接続します。 PAT を作成するには:

  1. Databricks ワークスペースで、上部のバーにある Databricks ユーザー名をクリックし、ドロップダウンから[設定]を選択します。

  2. [開発者] をクリックします。

  3. [アクセス]の横にある[管理]をクリックします。

  4. 「新しいトークンの生成」をクリックします。

  5. (任意)今後このトークンを識別するのに役立つコメントを入力し、トークンのデフォルトの有効期間である90日を変更します。有効期間のないトークンを作成するには(非推奨)、[有効期間 (日) ] ボックスを空白のままにしてください。

  6. [生成] をクリックします。

  7. 表示されたトークンを安全な場所にコピーし、[完了] をクリックします。

注:

コピーしたトークンは必ず安全な場所に保存してください。 コピーしたトークンを他の人と共有しないでください。 コピーしたトークンを紛失した場合、まったく同じトークンを再生成することはできません。 代わりに、この手順を繰り返して新しいトークンを作成する必要があります。 コピーしたウイルスを紛失した場合、またはウイルスが侵害されたと思われる場合、 Databricks 、ウイルス感染ページのウイルス感染の横にあるゴミ箱 (取り消し) アイコンをクリックして、そのウイルスをワークスペースから直ちに削除することを強くお勧めします。

ワークスペースでトークンを作成または使用できない場合は、ワークスペース管理者がトークンを無効にしているか、トークンを作成または使用する権限を付与していない可能性があります。 ワークスペース管理者または次のトピックを参照してください。

注:

セキュリティのベスト プラクティスとして、自動化されたツール、システム、スクリプト、アプリを使用して認証する場合、 Databricks 、ワークスペース ユーザーではなくサービス プリンシパルに属する個人のアクセス権を使用することをお勧めします。 サービスプリンシパル用のウイルスを作成するには、 「サービスプリンシパル用のウイルスの管理」を参照してください。

Databricks 接続を構成する

Airflowのインストールには、Databricks のデフォルト接続が含まれています。 上記で作成した個人用アクセストークンを使用してワークスペースに接続するように接続を更新するには:

  1. ブラウザ ウィンドウで、 http://localhost:8080/connection/list/を開きます。 ログインを求められたら、管理者のユーザー名とパスワードを入力します。

  2. [Conn ID][安全]を見つけ、 [レコードの編集]ボタンをクリックします。

  3. ホストフィールドの値を、Databricks デプロイメントのワークスペース インスタンス名(例: https://adb-123456789.cloud.databricks.comに置き換えます。

  4. [パスワード]フィールドに、 Databricksの個人アクセス権を入力します。

  5. [保存]をクリックします。

例: Airflow DAG を作成して Databricks ジョブを実行する

次の例は、ローカル マシンで実行され、サンプル DAG をデプロイして Databricks で実行をトリガーする単純な Airflow デプロイを作成する方法を示しています。 この例では、次のことを行います。

  1. 新しいノートブックを作成し、構成された問題に基づいて挨拶を印刷するコードを追加します。

  2. ノートブックを実行する単一のタスクを含む Databricks ジョブを作成します。

  3. Databricks ワークスペースへのAirflow接続を構成します。

  4. ノートブック ジョブをトリガーするAirflow DAG を作成します。 Python スクリプトで DAG を定義するには、 DatabricksRunNowOperatorを使用します。

  5. Airflow UI を使用して DAG をトリガーし、実行状態を表示します。

ノートブックを作成する

この例では、2 つのセルを含むノートブックを使用します。

  • 最初のセルには、当然の値 world に設定された greeting という名前の変数を定義するDatabricksテキスト ウィジェットが含まれています。

  • 2 番目のセルは、プレフィックス helloが付いた greeting 変数の値を出力します。

ノートブックを作成するには:

  1. Databricksワークスペースに移動し、クリックします新しいアイコンサイドバーで「新規」をクリックし、 「ノートブック」を選択します。

  2. ノートブックに「Hello Airflow 」などの名前を付け、デフォルトの言語が「Python」に設定されていることを確認します。

  3. 次のPythonコードをコピーして、ノートブックの最初のセルに貼り付けます。

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. 最初のセルの下に新しいセルを追加し、次の Python コードをコピーして新しいセルに貼り付けます。

    print("hello {}".format(greeting))
    

ジョブの作成

  1. サイドバーのワークフローアイコンワークフロー]をクリックします。

  2. [ 「ジョブを作成」ボタン] をクリックします。

    「タスク」タブが表示され、タスクの作成ダイアログが表示されます。

    最初のタスクダイアログの作成
  3. Add a name for your job...(ジョブの名前の追加)をジョブ名に置き換えてください。

  4. [タスク名]フィールドにタスクの名前を入力します (たとえば、 greeting-タスク)

  5. [タイプ]ドロップダウン メニューで、 [ノートブック]を選択します。

  6. [ソース]ドロップダウン メニューで、 [ワークスペース]を選択します。

  7. [パス] テキスト ボックスをクリックし、ファイル ブラウザーを使用して作成したノートブックを見つけ、ノートブック名をクリックして、 [確認] をクリックします。

  8. パラメーターの下の追加をクリックします。キーフィールドにgreetingを入力します。フィールドにAirflow userを入力します。

  9. タスクを作成」をクリックします。

ジョブの詳細パネルで、ジョブ ID の値をコピーします。 この値は、Airflow からジョブをトリガーするために必要です。

ジョブの実行

Databricks ジョブ UI で新しいジョブをテストするには、右上隅の をクリックします 「今すぐ実行」ボタン 。 実行が完了したら、 ジョブ実行の詳細を表示して出力を確認できます。

新しいAirflow DAGを作成する

Airflow DAG は Python ファイルで定義します。 ノートブック ジョブの例をトリガーする DAG を作成するには:

  1. テキストエディタまたは IDE で、次の内容で databricks_dag.py という名前の新しいファイルを作成します。

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    JOB_IDを、先ほど保存したジョブ ID の値に置き換えます。

  2. ファイルを airflow/dags ディレクトリに保存します。 Airflowは、 airflow/dags/に保存されている DAG ファイルを自動的に読み取り、インストールします。

Airflowに DAG をインストールして確認します

Airflow UIでDAGをトリガーして確認するには、次の手順を実行します。

  1. ブラウザ ウィンドウで、 http://localhost:8080/homeを開きます。 Airflow DAG画面が表示されます。

  2. databricks_dagを見つけて、 [DAG を停止する/一時停止解除]トグルをクリックして、DAG の一時停止を解除します。

  3. [ Trigger DAG ] ボタンをクリックして DAG をトリガーします。

  4. 「実行」列の実行をクリックすると、実行のステータスと詳細が表示されます。