Unity Catalogパイプライン をクローンしてHive metastore パイプラインを作成する
プレビュー
Delta Live Tables Rest API の clone a pipeline
要求は パブリック プレビュー段階です。
この記事では、 Databricks Rest API の clone a pipeline
要求と、それを使用して Hive metastore に発行する既存のパイプラインを Unity Catalogに発行する新しいパイプラインにコピーする方法について説明します。 clone a pipeline
要求を呼び出すと、次の処理が行われます。
既存のパイプラインから新しいパイプラインにソース コードと設定をコピーし、指定した設定の上書きを適用します。
マテリアライズド ビューとストリーミング テーブルの定義と参照を、Unity Catalog によって管理されるオブジェクトに必要な変更を加えて更新します。
パイプラインの更新を開始して、パイプライン内の任意のストリーミングテーブルの既存のデータとメタデータ (チェックポイントなど) を移行します。 これにより、これらのストリーミングテーブルは、元のパイプラインと同じポイントで処理を再開できます。
クローン操作が完了すると、元のパイプラインと新しいパイプラインの両方を個別に実行できます。
この記事には、API 要求を直接呼び出す例と、Databricks ノートブックから Python スクリプトを使用して呼び出す例が含まれています。
始める前に
パイプラインをクローニングする前に、次のものが必要です。
Hive metastore パイプラインをクローンするには、パイプラインで定義されたテーブルとビューがテーブルをターゲットスキーマに公開する必要があります。ターゲット スキーマをパイプラインに追加する方法については、「Delta Live Tables データセットを従来のHive metastoreに公開する方法」を参照してください。
クローンを作成するパイプライン内の Hive metastore マネージドテーブルまたはビューへの参照は、カタログ (
hive_metastore
)、スキーマ、およびテーブル名で完全修飾されている必要があります。 たとえば、次のコードでcustomers
データセットを作成する場合は、テーブル名引数を次のように更新する必要がありますhive_metastore.sales.customers
@dlt.table def customers(): return spark.read.table("sales.customers").where(...)
クローン操作の進行中は、ソース Hive metastore パイプラインのソース コード (パイプラインの一部として構成されたノートブックや、 Git フォルダーまたはワークスペース ファイルに格納されているモジュールなど) を編集しないでください。
クローン操作を開始するときには、ソース Hive metastore パイプラインが実行されていてはなりません。 更新が実行中の場合は、更新を停止するか、完了するまで待ちます。
パイプラインをクローニングする前のその他の重要な考慮事項を次に示します。
Hive metastore パイプラインのテーブルで、Python の
path
引数または SQLのLOCATION
を使用してストレージの場所を指定している場合は、"pipelines.migration.ignoreExplicitPath": "true"
設定をクローンリクエストに渡します。この設定については、以下の手順を参照してください。Hive metastoreパイプラインに オプションの値を指定するAuto Loader ソース
cloudFiles.schemaLocation
Hive metastoreが含まれており、Unity Catalog クローンの作成後も パイプラインが動作し続ける場合は、mergeSchema
true
Hive metastoreパイプラインとクローン作成されたUnity Catalog パイプラインの両方で オプションを に設定する必要があります。クローンを作成する前にこのオプションを Hive metastore パイプラインに追加すると、オプションが新しいパイプラインにコピーされます。
Databricks REST API を使用してパイプラインを複製する
次の例では、 curl
コマンドを使用して、Databricks Rest API で clone a pipeline
要求を呼び出します。
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
以下のように置き換えてください。
<personal-access-token>
を Databricks 個人用アクセス トークンに置き換えます。<databricks-instance>
を Databricks ワークスペース インスタンス名に置き換えます (例:1234567890123456.7.gcp.databricks.com
<pipeline-id>
をクローン Hive metastore パイプラインの一意の識別子に置き換えます。 パイプライン ID は、 Delta Live Tables UI で確認できます。
clone-パイプライン。JSON:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
以下のように置き換えてください。
<target-catalog-name>
を、新しいパイプラインの発行先となる Unity Catalog のカタログの名前に置き換えます。 これは既存のカタログである必要があります。<target-schema-name>
を、新しいパイプラインが現在のスキーマ名と異なる場合に発行する必要がある Unity Catalog 内のスキーマの名前に置き換えます。 このパラメーターはオプションであり、指定しない場合は、既存のスキーマ名が使用されます。<new-pipeline-name>
を新しいパイプラインのオプションの名前に置き換えます。 指定しない場合、新しいパイプラインには、ソース パイプライン名に[UC]
を付加した名前が付けられます。
clone_mode
クローン操作に使用するモードを指定します。 サポートされているオプションは MIGRATE_TO_UC
のみです。
configuration
フィールドを使用して、新しいパイプラインの構成を指定します。ここで設定した値は、元のパイプラインの設定を上書きします。
clone
REST API 要求からの応答は、新しい Unity Catalog パイプラインのパイプライン ID です。
Databricks ノートブックからパイプラインを複製する
次の例では、Python スクリプトから create a pipeline
リクエストを呼び出します。 Databricks ノートブックを使用して、このスクリプトを実行できます。
スクリプトの新しいノートブックを作成します。 「ノートブックを作成する」を参照してください。
次の Python スクリプトをノートブックの最初のセルにコピーします。
スクリプト内のプレースホルダー値を更新するには、以下を置き換えます。
<databricks-instance>
を Databricks ワークスペース インスタンス名に置き換えます (例:1234567890123456.7.gcp.databricks.com
<pipeline-id>
をクローン Hive metastore パイプラインの一意の識別子に置き換えます。 パイプライン ID は、 Delta Live Tables UI で確認できます。<target-catalog-name>
を、新しいパイプラインの発行先となる Unity Catalog のカタログの名前に置き換えます。 これは既存のカタログである必要があります。<target-schema-name>
を、新しいパイプラインが現在のスキーマ名と異なる場合に発行する必要がある Unity Catalog 内のスキーマの名前に置き換えます。 このパラメーターはオプションであり、指定しない場合は、既存のスキーマ名が使用されます。<new-pipeline-name>
を新しいパイプラインのオプションの名前に置き換えます。 指定しない場合、新しいパイプラインには、ソース パイプライン名に[UC]
を付加した名前が付けられます。
スクリプトを実行します。 「Databricks ノートブックの実行」を参照してください。
import requests
# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"
# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"
# This is the only supported clone mode in this preview
CLONE_MODE = "MIGRATE_TO_UC"
# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()
def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)
assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS
data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response
check_source_pipeline_exists()
request_pipeline_clone()
制限事項
Delta Live Tables clone a pipeline
API 要求の制限事項を次に示します。
Hive metastoreを使用するように設定されたパイプラインからUnity Catalogパイプラインへのクローニングのみがサポートされています。
クローンを作成できるのは、クローン作成元のパイプラインと同じ Databricks ワークスペースのみです。
クローンを作成するパイプラインには、次のストリーミングソースのみを含めることができます。
Delta ソース
Auto Loader( Auto Loaderでサポートされているすべてのデータソースを含む) クラウド・オブジェクト・ストレージからのファイルのロードを参照してください。
Apached Kafka と構造化ストリーミング。 ただし、Kafka ソースを
kafka.group.id
オプションを使用するように設定することはできません。 「Apache Kafka と Databricks を使用したストリーム処理」を参照してください。Amazon Kinesis と構造化ストリーミング。 ただし、Kinesis ソースを
consumerMode
efo
に設定するように設定することはできません。
クローニングしている Hive metastore パイプラインで Auto Loader ファイル通知モードを使用している場合は Databricks 、クローニング後に Hive metastore パイプラインを実行しないことをお勧めします。 これは、 Hive metastore パイプラインを実行すると、 Unity Catalog クローンから一部のファイル通知イベントが削除されるためです。 クローン操作の完了後にソース Hive metastore パイプラインが実行される場合は、
cloudFiles.backfillInterval
オプションを指定した Auto Loader を使用して、不足しているファイルをバックフィルできます。 Auto Loader ファイル通知モードの詳細については、「Auto Loader ファイル通知モードとは」を参照してください。Auto Loaderを使用したファイルのバックフィルについては、「cloudFiles.backfillInterval を使用して定期的なバックフィルをトリガーする」を参照してください。 および 一般的な Auto Loader オプション。クローン作成の進行中は、両方のパイプラインでパイプライン メンテナンス タスクが自動的に停止されます。
クローンされた Unity Catalog パイプラインのテーブルに対するタイムトラベルクエリには、以下が適用されます。
テーブルバージョンが最初に Hive metastore 管理オブジェクトに書き込まれた場合、クローニングされた Unity Catalog オブジェクトをクエリするときに、
timestamp_expression
句を使用したタイムトラベルクエリは未定義です。ただし、テーブルバージョンがクローン化された Unity Catalog オブジェクトに書き込まれた場合、
timestamp_expression
句を使用したタイムトラベルクエリは正しく機能します。version
句を使用したタイムトラベルクエリは、クローンされた Unity Catalog オブジェクトをクエリするときに、バージョンが最初に Hive metastore 管理オブジェクトに書き込まれた場合でも正しく機能します。
Delta Live Tables を Unity Catalog と共に使用する場合のその他の制限事項については、「 Unity Catalog パイプラインの制限事項」を参照してください。
Unity Catalog の制限事項については、「 Unity Catalog の制限事項」を参照してください。