Orquestrar o trabalho Databricks com Apache Airflow

Este artigo descreve o suporte do Apache Airflow para orquestrar o pipeline de dados com o Databricks, tem instruções para instalar e configurar o Airflow localmente e fornece um exemplo de implantação e execução de um Databricks fluxo de trabalho com o Airflow.

Job orquestração in a pipeline de dados

O desenvolvimento e a implantação de um processamento de dados pipeline geralmente exigem o gerenciamento de dependências complexas entre as tarefas. Por exemplo, um pipeline pode ler dados de uma fonte, limpar os dados, transformar os dados limpos e gravar os dados transformados em um destino. O senhor também precisa de suporte para testar, programar e solucionar erros quando operacionalizar um pipeline.

Os sistemas de fluxo de trabalho abordam esses desafios, permitindo que o senhor defina as dependências entre as tarefas, programe a execução do pipeline e monitore o fluxo de trabalho. Apache Airflow é uma solução de código aberto para gerenciar e programar pipeline de dados. Airflow representa o pipeline de dados como um gráfico acíclico direcionado (DAGs) de operações. O senhor define um fluxo de trabalho em um arquivo Python e Airflow gerencia a programação e a execução. A conexão Airflow Databricks permite que o senhor aproveite o mecanismo otimizado Spark oferecido por Databricks com o recurso programar de Airflow.

Requisitos

  • A integração entre o Airflow e o Databricks requer o Airflow versão 2.5.0 e posterior. Os exemplos deste artigo foram testados com o site Airflow versão 2.6.1.

  • O Airflow requer o Python 3.8, 3.9, 3.10 ou 3.11. Os exemplos deste artigo foram testados com o site Python 3.8.

  • As instruções neste artigo para instalar e executar o Airflow exigem que o pipenv crie um ambiente virtualPython .

Airflow operadores para Databricks

Um Airflow DAG é composto de tarefas, em que cada tarefa executa um Airflow Operator. Airflow Os operadores que dão suporte à integração com o site Databricks são implementados no provedorDatabricks .

O provedor Databricks inclui operadores para executar várias tarefas em um Databricks workspace, incluindo a importação de dados para uma tabela, a execução de consultas SQL e o trabalho com pastasDatabricks Git .

O provedor Databricks implementa dois operadores para acionar o Job:

Para criar um novo trabalho no site Databricks ou redefinir um trabalho existente, o provedor Databricks implementa o DatabricksCreateJobsOperator. O DatabricksCreateJobsOperator usa o POST /api/2.1/Job/create e POST /api/2.1/Job/Reset Solicitações de API. O senhor pode usar o DatabricksCreateJobsOperator com o DatabricksRunNowOperator para criar e executar um trabalho.

Observação

O uso dos operadores do Databricks para acionar um trabalho exige o fornecimento de credenciais na configuração da conexão Databricks. Consulte Criar um Databricks pessoal access token para Airflow.

Os operadores do Databricks Airflow gravam o URL da página de execução do trabalho no Airflow logs a cada polling_period_seconds (o default é de 30 segundos). Para obter mais informações, consulte a página do pacote apache-Airflow -providers-databricks no site Airflow.

Instale a integração do Airflow Databricks localmente

Para instalar o Airflow e o provedor Databricks localmente para testes e desenvolvimento, use os seguintes passos. Para outras opções de instalação do Airflow, incluindo a criação de uma instalação de produção, consulte a instalação na documentação do Airflow.

Abra um terminal e execute o seguinte comando:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

Substitua <firstname>, <lastname> e <email> por seu nome de usuário e email. Será solicitado que o senhor digite uma senha para o usuário administrador. Certifique-se de salvar essa senha, pois ela é necessária para acessar log in na interface do usuário Airflow.

