execução de um pipeline Delta Live Tables em um fluxo de trabalho

Você pode executar um pipeline Delta Live Tables como parte de um fluxo de trabalho de processamento de dados com Databricks Job, Apache Airflow ou Azure Data Factory.

Empregos

Você pode orquestrar várias tarefas em um Job do Databricks para implementar um fluxo de trabalho de processamento de dados. Para incluir um pipeline Delta Live Tables em um Job, use a tarefa Pipeline ao criar um Job.

Apache Airflow

Apache Airflow é uma solução open source para gerenciamento e programação de fluxo de dados de trabalho. Airflow representa fluxo de trabalho como gráficos acíclicos direcionados (DAGs) de operações. Você define um workflow em um arquivo Python e o Airflow gerencia a programação e execução. Para obter informações sobre como instalar e usar o Airflow com Databricks, consulte Orchestrate Databricks Job with Apache Airflow.

Para executar um pipeline Delta Live Tables como parte de um fluxo de trabalho do Airflow, use o DatabricksSubmitRunOperator.

Requisitos

Os itens a seguir são necessários para usar o suporte Airflow para Delta Live Tables:

Exemplo

O exemplo a seguir cria um Airflow DAG que aciona uma atualização para o pipeline Delta Live Tables com o identificador 8279d543-063c-4d63-9926-dae38e35ce8b:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

Substitua CONNECTION_ID pelo identificador de uma conexão do Airflow com seu workspace.

Salve este exemplo no diretório airflow/dags e use a IU do Airflow para view e acionar o DAG. Use a IU do Delta Live Tables para view os detalhes da atualização do pipeline.

Fábrica de dados do Azure

O Azure Data Factory é um serviço ETL baseado em cloudque permite orquestrar a integração de dados e transformar o fluxo de trabalho. O Azure Data Factory oferece suporte direto à execução de tarefas do Databricks em um fluxo de trabalho, incluindo Notebook, tarefas JAR e scripts Python. Você também pode incluir um pipeline em um fluxo de trabalho chamando a API Delta Live Tables de uma atividade da Web do Azure Data Factory. Por exemplo, para acionar uma atualização de pipeline do Azure Data Factory:

  1. Crie um data factory ou abra um data factory existente.

  2. Quando a criação for concluída, abra a página do seu data factory e clique no bloco Open Azure Data Factory Studio . A interface de usuário do Azure Data Factory é exibida.

  3. Crie um novo pipeline do Azure Data Factory selecionando Pipeline no menu suspenso Novo na interface do usuário do Azure Data Factory Studio.

  4. Na caixa de ferramentas Atividades , expanda Geral e arraste a atividade da Web para a tela do pipeline. Clique na tab Configurações e insira os seguintes valores:

    Observação

    Como prática recomendada de segurança, ao autenticar com ferramentas, sistemas, scripts e aplicativos automatizados, a Databricks recomenda que você use access token pessoal pertencente à entidade de serviço em vez de usuários do espaço de trabalho. Para criar tokens para entidades de serviço, consulte gerenciar tokens para uma entidade de serviço.

    • URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      Substitua <get-workspace-instance>.

      Substitua <pipeline-id> pelo identificador do pipeline.

    • Método: Selecione POST no menu suspenso.

    • Cabeçalhos: Clique em + Novo. Na caixa de texto Nome , digite Authorization. Na caixa de texto Valor , digite Bearer <personal-access-token>.

      Substitua <personal-access-token> por um access tokenpessoal do Databricks.

    • Corpo: para passar parâmetros de solicitação adicionais, insira um documento JSON contendo os parâmetros. Por exemplo, para iniciar uma atualização e reprocessar todos os dados para o pipeline: {"full_refresh": "true"}. Se não houver parâmetros de solicitação adicionais, insira chaves vazias ({}).

Para testar a atividade da Web, clique em Depurar na barra de ferramentas do pipeline na interface do usuário do Data Factory. A saída e o status da execução, incluindo erros, são exibidos na tab Saída do pipeline do Azure Data Factory. Use a IU do Delta Live Tables para view os detalhes da atualização do pipeline.

Dica

Um requisito de fluxo de trabalho comum é iniciar uma tarefa após a conclusão de uma tarefa anterior. Como a solicitação Delta Live Tables updates é assíncrona — a solicitação retorna depois de iniciar a atualização, mas antes da conclusão da atualização — as tarefas em seu pipeline do Azure Data Factory com dependência da atualização Delta Live Tables devem aguardar a conclusão da atualização. Uma opção para aguardar a conclusão da atualização é adicionar uma atividade Until após a atividade da Web que aciona a atualização Delta Live Tables. Na atividade Até:

  1. Adicione uma atividade Aguardar para aguardar um número configurado de segundos para a conclusão da atualização.

  2. Adicione uma atividade da Web após a atividade Aguardar que usa a solicitação Delta Live Tables Get Update Details para obter o status da atualização. O campo state na resposta retorna o estado atual da atualização, inclusive se ela foi concluída.

  3. Use o valor do campo state para definir a condição de encerramento da atividade Até. Você também pode usar uma atividade Definir variável para adicionar uma variável de pipeline com base no valor state e usar essa variável para a condição de encerramento.