Criar um Unity Catalog pipeline clonando um Hive metastore pipeline

Prévia

A solicitação clone a pipeline na API Delta Live Tables Rest está em visualização pública.

Este artigo descreve a solicitação clone a pipeline no Databricks Rest API e como o senhor pode usá-la para copiar um pipeline existente que publica no Hive metastore para um novo pipeline que publica no Unity Catalog. Quando você chama a solicitação clone a pipeline, ela:

  • Copia o código-fonte e a configuração do pipeline existente para um novo pipeline, aplicando quaisquer substituições de configuração que o senhor tenha especificado.

  • Atualiza as definições e referências das tabelas materializadas view e de transmissão com as alterações necessárias para que esses objetos sejam gerenciados por Unity Catalog.

  • O senhor deve começar a atualizar o site pipeline para migrar os dados e metadados existentes, como pontos de verificação, para quaisquer tabelas de transmissão no site pipeline. Isso permite que essas tabelas de transmissão retomem o processamento no mesmo ponto que o original pipeline.

Após a conclusão das operações de clonagem, o pipeline original e o novo podem ser executados de forma independente.

Este artigo inclui exemplos de como chamar a solicitação API diretamente e por meio de um script Python de um notebook Databricks.

Antes de começar

Os seguintes itens são necessários antes de clonar um pipeline:

  • Para clonar um Hive metastore pipeline, as tabelas e a visualização definidas no pipeline devem publicar tabelas em um esquema de destino. Para saber como adicionar um esquema de destino a um pipeline, consulte Como publicar o conjunto de dados Delta Live Tables no legado Hive metastore.

  • As referências a Hive metastore gerenciar tabelas ou exibições no pipeline a ser clonado devem ser totalmente qualificadas com o catálogo (hive_metastore), o esquema e o nome da tabela. Por exemplo, no código a seguir que cria um customers dataset o argumento do nome da tabela deve ser atualizado para hive_metastore.sales.customers:

    @dlt.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • Não edite o código-fonte da fonte Hive metastore pipeline enquanto uma operação de clonagem estiver em andamento, incluindo o Notebook configurado como parte do pipeline e quaisquer módulos armazenados em pastas Git ou arquivos workspace.

  • A fonte Hive metastore pipeline não deve estar em execução quando o senhor iniciar as operações de clonagem. Se uma atualização estiver em execução, pare-a ou espere que ela seja concluída.

A seguir, outras considerações importantes antes de clonar um pipeline:

  • Se as tabelas em Hive metastore pipeline especificarem um local de armazenamento usando o argumento path em Python ou LOCATION em SQL, passe a configuração "pipelines.migration.ignoreExplicitPath": "true" para a solicitação de clone. A definição dessa configuração está incluída nas instruções abaixo.

  • Se o site Hive metastore pipeline incluir uma fonte Auto Loader que especifique um valor para a opção cloudFiles.schemaLocation e o site Hive metastore pipeline continuar operacional após a criação do clone Unity Catalog, o senhor deverá definir a opção mergeSchema como true no site Hive metastore pipeline e no site clonado Unity Catalog pipeline. Ao adicionar essa opção ao site Hive metastore pipeline antes da clonagem, o senhor copiará a opção para o novo site pipeline.

Clonar um pipeline com a API REST da Databricks

O exemplo a seguir usa o comando curl para chamar a solicitação clone a pipeline na API do Databricks Rest:

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

Substituir:

clone-pipeline.JSON:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

Substituir:

  • <target-catalog-name> com o nome de um catálogo no Unity Catalog no qual o novo pipeline deve ser publicado. Esse deve ser um catálogo existente.

  • <target-schema-name> com o nome de um esquema no Unity Catalog para o qual o novo pipeline deve ser publicado se for diferente do nome do esquema atual. Esse parâmetro é opcional e, se não for especificado, o nome do esquema existente será usado.

  • <new-pipeline-name> com um nome opcional para o novo pipeline. Se não for especificado, o novo pipeline será nomeado usando o nome do pipeline de origem com [UC] anexado.

clone_mode especifica o modo a ser usado para as operações de clonagem. MIGRATE_TO_UC é a única opção suportada.

Use o campo configuration para especificar as configurações do novo pipeline. Os valores definidos aqui substituem as configurações no pipeline original.

A resposta da solicitação da API REST clone é o ID do pipeline do novo pipeline do Unity Catalog.

Clonar um pipeline de um notebook Databricks