Esse script executa os seguintes passos:

  1. Cria um diretório chamado airflow e muda para esse diretório.

  2. Usa pipenv para criar e gerar um ambiente virtual Python. Databricks recomenda o uso de um ambiente virtual Python para isolar as versões do pacote e as dependências de código nesse ambiente. Esse isolamento ajuda a reduzir as incompatibilidades inesperadas de versões de pacotes e as colisões de dependências de código.

  3. Inicializa uma variável de ambiente chamada AIRFLOW_HOME definida como o caminho do diretório airflow.

  4. Instala o Airflow e o pacote do provedor Airflow Databricks .

  5. Cria um diretório airflow/dags. O Airflow usa o diretório dags para armazenar as definições de DAG.

  6. Inicializa um banco de dados SQLite que o Airflow usa para rastrear metadados. Em uma implantação de produção do Airflow, o senhor configuraria o Airflow com um banco de dados padrão. O banco de dados SQLite e a configuração default para sua implementação Airflow são inicializados no diretório airflow.

  7. Cria um usuário administrador para o Airflow.

Dica

Para confirmar a instalação do provedor Databricks, execute o seguinte comando no diretório de instalação Airflow:

airflow providers list

Começar a Airflow servidor web e programador

O servidor da Web Airflow é necessário para view a UI Airflow. Para iniciar o servidor da Web, abra um terminal no diretório de instalação Airflow e execute o seguinte comando:

Observação

Se o servidor da Web Airflow não conseguir iniciar devido a um conflito de porta, o senhor poderá alterar a porta default na configuraçãoAirflow .

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

O programador é o componente Airflow que programa DAGs. Para iniciar o programador, abra um novo terminal no diretório de instalação Airflow e execute o seguinte comando:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Teste a instalação do site Airflow

Para verificar a instalação do Airflow, o senhor pode executar um dos DAGs de exemplo incluídos no Airflow:

  1. Em uma janela do navegador, abra http://localhost:8080/home. Faça login na interface do usuário Airflow com o nome de usuário e a senha que você criou ao instalar o Airflow. A página Airflow DAGs é exibida.

  2. Clique no botão de alternância pausa/Unpause DAG para cancelar a pausa de um dos DAGs de exemplo, por exemplo, o example_python_operator.

  3. Acione o DAG de exemplo clicando no botão Trigger DAG.

  4. Clique no nome do DAG para acessar view detalhes, inclusive o status de execução do DAG.

Crie um Databricks pessoal access token para Airflow

Airflow conecta-se a Databricks usando um Databricks personal access token (PAT). Para criar um PAT:

  1. Em seu Databricks workspace, clique em seu nome de usuário Databricks na barra superior e selecione Settings (Configurações ) no menu suspenso.

  2. Clique em Desenvolvedor.

  3. Em seguida, para acessar tokens, clique em gerenciar.

  4. Clique em Generate new tokens (Gerar novos tokens).

  5. (Opcional) Insira um comentário que o ajude a identificar esse token no futuro e altere o tempo de vida padrão do token de 90 dias. Para criar um token sem vida útil (não recomendado), deixe a caixa Duração (dias) vazia (em branco).

  6. Clique em Gerar.

  7. Copie o token exibido em um local seguro e clique em Concluído.

Observação

Não se esqueça de salvar os tokens copiados em um local seguro. Não compartilhe seus tokens copiados com outras pessoas. Se o senhor perder os tokens copiados, não poderá regenerar exatamente os mesmos tokens. Em vez disso, o senhor deve repetir esse procedimento para criar novos tokens. Se o senhor perder os tokens copiados ou acreditar que os tokens foram comprometidos, o site Databricks recomenda enfaticamente que o senhor exclua imediatamente esses tokens do seu workspace clicando no ícone da lixeira(Revoke) ao lado dos tokens na página de acesso tokens.

Se não for possível criar ou usar tokens no seu workspace, isso pode ocorrer porque o administrador do workspace desativou os tokens ou não lhe deu permissão para criar ou usar tokens. Consulte o administrador do workspace ou o seguinte:

Observação

Como prática recomendada de segurança, quando o senhor se autenticar com ferramentas, sistemas, scripts e aplicativos automatizados, o site Databricks recomenda o uso do access tokens pessoal pertencente à entidade de serviço em vez do workspace de usuários. Para criar o site tokens para uma entidade de serviço, consulte gerenciar tokens para uma entidade de serviço.

Configurar uma Databricks Connect

