Integrar o MLflow e o Ray

MLflow é uma plataforma de código aberto para gerenciar cargas de trabalho de machine learning e IA. A combinação do Ray com o MLflow permite que o senhor distribua cargas de trabalho com o Ray e acompanhe modelos, métricas, parâmetros e metadados gerados durante o treinamento com o MLflow.

Este artigo aborda como integrar o MLflow com os seguintes componentes do Ray:

  • Ray Core: Aplicativos distribuídos de uso geral que não são cobertos pelo Ray Tune e Ray ensinar

  • Ray ensinar: Treinamento de modelos distribuídos

  • Ray Tune: ajuste distribuído de hiperparâmetros

Integrar o Ray Core e o MLflow

O Ray Core fornece os elementos básicos para aplicativos distribuídos de uso geral. Ele permite que o senhor escalone Python funções e classes em vários nós.

Esta seção descreve os seguintes padrões para integrar o Ray Core e o MLflow:

  • registrar MLflow modelos do processo do driver Ray

  • registro MLflow modelos de execução infantil

Registrar o MLflow do processo do driver Ray

Geralmente, é melhor log MLflow modelos do processo do driver em vez de worker nós. Isso se deve à complexidade adicional de passar referências com estado para o trabalhador remoto.

Por exemplo, o código a seguir falha porque o servidor de acompanhamento MLflow não é inicializado usando o MLflow Client de dentro dos nós worker.

import mlflow

@ray.remote
def example_logging_task(x):
# ...

 # This method will fail
 mlflow.log_metric("x", x)
 return x

with mlflow.start_run() as run:
 ray.get([example_logging_task.remote(x) for x in range(10)])

Em vez disso, retorne as métricas para o nó do driver. As métricas e os metadados geralmente são pequenos o suficiente para serem transferidos de volta ao driver sem causar problemas de memória.

Pegue o exemplo mostrado acima e atualize-o para log as métricas retornadas de uma tarefa Ray:

import mlflow

@ray.remote
def example_logging_task(x):
 # ...
 return x

with mlflow.start_run() as run:
  results = ray.get([example_logging_task.remote(x) for x in range(10)])
 for x in results:
   mlflow.log_metric("x", x)

Para tarefas que exigem salvar artefatos grandes, como uma tabela Pandas grande, imagens, gráficos ou modelos, o site Databricks recomenda manter o artefato como um arquivo. Em seguida, recarregue o artefato no contexto do driver ou diretamente em log o objeto com MLflow, especificando o caminho para o arquivo salvo.

import mlflow

@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
  f.write(myLargeObject)
return x

with mlflow.start_run() as run:
 results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
  mlflow.log_metric("x", x)
  # Directly log the saved file by specifying the path
  mlflow.log_artifact("/dbfs/myLargeFilePath.txt")

registrar Ray tarefa como MLflow child execução

O senhor pode integrar o Ray Core com o site MLflow usando a execução infantil. Isso envolve os seguintes passos:

  1. Criar uma execução principal: Inicializar uma execução pai no processo do driver. Essa execução atua como um contêiner hierárquico para todas as execuções secundárias subsequentes.

  2. Criar execução secundária: Em cada tarefa Ray, inicie uma execução secundária sob a execução principal. Cada execução infantil pode log suas próprias métricas de forma independente.

Para implementar essa abordagem, garanta que cada tarefa Ray receba as credenciais de cliente necessárias e o pai run_id. Essa configuração estabelece a relação hierárquica pai-filho entre a execução. O trecho de código a seguir demonstra como recuperar as credenciais e transmitir o run_id principal:

from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"

mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
   import os
  # Set the MLflow credentials within the Ray task
   os.environ.update(mlflow_db_creds)
  # Set the active MLflow experiment within each Ray task
   mlflow.set_experiment(experiment_name)
  # Create nested child runs associated with the parent run_id
   with mlflow.start_run(run_id=run_id, nested=True):
    # Log metrics to the child run within the Ray task
       mlflow.log_metric("x", x)

  return x

# Start parent run on the main driver process
with mlflow.start_run() as run:
  # Pass the parent run's run_id to each Ray task
   results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Ray ensinar e MLflow

A maneira mais simples de log os modelos do Ray ensinar para MLflow é usar o ponto de verificação gerado pela execução do treinamento. Após a conclusão da execução do treinamento, recarregue o modelo em sua estrutura nativa de aprendizagem profunda (como PyTorch ou TensorFlow) e, em seguida, log com o código MLflow correspondente.

