Unity Catalogパイプライン をクローンしてHive metastore パイプラインを作成する

プレビュー

Delta Live Tables Rest API の clone a pipeline 要求は パブリック プレビュー段階です。

この記事では、 Databricks Rest API の clone a pipeline 要求と、それを使用して Hive metastore に発行する既存のパイプラインを Unity Catalogに発行する新しいパイプラインにコピーする方法について説明します。 clone a pipeline要求を呼び出すと、次の処理が行われます。

  • 既存のパイプラインから新しいパイプラインにソース コードと設定をコピーし、指定した設定の上書きを適用します。

  • マテリアライズド ビューとストリーミング テーブルの定義と参照を、Unity Catalog によって管理されるオブジェクトに必要な変更を加えて更新します。

  • パイプラインの更新を開始して、パイプライン内の任意のストリーミングテーブルの既存のデータとメタデータ (チェックポイントなど) を移行します。 これにより、これらのストリーミングテーブルは、元のパイプラインと同じポイントで処理を再開できます。

クローン操作が完了すると、元のパイプラインと新しいパイプラインの両方を個別に実行できます。

この記事には、API 要求を直接呼び出す例と、Databricks ノートブックから Python スクリプトを使用して呼び出す例が含まれています。

始める前に

パイプラインをクローニングする前に、次のものが必要です。

  • Hive metastore パイプラインをクローンするには、パイプラインで定義されたテーブルとビューがテーブルをターゲットスキーマに公開する必要があります。ターゲット スキーマをパイプラインに追加する方法については、「Delta Live Tables データセットを従来のHive metastoreに公開する方法」を参照してください。

  • クローンを作成するパイプライン内の Hive metastore マネージドテーブルまたはビューへの参照は、カタログ (hive_metastore)、スキーマ、およびテーブル名で完全修飾されている必要があります。 たとえば、次のコードで customers データセットを作成する場合は、テーブル名引数を次のように更新する必要がありますhive_metastore.sales.customers

    @dlt.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • クローン操作の進行中は、ソース Hive metastore パイプラインのソース コード (パイプラインの一部として構成されたノートブックや、 Git フォルダーまたはワークスペース ファイルに格納されているモジュールなど) を編集しないでください。

  • クローン操作を開始するときには、ソース Hive metastore パイプラインが実行されていてはなりません。 更新が実行中の場合は、更新を停止するか、完了するまで待ちます。

パイプラインをクローニングする前のその他の重要な考慮事項を次に示します。

  • Hive metastore パイプラインのテーブルで、Python の path 引数または SQLの LOCATION を使用してストレージの場所を指定している場合は、"pipelines.migration.ignoreExplicitPath": "true"設定をクローンリクエストに渡します。この設定については、以下の手順を参照してください。

  • Hive metastoreパイプラインに オプションの値を指定するAuto Loader ソースcloudFiles.schemaLocation Hive metastoreが含まれており、Unity Catalog クローンの作成後も パイプラインが動作し続ける場合は、mergeSchema trueHive metastoreパイプラインとクローン作成されたUnity Catalog パイプラインの両方で オプションを に設定する必要があります。クローンを作成する前にこのオプションを Hive metastore パイプラインに追加すると、オプションが新しいパイプラインにコピーされます。

Databricks REST API を使用してパイプラインを複製する

次の例では、 curl コマンドを使用して、Databricks Rest API で clone a pipeline 要求を呼び出します。

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

以下のように置き換えてください。

clone-パイプライン。JSON:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

以下のように置き換えてください。

  • <target-catalog-name> を、新しいパイプラインの発行先となる Unity Catalog のカタログの名前に置き換えます。 これは既存のカタログである必要があります。

  • <target-schema-name> を、新しいパイプラインが現在のスキーマ名と異なる場合に発行する必要がある Unity Catalog 内のスキーマの名前に置き換えます。 このパラメーターはオプションであり、指定しない場合は、既存のスキーマ名が使用されます。

  • <new-pipeline-name> を新しいパイプラインのオプションの名前に置き換えます。 指定しない場合、新しいパイプラインには、ソース パイプライン名に [UC] を付加した名前が付けられます。

clone_mode クローン操作に使用するモードを指定します。 サポートされているオプションは MIGRATE_TO_UC のみです。

configuration フィールドを使用して、新しいパイプラインの構成を指定します。ここで設定した値は、元のパイプラインの設定を上書きします。

clone REST API 要求からの応答は、新しい Unity Catalog パイプラインのパイプライン ID です。

Databricks ノートブックからパイプラインを複製する