Sua instalação Airflow contém uma conexão default para Databricks. Para atualizar a conexão para se conectar ao seu workspace usando o access token pessoal que o senhor criou acima:

  1. Em uma janela do navegador, abra http://localhost:8080/connection/list/. Se for solicitado a fazer login, digite seu nome de usuário e senha de administrador.

  2. Em Conn ID, localize default e clique no botão Edit record (Editar registro ).

  3. Substitua o valor no campo Host pelo nome da instância do espaço de trabalho de sua implantação do Databricks, por exemplo, https://adb-123456789.cloud.databricks.com.

  4. No campo Password (Senha ), digite sua Databricks pessoal access token.

  5. Clique em Salvar.

Exemplo: Criar um DAG Airflow para executar um trabalho Databricks

O exemplo a seguir demonstra como criar uma implantação Airflow simples que é executada em seu computador local e implantar um DAG de exemplo para acionar a execução em Databricks. Neste exemplo, o senhor irá:

  1. Crie um novo Notebook e adicione código para imprimir uma saudação com base em um parâmetro configurado.

  2. Crie um Databricks Job com uma única tarefa para executar o Notebook.

  3. Configure uma conexão Airflow para seu Databricks workspace.

  4. Crie um DAG Airflow para acionar o trabalho do notebook. O senhor define o DAG em um script Python usando DatabricksRunNowOperator.

  5. Use a interface de usuário Airflow para acionar o DAG e view o status da execução.

Criar um notebook

Este exemplo usa um Notebook com duas células:

  • A primeira célula contém um widget de textoDatabricks utilidades que define uma variável chamada greeting definida como o valor default world.

  • A segunda célula imprime o valor da variável greeting prefixada por hello.

Para criar o Notebook:

  1. Acesse o site Databricks workspace, clique em Novo ícone New na barra lateral e selecione Notebook.

  2. Dê um nome ao seu Notebook, como Hello Airflow, e certifique-se de que o idioma do default esteja definido como Python.

  3. Copie o seguinte código Python e cole-o na primeira célula do notebook.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. Adicione uma nova célula abaixo da primeira célula e copie e cole o seguinte código Python na nova célula:

    print("hello {}".format(greeting))
    

Criar um job

  1. Clique em fluxo de trabalho Icon fluxo de trabalho na barra lateral.

  2. Clique em Botão criar job.

    A aba Tarefas aparece com a caixa de diálogo de criação de tarefas.

    Criar caixa de diálogo da primeira tarefa
  3. Substitua Adicione um nome para o seu trabalho… pelo nome do seu trabalho.

  4. No campo nome da tarefa, digite um nome para a tarefa, por exemplo, greeting-tarefa.

  5. No menu suspenso Type (Tipo ), selecione Notebook.

  6. No menu suspenso Source (Fonte ), selecione workspace (espaço de trabalho).

  7. Clique na caixa de texto Path (Caminho ) e use o navegador de arquivos para localizar o Notebook que o senhor criou, clique no nome do Notebook e clique em Confirm (Confirmar).

  8. Clique em Adicionar em Parâmetros. No campo Chave, digite greeting. No campo Valor, digite Airflow user.

  9. Clique em Criar tarefa.

No painel de detalhesJob , copie o valor de IDJob . Esse valor é necessário para acionar o Job em Airflow.

Execute o job

Para testar o novo trabalho na interface de usuário do Databricks Jobs, clique em Botão executar agora no canto superior direito. Quando a execução for concluída, o senhor poderá verificar a saída visualizando os detalhes da execução do trabalho.

Criar um novo DAG Airflow

O senhor define um Airflow DAG em um arquivo Python. Para criar um DAG para acionar o trabalho de exemplo do Notebook:

  1. Em um editor de texto ou IDE, crie um novo arquivo chamado databricks_dag.py com o seguinte conteúdo:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    Substitua JOB_ID pelo valor da ID do trabalho salva anteriormente.

  2. Salve o arquivo no diretório airflow/dags. O Airflow lê e instala automaticamente os arquivos DAG armazenados em airflow/dags/.

Instalar e verificar o DAG no Airflow

Para acionar e verificar o DAG na Airflow UI:

  1. Em uma janela do navegador, abra http://localhost:8080/home. A tela Airflow DAGs é exibida.

  2. Localize databricks_dag e clique no botão de alternância pausa/Unpause DAG para cancelar a pausa do DAG.

  3. Acione o DAG clicando no botão Trigger DAG.

  4. Clique em uma execução na coluna de execução para acessar view o status e os detalhes da execução.