Delta Live Tables のパイプライン設定を構成する

この記事では、 Delta Live Tablesのパイプライン設定の構成について詳しく説明します。 Delta Live Tables には、パイプライン設定を構成および編集するためのユーザー インターフェイスが用意されています。 UI には、JSON で設定を表示および編集するオプションもあります。

ほとんどの設定は、UI または JSON 仕様のいずれかを使用して構成できます。 一部の詳細オプションは、JSON 構成を使用してのみ使用できます。

Databricks では、UI を使用して Delta Live Tables の設定に慣れることをお勧めします。 必要に応じて、ワークスペースで JSON 構成を直接編集できます。 JSON 構成ファイルは、パイプラインを新しい環境にデプロイする場合、または CLI またはREST APIを使用する場合にも役立ちます。

Delta Live Tables JSON 構成設定の完全なリファレンスについては、「 Delta Live Tables パイプライン構成」を参照してください。

製品のエディションを選択する

パイプラインの要件に最適な機能を備えた Delta Live Tables 製品エディションを選択します。 次の製品エディションを使用できます。

  • Core ストリーミング取り込みワークロードを実行します。 パイプラインにチェンジデータキャプチャ (CDC) や Delta Live Tables の期待などの高度な機能が必要ない場合は、 Core エディションを選択します。

  • Pro ストリーミング取り込みとCDCワークロードを実行します。 Pro 製品エディションでは、 Core すべての機能に加えて、ソース データの変更に基づいてテーブルを更新する必要があるワークロードのサポートがサポートされています。

  • Advanced ストリーミング取り込みワークロード、CDC ワークロード、および期待値を必要とするワークロードを実行します。 Advanced 製品エディションは、 Core エディションと Pro エディションの機能をサポートし、Delta Live Tables の期待値に対するデータ品質制約の適用もサポートします。

パイプラインを作成または編集するときに、製品エディションを選択できます。 パイプラインごとに異なるエディションを選択できます。 Delta Live Tables 製品ページをご覧ください。

パイプラインに、選択した製品エディションでサポートされていない機能 (期待値など) が含まれている場合は、エラーの理由を示すエラー メッセージが表示されます。 その後、パイプラインを編集して適切なエディションを選択できます。

パイプライン モードを選択する

パイプラインは、継続的に更新することも、パイプライン モードに基づいて手動トリガーを使用して更新することもできます。 「継続的なパイプライン実行 とトリガーによるパイプライン実行」を参照してください。

クラスターポリシーを選択

ユーザーには、Delta Live Tables パイプラインを構成および更新するためにコンピュートをデプロイするアクセス許可が必要です。 ワークスペース管理者は、Delta Live Tables のコンピュート リソースへのアクセスをユーザーに提供できるようにクラスター ポリシーを構成できます。 「 Delta Live Tables パイプライン コンピュートの制限を定義する」を参照してください。

  • クラスターポリシーはオプションです。 Delta Live Tablesに必要なコンピュート権限がない場合は、ワークスペース管理者に確認してください。

  • クラスターポリシーのデフォルト値が正しく適用されるようにするには、パイプライン設定のクラスター設定apply_policy_default_values 値を true に設定します。

    {
      "clusters": [
        {
          "label": "default",
          "policy_id": "<policy-id>",
          "apply_policy_default_values": true
        }
      ]
    }
    

ソース コード ライブラリを構成する

Delta Live Tables UI のファイル セレクターを使用して、パイプラインを定義するソース コードを構成できます。パイプラインのソース コードは、Databricks ノートブック、またはワークスペース ファイルに格納されている SQL または Python スクリプトで定義されます。 パイプラインを作成または編集するときに、1 つ以上のノートブックまたはワークスペース ファイル、またはノートブックとワークスペース ファイルの組み合わせを追加できます。

Delta Live Tables はデータセットの依存関係を自動的に分析してパイプラインの処理グラフを構築するため、ソースコードライブラリを任意の順序で追加できます。

また、JSON ファイルを変更して、ワークスペース ファイルに保存されている SQL および Python スクリプトで定義された Delta Live Tables ソース コードを含めることもできます。 次の例には、ノートブックとワークスペース ファイルが含まれています。

{
  "name": "Example pipeline 3",
  "storage": "dbfs:/pipeline-examples/storage-location/example3",
  "libraries": [
    { "notebook": { "path": "/example-notebook_1" } },
    { "notebook": { "path": "/example-notebook_2" } },
    { "file": { "path": "/Workspace/Users/<user-name>@databricks.com/Apply_Changes_Into/apply_changes_into.sql" } },
    { "file": { "path": "/Workspace/Users/<user-name>@databricks.com/Apply_Changes_Into/apply_changes_into.py" } }
  ]
}