次の例では、Python スクリプトから create a pipeline リクエストを呼び出します。 Databricks ノートブックを使用して、このスクリプトを実行できます。

  1. スクリプトの新しいノートブックを作成します。 「ノートブックを作成する」を参照してください

  2. 次の Python スクリプトをノートブックの最初のセルにコピーします。

  3. スクリプト内のプレースホルダー値を更新するには、以下を置き換えます。

    • <databricks-instance> を Databricks ワークスペース インスタンス名に置き換えます (例: 1234567890123456.7.gcp.databricks.com

    • <pipeline-id> をクローン Hive metastore パイプラインの一意の識別子に置き換えます。 パイプライン ID は、 Delta Live Tables UI で確認できます。

    • <target-catalog-name> を、新しいパイプラインの発行先となる Unity Catalog のカタログの名前に置き換えます。 これは既存のカタログである必要があります。

    • <target-schema-name> を、新しいパイプラインが現在のスキーマ名と異なる場合に発行する必要がある Unity Catalog 内のスキーマの名前に置き換えます。 このパラメーターはオプションであり、指定しない場合は、既存のスキーマ名が使用されます。

    • <new-pipeline-name> を新しいパイプラインのオプションの名前に置き換えます。 指定しない場合、新しいパイプラインには、ソース パイプライン名に [UC] を付加した名前が付けられます。

  4. スクリプトを実行します。 「Databricks ノートブックの実行」を参照してください。

import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode in this preview
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    return getattr(ctx, "apiToken")().get()


def check_source_pipeline_exists():
    data = requests.get(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
        headers={"Authorization": f"Bearer {get_token()}"},
    )

    assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
    payload = {
      "catalog": TARGET_CATALOG,
      "clone_mode": CLONE_MODE,
    }
    if TARGET_SCHEMA != "":
      payload["target"] = TARGET_SCHEMA
    if CLONED_PIPELINE_NAME != "":
      payload["name"] = CLONED_PIPELINE_NAME
    if OVERRIDE_CONFIGS:
      payload["configuration"] = OVERRIDE_CONFIGS


    data = requests.post(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
        headers={"Authorization": f"Bearer {get_token()}"},
        json=payload,
    )
    response = data.json()
    return response

check_source_pipeline_exists()
request_pipeline_clone()

制限事項

Delta Live Tables clone a pipeline API 要求の制限事項を次に示します。

  • Hive metastoreを使用するように設定されたパイプラインからUnity Catalogパイプラインへのクローニングのみがサポートされています。

  • クローンを作成できるのは、クローン作成元のパイプラインと同じ Databricks ワークスペースのみです。

  • クローンを作成するパイプラインには、次のストリーミングソースのみを含めることができます。

  • クローニングしている Hive metastore パイプラインで Auto Loader ファイル通知モードを使用している場合は Databricks 、クローニング後に Hive metastore パイプラインを実行しないことをお勧めします。 これは、 Hive metastore パイプラインを実行すると、 Unity Catalog クローンから一部のファイル通知イベントが削除されるためです。 クローン操作の完了後にソース Hive metastore パイプラインが実行される場合は、cloudFiles.backfillInterval オプションを指定した Auto Loader を使用して、不足しているファイルをバックフィルできます。 Auto Loader ファイル通知モードの詳細については、「Auto Loader ファイル通知モードとは」を参照してください。Auto Loaderを使用したファイルのバックフィルについては、「cloudFiles.backfillInterval を使用して定期的なバックフィルをトリガーする」を参照してください。 および 一般的な Auto Loader オプション

  • クローン作成の進行中は、両方のパイプラインでパイプライン メンテナンス タスクが自動的に停止されます。

  • クローンされた Unity Catalog パイプラインのテーブルに対するタイムトラベルクエリには、以下が適用されます。

    • テーブルバージョンが最初に Hive metastore 管理オブジェクトに書き込まれた場合、クローニングされた Unity Catalog オブジェクトをクエリするときに、timestamp_expression 句を使用したタイムトラベルクエリは未定義です。

    • ただし、テーブルバージョンがクローン化された Unity Catalog オブジェクトに書き込まれた場合、timestamp_expression 句を使用したタイムトラベルクエリは正しく機能します。

    • version句を使用したタイムトラベルクエリは、クローンされた Unity Catalog オブジェクトをクエリするときに、バージョンが最初に Hive metastore 管理オブジェクトに書き込まれた場合でも正しく機能します。

  • Delta Live Tables を Unity Catalog と共に使用する場合のその他の制限事項については、「 Unity Catalog パイプラインの制限事項」を参照してください。

  • Unity Catalog の制限事項については、「 Unity Catalog の制限事項」を参照してください。