Orquestrar o trabalho Databricks com Apache Airflow

Este artigo descreve o suporte do Apache Airflow para orquestrar o pipeline de dados com o Databricks, tem instruções para instalar e configurar o Airflow localmente e fornece um exemplo de implantação e execução de um Databricks fluxo de trabalho com o Airflow.

Job orquestração in a pipeline de dados

O desenvolvimento e a implantação de um processamento de dados pipeline geralmente exigem o gerenciamento de dependências complexas entre as tarefas. Por exemplo, um pipeline pode ler dados de uma fonte, limpar os dados, transformar os dados limpos e gravar os dados transformados em um destino. O senhor também precisa de suporte para testar, programar e solucionar erros quando operacionalizar um pipeline.

Os sistemas de fluxo de trabalho abordam esses desafios, permitindo que o senhor defina as dependências entre as tarefas, programe a execução do pipeline e monitore o fluxo de trabalho. Apache Airflow é uma solução de código aberto para gerenciar e programar pipeline de dados. Airflow representa o pipeline de dados como um gráfico acíclico direcionado (DAGs) de operações. O senhor define um fluxo de trabalho em um arquivo Python e Airflow gerencia a programação e a execução. A conexão Airflow Databricks permite que o senhor aproveite o mecanismo otimizado Spark oferecido por Databricks com o recurso programar de Airflow.

Requisitos

  • A integração entre o Airflow e o Databricks requer o Airflow versão 2.5.0 e posterior. Os exemplos deste artigo foram testados com o site Airflow versão 2.6.1.

  • O Airflow requer o Python 3.8, 3.9, 3.10 ou 3.11. Os exemplos deste artigo foram testados com o site Python 3.8.

  • As instruções neste artigo para instalar e executar o Airflow exigem que o pipenv crie um ambiente virtualPython .

Airflow operadores para Databricks

Um Airflow DAG é composto de tarefas, em que cada tarefa executa um Airflow Operator. Airflow Os operadores que dão suporte à integração com o site Databricks são implementados no provedorDatabricks .

O provedor Databricks inclui operadores para executar várias tarefas em um Databricks workspace, incluindo a importação de dados para uma tabela, a execução de consultas SQL e o trabalho com pastasDatabricks Git .

O provedor Databricks implementa dois operadores para acionar o Job:

Para criar um novo trabalho no site Databricks ou redefinir um trabalho existente, o provedor Databricks implementa o DatabricksCreateJobsOperator. O DatabricksCreateJobsOperator usa o POST /api/2.1/Job/create e POST /api/2.1/Job/Reset Solicitações de API. O senhor pode usar o DatabricksCreateJobsOperator com o DatabricksRunNowOperator para criar e executar um trabalho.

Observação

O uso dos operadores do Databricks para acionar um trabalho exige o fornecimento de credenciais na configuração da conexão Databricks. Consulte Criar um Databricks pessoal access token para Airflow.

Os operadores do Databricks Airflow gravam o URL da página de execução do trabalho no Airflow logs a cada polling_period_seconds (o default é de 30 segundos). Para obter mais informações, consulte a página do pacote apache-Airflow -providers-databricks no site Airflow.

Instale a integração do Airflow Databricks localmente

Para instalar o Airflow e o provedor Databricks localmente para testes e desenvolvimento, use os seguintes passos. Para outras opções de instalação do Airflow, incluindo a criação de uma instalação de produção, consulte a instalação na documentação do Airflow.

Abra um terminal e execute o seguinte comando:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

Substitua <firstname>, <lastname> e <email> por seu nome de usuário e email. Será solicitado que o senhor digite uma senha para o usuário administrador. Certifique-se de salvar essa senha, pois ela é necessária para acessar log in na interface do usuário Airflow.

Esse script executa os seguintes passos:

  1. Cria um diretório chamado airflow e muda para esse diretório.

  2. Usa pipenv para criar e gerar um ambiente virtual Python. Databricks recomenda o uso de um ambiente virtual Python para isolar as versões do pacote e as dependências de código nesse ambiente. Esse isolamento ajuda a reduzir as incompatibilidades inesperadas de versões de pacotes e as colisões de dependências de código.

  3. Inicializa uma variável de ambiente chamada AIRFLOW_HOME definida como o caminho do diretório airflow.

  4. Instala o Airflow e o pacote do provedor Airflow Databricks .

  5. Cria um diretório airflow/dags. O Airflow usa o diretório dags para armazenar as definições de DAG.

  6. Inicializa um banco de dados SQLite que o Airflow usa para rastrear metadados. Em uma implantação de produção do Airflow, o senhor configuraria o Airflow com um banco de dados padrão. O banco de dados SQLite e a configuração default para sua implementação Airflow são inicializados no diretório airflow.

  7. Cria um usuário administrador para o Airflow.

Dica

Para confirmar a instalação do provedor Databricks, execute o seguinte comando no diretório de instalação Airflow:

airflow providers list

Começar a Airflow servidor web e programador

O servidor da Web Airflow é necessário para view a UI Airflow. Para iniciar o servidor da Web, abra um terminal no diretório de instalação Airflow e execute o seguinte comando:

Observação

Se o servidor da Web Airflow não conseguir iniciar devido a um conflito de porta, o senhor poderá alterar a porta default na configuraçãoAirflow .

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

O programador é o componente Airflow que programa DAGs. Para iniciar o programador, abra um novo terminal no diretório de instalação Airflow e execute o seguinte comando:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Teste a instalação do site Airflow