Essa abordagem garante que o modelo seja armazenado corretamente e esteja pronto para avaliação ou implantação.

logs O código a seguir recarrega um modelo de um ponto de verificação do Ray ensinar e o envia para MLflow:

result = trainer.fit()

checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
     # Change as needed for different DL frameworks
    checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
    # Load the model from the checkpoint
    model = MyModel.load_from_checkpoint(checkpoint_path)

with mlflow.start_run() as run:
    # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Embora geralmente seja uma prática recomendada enviar os objetos de volta ao nó do driver, com o Ray ensinar, salvar os resultados finais é mais fácil do que todo o histórico de treinamento do processo worker.

Para armazenar vários modelos de uma execução de treinamento, especifique o número de pontos de verificação a serem mantidos no campo ray.train.CheckpointConfig. Os modelos podem então ser lidos e registrados da mesma forma que o armazenamento de um único modelo.

Observação

MLflow não é responsável por lidar com a tolerância a falhas durante o treinamento do modelo, mas sim por acompanhar o ciclo de vida do modelo. Em vez disso, a tolerância a falhas é gerenciada pelo próprio Ray ensinar.

Para armazenar as métricas de treinamento especificadas por Ray ensinar, recupere-as do objeto de resultado e armazene-as usando MLflow.

result = trainer.fit()

with mlflow.start_run() as run:
    mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))

  # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Para configurar corretamente o Spark e o Ray clusters e evitar problemas de alocação de recursos, o senhor deve ajustar a configuração resources_per_worker. Especificamente, defina o número de CPUs para cada Ray worker como sendo um a menos do que o número total de CPUs disponíveis em um nó de Ray worker. Esse ajuste é crucial porque, se o instrutor reservar todos os núcleos disponíveis para os atores Ray, isso pode levar a erros de contenção de recurso.

Ray Tune e MLflow

A integração do Ray Tune com MLflow permite que o senhor acompanhe e log experimentos de ajuste de hiperparâmetros com eficiência em Databricks. Essa integração aproveita os recursos de acompanhamento de experimentos do MLflowpara registrar métricas e resultados diretamente do Ray tarefa.

Abordagem de execução infantil para extração de madeira

Semelhante ao registro da tarefa do Ray Core, os aplicativos do Ray Tune podem usar uma abordagem de execução filha para log métricas de cada tentativa ou iteração de ajuste. Use os seguintes passos para implementar uma abordagem de execução infantil:

  1. Criar uma execução principal: Inicializar uma execução pai no processo do driver. Essa execução serve como o contêiner principal para todas as execuções secundárias subsequentes.

  2. registrar a execução filha: Cada tarefa do Ray Tune cria uma execução secundária sob a execução principal, mantendo uma hierarquia clara dos resultados do experimento.

O exemplo a seguir demonstra como autenticar e log do Ray Tune tarefa usando MLflow.

import os
import tempfile
import time

import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars

from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow


mlflow_db_creds = get_databricks_env_vars("databricks")

EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)


def evaluation_fn(step, width, height):
   return (0.1 + width * step / 100) ** (-1) + height * 0.1


def train_function_mlflow(config, run_id):
   os.environ.update(mlflow_db_creds)
   mlflow.set_experiment(EXPERIMENT_NAME)

   # Hyperparameters
   width = config["width"]
   height = config["height"]

   with mlflow.start_run(run_id=run_id, nested=True):
       for step in range(config.get("steps", 100)):
           # Iterative training function - can be any arbitrary training procedure
           intermediate_score = evaluation_fn(step, width, height)
           # Log the metrics to MLflow
           mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
           # Feed the score back to Tune.
           train.report({"iterations": step, "mean_loss": intermediate_score})
           time.sleep(0.1)


def tune_with_setup(run_id, finish_fast=True):
   os.environ.update(mlflow_db_creds)
   # Set the experiment or create a new one if it does not exist.
   mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)

   tuner = tune.Tuner(
       tune.with_parameter(train_function_mlflow, run_id),
       tune_config=tune.TuneConfig(num_samples=5),
       run_config=train.RunConfig(
           name="mlflow",
       ),
       param_space={
           "width": tune.randint(10, 100),
           "height": tune.randint(0, 100),
           "steps": 20 if finish_fast else 100,
       },
   )
   results = tuner.fit()


with mlflow.start_run() as run:
   mlflow_tracking_uri = mlflow.get_tracking_uri()
   tune_with_setup(run.info.run_id)