O exemplo a seguir chama a solicitação create a pipeline de um script Python. O senhor pode usar o Databricks Notebook para executar esse script:

  1. Crie um novo Notebook para o script. Consulte Criar um Notebook.

  2. Copie o seguinte script Python na primeira célula do Notebook.

  3. Atualize os valores do espaço reservado no script substituindo:

    • <databricks-instance> com o nome da instância do espaço de trabalho do Databricks, por exemplo 1234567890123456.7.gcp.databricks.com

    • <pipeline-id> com o identificador exclusivo do site Hive metastore pipeline a ser clonado. O senhor pode encontrar o ID do pipeline na UI do Delta Live Tables.

    • <target-catalog-name> com o nome de um catálogo no Unity Catalog no qual o novo pipeline deve ser publicado. Esse deve ser um catálogo existente.

    • <target-schema-name> com o nome de um esquema no Unity Catalog para o qual o novo pipeline deve ser publicado se for diferente do nome do esquema atual. Esse parâmetro é opcional e, se não for especificado, o nome do esquema existente será usado.

    • <new-pipeline-name> com um nome opcional para o novo pipeline. Se não for especificado, o novo pipeline será nomeado usando o nome do pipeline de origem com [UC] anexado.

  4. execução do script. Veja execução Databricks Notebook.

import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode in this preview
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    return getattr(ctx, "apiToken")().get()


def check_source_pipeline_exists():
    data = requests.get(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
        headers={"Authorization": f"Bearer {get_token()}"},
    )

    assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
    payload = {
      "catalog": TARGET_CATALOG,
      "clone_mode": CLONE_MODE,
    }
    if TARGET_SCHEMA != "":
      payload["target"] = TARGET_SCHEMA
    if CLONED_PIPELINE_NAME != "":
      payload["name"] = CLONED_PIPELINE_NAME
    if OVERRIDE_CONFIGS:
      payload["configuration"] = OVERRIDE_CONFIGS


    data = requests.post(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
        headers={"Authorization": f"Bearer {get_token()}"},
        json=payload,
    )
    response = data.json()
    return response

check_source_pipeline_exists()
request_pipeline_clone()

Limitações

A seguir estão as limitações da solicitação de API do Delta Live Tables clone a pipeline:

  • Somente há suporte para a clonagem de um pipeline configurado para usar o Hive metastore para um Unity Catalog pipeline .

  • O senhor pode criar um clone somente no mesmo Databricks workspace que o pipeline do qual está clonando.

  • O site pipeline que o senhor está clonando pode incluir apenas as seguintes fontes de transmissão:

  • Se o Hive metastore pipeline que o senhor está clonando usa o modo de notificação de arquivo Auto Loader, Databricks recomenda não executar o Hive metastore pipeline após a clonagem. Isso ocorre porque a execução do Hive metastore pipeline resulta na eliminação de alguns eventos de notificação de arquivo do clone Unity Catalog. Se a fonte Hive metastore pipeline for executada após a conclusão das operações de clonagem, o senhor poderá preencher novamente os arquivos ausentes usando Auto Loader com a opção cloudFiles.backfillInterval. Para saber mais sobre o modo de notificação de arquivo do Auto Loader, consulte O que é o modo de notificação de arquivo do Auto Loader? Para saber mais sobre o backfilling de arquivos com o Auto Loader, consulte Acionar backfills regulares usando cloudFiles.backfillInterval e opções do Common Auto Loader.

  • A tarefa de manutenção do pipeline é automaticamente pausada para ambos os pipelines enquanto a clonagem está em andamento.

  • O seguinte se aplica a consultas de viagem do tempo em tabelas no site clonado Unity Catalog pipeline:

    • Se uma versão de tabela foi originalmente gravada em um objeto Hive metastore gerenciar, as consultas de viagem do tempo que usam uma cláusula timestamp_expression não são definidas ao consultar o objeto Unity Catalog clonado.

    • No entanto, se a versão da tabela tiver sido gravada no objeto Unity Catalog clonado, as consultas de viagem do tempo usando uma cláusula timestamp_expression funcionarão corretamente.

    • As consultas de viagem do tempo que usam uma cláusula version funcionam corretamente ao consultar um objeto Unity Catalog clonado, mesmo quando a versão foi originalmente gravada no objeto Hive metastore gerenciar.

  • Para outras limitações ao usar Delta Live Tables com o Unity Catalog, consulte Limitações do pipeline do Unity Catalog.

  • Para conhecer as limitações do Unity Catalog, consulte Limitações do Unity Catalog.