ストレージロケーションを指定する

Hive metastoreに発行するパイプラインの保存場所を指定できます。場所を指定する主な目的は、パイプラインによって書き込まれるデータのオブジェクトストレージの場所を制御することです。

Delta Live Tables パイプラインのすべてのテーブル、データ、チェックポイント、およびメタデータは Delta Live Tables によってフル管理されるため、Delta Live Tables データセットとのほとんどのやり取りは、 Hive metastore または Unity Catalogに登録されているテーブルを介して行われます。

パイプライン出力テーブルのターゲット スキーマを指定する

省略可能ですが、新しいパイプラインの開発とテストを超えるたびに、パイプラインによって作成されたテーブルを発行するターゲットを指定する必要があります。 パイプラインをターゲットに発行すると、Databricks 環境の他の場所でデータセットをクエリできるようになります。 「Delta Live Tables パイプラインか ら Hive metastor e にデータを発行する」または 「 Delta Live Table s パイプラインで Unity Catalog を使用す る 」を参照してください。

コンピュートの設定を構成する

各デルタ ライブ テーブル パイプラインには、次の 2 つのクラスターが関連付けられています。

  • updates クラスターは、パイプラインの更新を処理します。

  • maintenance cluster 実行 日常メンテナンスタスク.

これらのクラスターで使用される設定は、パイプライン設定で指定された clusters 属性によって決まります。

クラスターラベルを使用して、特定の種類のクラスターにのみ適用されるコンピュート設定を追加できます。 パイプライン クラスターを構成するときに使用できるラベルは 3 つあります。

クラスター構成を 1 つだけ定義する場合は、クラスター・ラベル設定を省略できます。 default ラベルの設定が指定されていない場合、ラベルはクラスター構成に適用されます。クラスターラベル設定は、クラスターの種類ごとに設定をカスタマイズする必要がある場合にのみ必要です。

  • default ラベルは、 updates クラスターと maintenance クラスターの両方に適用するコンピュート設定を定義します。両方のクラスターに同じ設定を適用すると、必要な構成 (ストレージの場所のデータ アクセス資格情報など) がメンテナンス クラスターに適用されるため、メンテナンス実行の信頼性が向上します。

  • maintenance ラベルは、 maintenance クラスターにのみ適用するコンピュート設定を定義します。maintenance ラベルを使用して、 default ラベルで構成された設定を上書きすることもできます。

  • updates ラベルは、 updates クラスターのみに適用する設定を定義します。updates ラベルを使用して、 maintenance クラスターに適用しない設定を構成します。

default ラベルと updates ラベルを使用して定義された設定がマージされ、 updates クラスターの最終的な構成が作成されます。default ラベルと updates ラベルの両方を使用して同じ設定が定義されている場合、 updates ラベルで定義された設定は、 default ラベルで定義された設定をオーバーライドします。

次の例では、 updates クラスターの構成にのみ追加される Spark 構成パラメーターを定義します。

{
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    },
    {
      "label": "updates",
      "spark_conf": {
         "key": "value"
      }
    }
  ]
}

Delta Live Tables には、Databricks 上の他のコンピュートと同様のクラスター設定のオプションが用意されています。 他のパイプライン設定と同様に、クラスターの JSON 構成を変更して、UI に存在しないオプションを指定できます。 「 コンピュート」を参照してください。

  • Delta Live Tables ランタイムはパイプライン クラスターのライフサイクルを管理し、 Databricks Runtimeのカスタム バージョンを実行するため、Spark バージョンやクラスター名など、パイプライン構成で一部のクラスター設定を手動で設定することはできません。 ユーザーが設定できないクラスター属性を参照してください。

  • Delta Live Tables パイプラインは、Photon を利用するように構成できます。 Photonとはを参照してください

パイプラインを実行するインスタンスタイプの選択

デフォルトでは、Delta Live Tables によってパイプラインを実行するドライバー ノードとワーカー ノードのインスタンス タイプが選択されますが、インスタンス タイプを手動で構成することもできます。 たとえば、インスタンスタイプを選択して、パイプラインのパフォーマンスを向上させたり、パイプラインの実行時にメモリの問題に対処したりできます。 インスタンスの種類は、REST API を使用してパイプライン を作成 または 編集 するとき、または Delta Live Tables UI で構成できます。

Delta Live Tables UI でパイプラインを作成または編集するときにインスタンスタイプを設定するには:

  1. [ 設定 ]ボタンをクリックします。

  2. パイプライン設定の[詳細]セクションの[ワーカー タイプ]および[ドライバー タイプ]ドロップダウン メニューで、パイプラインのインスタンス タイプを選択します。