Para verificar a instalação do Airflow, o senhor pode executar um dos DAGs de exemplo incluídos no Airflow:

  1. Em uma janela do navegador, abra http://localhost:8080/home. Faça login na interface do usuário Airflow com o nome de usuário e a senha que você criou ao instalar o Airflow. A página Airflow DAGs é exibida.

  2. Clique no botão de alternância pausa/Unpause DAG para cancelar a pausa de um dos DAGs de exemplo, por exemplo, o example_python_operator.

  3. Acione o DAG de exemplo clicando no botão Trigger DAG.

  4. Clique no nome do DAG para acessar view detalhes, inclusive o status de execução do DAG.

Crie um Databricks pessoal access token para Airflow

Airflow conecta-se a Databricks usando um Databricks personal access token (PAT). Para criar um PAT, siga os passos em Databricks personal access tokens para usuários de workspace .

Observação

Como prática recomendada de segurança, quando o senhor se autenticar com ferramentas, sistemas, scripts e aplicativos automatizados, o site Databricks recomenda o uso do access tokens pessoal pertencente à entidade de serviço em vez do workspace de usuários. Para criar o site tokens para uma entidade de serviço, consulte gerenciar tokens para uma entidade de serviço.

Configurar uma Databricks Connect

Sua instalação Airflow contém uma conexão default para Databricks. Para atualizar a conexão para se conectar ao seu workspace usando o access token pessoal que o senhor criou acima:

  1. Em uma janela do navegador, abra http://localhost:8080/connection/list/. Se for solicitado a fazer login, digite seu nome de usuário e senha de administrador.

  2. Em Conn ID, localize default e clique no botão Edit record (Editar registro ).

  3. Substitua o valor no campo Host pelo nome da instância do espaço de trabalho de sua implantação do Databricks, por exemplo, https://adb-123456789.cloud.databricks.com.

  4. No campo Password (Senha ), digite sua Databricks pessoal access token.

  5. Clique em Salvar.

Exemplo: Criar um DAG Airflow para executar um trabalho Databricks

O exemplo a seguir demonstra como criar uma implantação Airflow simples que é executada em seu computador local e implantar um DAG de exemplo para acionar a execução em Databricks. Neste exemplo, o senhor irá:

  1. Crie um novo Notebook e adicione código para imprimir uma saudação com base em um parâmetro configurado.

  2. Crie um Databricks Job com uma única tarefa para executar o Notebook.

  3. Configure uma conexão Airflow para seu Databricks workspace.

  4. Crie um DAG Airflow para acionar o trabalho do notebook. O senhor define o DAG em um script Python usando DatabricksRunNowOperator.

  5. Use a interface de usuário Airflow para acionar o DAG e view o status da execução.

Criar um notebook

Este exemplo usa um Notebook com duas células:

  • A primeira célula contém um widget de textoDatabricks utilidades que define uma variável chamada greeting definida como o valor default world.

  • A segunda célula imprime o valor da variável greeting prefixada por hello.

Para criar o Notebook:

  1. Acesse o site Databricks workspace, clique em Novo ícone New na barra lateral e selecione Notebook.

  2. Dê um nome ao seu Notebook, como Hello Airflow, e certifique-se de que o idioma do default esteja definido como Python.

  3. Copie o seguinte código Python e cole-o na primeira célula do notebook.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. Adicione uma nova célula abaixo da primeira célula e copie e cole o seguinte código Python na nova célula:

    print("hello {}".format(greeting))
    

Criar um job

  1. Clique em fluxo de trabalho Icon fluxo de trabalho na barra lateral.

  2. Clique em Botão criar job.

    A aba Tarefas aparece com a caixa de diálogo de criação de tarefas.

    Criar caixa de diálogo da primeira tarefa
  3. Substitua Adicione um nome para o seu trabalho… pelo nome do seu trabalho.

  4. No campo nome da tarefa, digite um nome para a tarefa, por exemplo, greeting-tarefa.

  5. No menu suspenso Type (Tipo ), selecione Notebook.

  6. No menu suspenso Source (Fonte ), selecione workspace (espaço de trabalho).

  7. Clique na caixa de texto Path (Caminho ) e use o navegador de arquivos para localizar o Notebook que o senhor criou, clique no nome do Notebook e clique em Confirm (Confirmar).

  8. Clique em Adicionar em Parâmetros. No campo Chave, digite greeting. No campo Valor, digite Airflow user.

  9. Clique em Criar tarefa.

No painel de detalhesJob , copie o valor de IDJob . Esse valor é necessário para acionar o Job em Airflow.

Execute o job

Para testar o novo trabalho na interface de usuário do Databricks Jobs, clique em Botão executar agora no canto superior direito. Quando a execução for concluída, o senhor poderá verificar a saída visualizando os detalhes da execução do trabalho.

Criar um novo DAG Airflow

O senhor define um Airflow DAG em um arquivo Python. Para criar um DAG para acionar o trabalho de exemplo do Notebook:

  1. Em um editor de texto ou IDE, crie um novo arquivo chamado databricks_dag.py com o seguinte conteúdo:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    Substitua JOB_ID pelo valor da ID do trabalho salva anteriormente.

  2. Salve o arquivo no diretório airflow/dags. O Airflow lê e instala automaticamente os arquivos DAG armazenados em airflow/dags/.

Instalar e verificar o DAG no Airflow

Para acionar e verificar o DAG na Airflow UI:

  1. Em uma janela do navegador, abra http://localhost:8080/home. A tela Airflow DAGs é exibida.

  2. Localize databricks_dag e clique no botão de alternância pausa/Unpause DAG para cancelar a pausa do DAG.

  3. Acione o DAG clicando no botão Trigger DAG.

  4. Clique em uma execução na coluna de execução para acessar view o status e os detalhes da execução.