パイプラインの JSON 設定でインスタンス タイプを構成するには、 [JSON]ボタンをクリックし、クラスター構成にインスタンス タイプの構成を入力します。

maintenance クラスターに不要なリソースが割り当てられないようにするために、この例では updates ラベルを使用して、updatesクラスターのみのインスタンスタイプを設定します。インスタンスタイプを updates クラスターと maintenance クラスターの両方に割り当てるには、 default ラベルを使用するか、ラベルの設定を省略します。 default ラベルは、ラベルの設定が指定されていない場合、パイプライン クラスター構成に適用されます。「コンピュートの設定」を参照してください

{
  "clusters": [
    {
      "label": "updates",
      "node_type_id": "n1-highmem-16",
      "driver_node_type_id": "n1-standard-4",
      "..." : "..."
    }
  ]
}

オートスケールを使用して効率を高め、リソース使用量 を削減します

拡張オートスケールを使用して、パイプラインのクラスター使用率を最適化します。 拡張オートスケールは、システムがそれらのリソースによってパイプライン処理速度が向上すると判断した場合にのみ、追加のリソースを追加します。 リソースは不要になると解放され、すべてのパイプラインの更新が完了するとすぐにクラスターはシャットダウンされます。

構成の詳細を含む拡張オートスケールの詳細については、「拡張オートスケールを使用してDelta Live Tables Pipeline のクラスター使用率を最適化する」を参照してください。

コンピュートのシャットダウンを遅らせる

Delta Live Tables クラスターは使用されていないときに自動的にシャットダウンするため、クラスター構成で autotermination_minutes を設定するクラスターポリシーを参照するとエラーが発生します。 クラスターのシャットダウン動作を制御するには、開発モードまたは実稼働モードを使用するか、パイプライン設定で pipelines.clusterShutdown.delay 設定を使用します。 次の例では、 pipelines.clusterShutdown.delay 値を 60 秒に設定します。

{
    "configuration": {
      "pipelines.clusterShutdown.delay": "60s"
    }
}

production モードが有効になっている場合、 pipelines.clusterShutdown.delay のデフォルト値は 0 secondsです。development モードが有効になっている場合、デフォルト値は 2 hoursです。

単一ノード クラスター を作成する

クラスター設定でnum_workersを 0 に設定すると、クラスターは単一ノード クラスターとして作成されます。 オートスケール クラスターを構成し、 min_workersを 0 に設定し、 max_workers 0 に設定すると、単一ノード クラスターも作成されます。

オートスケール クラスターを構成し、 min_workers のみを 0 に設定すると、クラスターは単一ノード クラスターとして作成されません。 クラスターには、終了するまで常に少なくとも 1 つのアクティブなワーカーがあります。

Delta Live Tablesで単一ノードクラスタを作成するためのクラスタ設定の例:

{
    "clusters": [
      {
        "num_workers": 0
      }
    ]
}

クラスタータグの設定

クラスタータグを使用して、パイプライン クラスター の使用状況を監視できます。 パイプラインを作成または編集するとき、またはパイプライン クラスターの JSON 設定を編集して、Delta Live Tables UI でクラスタータグを追加します。

クラウドストレージ構成

Google Cloud Storage(GCS)のバケットにアクセスするには、そのGCSバケットにアクセスできるサービスアカウントを作成し、そのサービスアカウントをクラスター設定に追加する必要があります。 Google クラウド サービス アカウントの作成の詳細については、「 Google クラウド ストレージに接続する」を参照してください。 サービス アカウント構成は、Delta Live Tables API または Delta Live Tables UI を使用してパイプライン を作成 または 編集 するときに追加できます。

  1. パイプラインの [パイプラインの詳細 ] ページで、[ 設定 ] ボタンをクリックします。 [パイプライン設定] ページが表示されます。

  2. [ JSON] ボタンをクリックします。

  3. クラスター構成の gcp_attributes.google_service_account フィールドにサービス アカウント構成を入力します。

{
  "clusters": [
    {
      "gcp_attributes": {
        "google_service_account": "test-gcs-doc@databricks-dev.iam.gserviceaccount.com"
      }
    }
  ]
}

パイプラインのパラメーター化

データセットを定義する Python および SQL コードは、パイプラインの設定によってパラメーター化できます。 パラメーター化により、次のユース ケースが可能になります。

  • 長いパスやその他の変数をコードから分離する。

  • 開発環境またはステージング環境で処理されるデータの量を減らして、テストを高速化します。

  • 同じ変換ロジックを再利用して、複数の Data から処理する。

次の例では、 startDate 構成値を使用して、開発パイプラインを入力データのサブセットに制限します。

CREATE OR REFRESH LIVE TABLE customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

パイプラインのトリガー間隔

pipelines.trigger.interval を使用して、テーブルまたはパイプライン全体を更新するフローのトリガー間隔を制御できます。トリガーされたパイプラインは各テーブルを 1 回だけ処理するため、 pipelines.trigger.interval は連続パイプラインでのみ使用されます。

Databricks では、ストリーミングとバッチのクエリのデフォルトが異なるため、個々のテーブルに pipelines.trigger.interval を設定することをお勧めします。 パイプラインの値を設定するのは、処理でパイプライン グラフ全体の更新を制御する必要がある場合のみです。

テーブルに pipelines.trigger.interval を設定するには、Python で spark_conf 、または SQL で SET を使用します。

@dlt.table(
  spark_conf={"pipelines.trigger.interval" : "10 seconds"}
)
def <function-name>():
    return (<query>)
SET pipelines.trigger.interval=10 seconds;

CREATE OR REFRESH LIVE TABLE TABLE_NAME
AS SELECT ...

パイプラインに pipelines.trigger.interval を設定するには、パイプライン設定で configuration オブジェクトに追加します。

{
  "configuration": {
    "pipelines.trigger.interval": "10 seconds"
  }
}

管理者以外のユーザーが Unity カタログ対応パイプラインからドライバー ログを表示できるようにする

デフォルトでは、パイプライン所有者とワークスペース管理者のみが、Unity カタログ対応パイプラインを実行するクラスターからドライバー ログを表示する権限を持っています。 パイプライン設定の オブジェクトに次の 構成を追加することでCAN MANAGE 、 、CAN VIEW 、またはCAN RUN 権限をSparkconfiguration 持つ任意のユーザーのドライバー ログへのアクセスを有効にできます。

{
  "configuration": {
    "spark.databricks.acl.needAdminPermissionToViewLogs": "false"
  }
}

パイプラインイベントのEメール 通知を追加する

次の場合に通知を受信するように 1 つ以上の Eメール アドレスを設定できます。

  • パイプラインの更新が正常に完了します。

  • パイプラインの更新は、再試行可能または再試行不可能なエラーで失敗します。 すべてのパイプライン障害の通知を受信するには、このオプションを選択します。

  • パイプラインの更新は、再試行できない (致命的な) エラーで失敗します。 このオプションを選択すると、再試行できないエラーが発生したときにのみ通知を受け取ります。

  • 1 つのデータ フローが失敗します。

パイプライン を作成 または編集するときに電子メール通知を構成するには、次のようにします。

  1. [ 通知を追加] をクリックします。

  2. 通知を受信する 1 つ以上の Eメール アドレスを入力します。

  3. 設定したEメールアドレスに送信する通知の種類ごとにチェックボックスをクリックします。

  4. [ 通知を追加] をクリックします。

SCD タイプ 1 の廃棄標識管理の制御 クエリー

以下の設定を使用して、SCD タイプ 1 処理中の DELETE イベントの廃棄管理の動作を制御できます。

  • pipelines.applyChanges.tombstoneGCThresholdInSeconds: この値は、順不同データ間の予想される最大間隔 (秒単位) と一致するように設定します。デフォルトは 172800 秒 (2 日) です。

  • pipelines.applyChanges.tombstoneGCFrequencyInSeconds: この設定は、廃棄標識のクリーンアップをチェックする頻度を秒単位で制御します。デフォルトは 1800 秒 (30 分) です。

APPLY CHANGES API参照してください: Delta Live Tablesでのチェンジデータキャプチャの簡素化

パイプラインの権限を構成する

パイプラインの権限を管理するには、パイプラインに対するCAN MANAGEまたはIS OWNER権限が必要です。

  1. サイドバーで、 「Delta Live Tables」をクリックします。

  2. パイプラインの名前を選択します。

  3. ケバブメニュー 垂直省略記号をクリックし、[ 権限]を選択します。

  4. [権限設定]で、 [ユーザー、グループ、またはサービス プリンシパルの選択]ドロップダウン メニューを選択し、ユーザー、グループ、またはサービス プリンシパルを選択します。

    [権限設定] ダイアログ
  5. 権限のドロップダウンメニューから権限を選択します。

  6. 追加」をクリックします。

  7. 保存」をクリックします。

RocksDBの 状態ストアを有効にするDelta Live Tables

パイプラインをデプロイする前に次の構成を設定することで、RocksDB ベースの状態管理を有効にできます。

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

RocksDB の構成に関する推奨事項など、RocksDB 状態ストアの詳細については、 「Databricks で RocksDB 状態ストアを構成する」を参照してください。