MLflow と Ray の統合

MLflow は、機械学習と AI ワークロードを管理するためのオープンソースプラットフォームです。 Rayと MLflow を組み合わせると、Rayでワークロードを分散し、トレーニング中に生成されたモデル、メトリクス、パラメーター、メタデータを MLflowで追跡できます。

この記事では、MLflow を次の Ray コンポーネントと統合する方法について説明します。

  • Ray Core: Ray Tune および Ray トレーニングする でカバーされていない汎用分散アプリケーション

  • Ray トレーニングする: Distributed model トレーニング

  • Ray Tune: 分散ハイパーパラメーターチューニング

Ray Core と MLflow の統合

Ray Core は、汎用分散アプリケーションの基本的な構成要素を提供します。 これにより、Pythonの関数とクラスを複数のノードにスケーリングできます。

このセクションでは、Ray Core と MLflow を統合するための次のパターンについて説明します。

  • Ray ドライバー プロセスからの MLflow モデルのログ記録

  • 子実行からの MLflow モデルのログ記録

Ray ドライバー プロセスからの MLflow のログ

一般に、MLflow モデルはワーカー ノードからではなく、ドライバー プロセスからログに記録するのが最適です。 これは、ステートフルな参照をリモートワーカーに渡す際に複雑さが増すためです。

たとえば、次のコードは、MLflow 追跡サーバーがワーカー ノード内からの MLflow Client を使用して初期化されていないため、失敗します。

import mlflow

@ray.remote
def example_logging_task(x):
# ...

 # This method will fail
 mlflow.log_metric("x", x)
 return x

with mlflow.start_run() as run:
 ray.get([example_logging_task.remote(x) for x in range(10)])

代わりに、メトリクスをドライバーノードに戻します。 メトリクスとメタデータは、通常、メモリの問題を引き起こさずにドライバーに転送できるほど小さいです。

上記の例を例にとり、Ray タスクから返されたメトリクスをログに記録するように更新します。

import mlflow

@ray.remote
def example_logging_task(x):
 # ...
 return x

with mlflow.start_run() as run:
  results = ray.get([example_logging_task.remote(x) for x in range(10)])
 for x in results:
   mlflow.log_metric("x", x)

大きな Pandas テーブル、イメージ、プロット、モデルなど、大きなアーティファクトを保存する必要があるタスクの場合、Databricks ではアーティファクトをファイルとして永続化することをお勧めします。 次に、ドライバー コンテキスト内で成果物を再読み込みするか、保存されたファイルへのパスを指定して MLflow を使用してオブジェクトを直接ログに記録します。

import mlflow

@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
  f.write(myLargeObject)
return x

with mlflow.start_run() as run:
 results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
  mlflow.log_metric("x", x)
  # Directly log the saved file by specifying the path
  mlflow.log_artifact("/dbfs/myLargeFilePath.txt")

MLflow の子実行として Ray タスクをログに記録する

子実行を使用して、Ray Core を MLflow と統合できます。 これには、次の手順が含まれます。

  1. 親実行を作成する: ドライバー プロセスで親実行を初期化します。 この実行は、後続のすべての子実行の階層コンテナとして機能します。

  2. 子実行の作成: 各 Ray タスク内で、親実行の下で子実行を開始します。 各子実行は、独自のメトリクスを個別にログに記録できます。

このアプローチを実装するには、各 Ray タスクが必要なクライアント資格情報と親 run_idを受け取ることを確認します。 この設定により、実行間の階層的な親子関係が確立されます。 次のコード スニペットは、資格情報を取得し、親 run_idを渡す方法を示しています。

from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"

mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
   import os
  # Set the MLflow credentials within the Ray task
   os.environ.update(mlflow_db_creds)
  # Set the active MLflow experiment within each Ray task
   mlflow.set_experiment(experiment_name)
  # Create nested child runs associated with the parent run_id
   with mlflow.start_run(run_id=run_id, nested=True):
    # Log metrics to the child run within the Ray task
       mlflow.log_metric("x", x)

  return x

# Start parent run on the main driver process
with mlflow.start_run() as run:
  # Pass the parent run's run_id to each Ray task
   results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Ray トレーニングする and MLflow

Ray トレーニングする モデルを MLflow にログに記録する最も簡単な方法は、トレーニング 実行によって生成されたチェックポイントを使用することです。 トレーニング 実行が完了したら、ネイティブのディープラーニング フレームワーク ( PyTorch や TensorFlowなど) にモデルを再読み込みし、対応する MLflow コードでログに記録します。

このアプローチにより、モデルが正しく保存され、評価またはデプロイの準備が整います。

次のコードは、Ray トレーニングする checkpoint からモデルを再読み込みし、 MLflowにログを記録します。

result = trainer.fit()

checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
     # Change as needed for different DL frameworks
    checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
    # Load the model from the checkpoint
    model = MyModel.load_from_checkpoint(checkpoint_path)

with mlflow.start_run() as run:
    # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

一般的には、オブジェクトをドライバーノードに戻すのがベストプラクティスですが、Ray トレーニングするを使用すると、ワーカープロセスからのトレーニング履歴全体を保存するよりも最終結果を保存する方が簡単です。

トレーニング実行から複数のモデルを保存するには、 ray.train.CheckpointConfigに保持するチェックポイントの数を指定します。 その後、1 つのモデルを格納するのと同じ方法でモデルを読み取ってログに記録できます。

注:

MLflow は、モデル トレーニング中のフォールト トレランスの処理ではなく、モデルのライフサイクルの追跡を担当します。 フォールトトレランスは、代わりに Ray トレーニングする 自体によって管理されます。

Ray トレーニングするで指定したトレーニング メトリクスを格納するには、result オブジェクトから取得して MLflowを使用して格納します。

result = trainer.fit()

with mlflow.start_run() as run:
    mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))

  # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Spark クラスターと Ray クラスターを適切に構成し、リソース割り当ての問題を回避するには、 resources_per_worker 設定を調整する必要があります。 具体的には、各 Ray ワーカーの CPU の数を、Ray ワーカー ノードで使用可能な CPU の合計数より 1 つ少なく設定します。 トレーナーが使用可能なすべてのコアを Ray アクター用に予約すると、リソース競合エラーにつながる可能性があるため、この調整は非常に重要です。

Ray Tune と MLflow

Ray Tune を MLflow と統合すると、 Databricks内でハイパーパラメーターチューニング エクスペリメントを効率的に追跡および記録できます。 この統合は、 MLflowのエクスペリメント追跡機能を活用して、Ray タスクから直接メトリクスと結果を記録します。

ログ記録のための子実行アプローチ

Ray Core タスクからのログ記録と同様に、Ray Tune アプリケーションでは、子実行アプローチを使用して、各トライアルまたはチューニング イテレーションのメトリクスをログに記録できます。 次のステップを使用して、子実行アプローチを実装します。

  1. 親実行を作成する: ドライバー プロセスで親実行を初期化します。 この実行は、後続のすべての子実行のメイン コンテナとして機能します。

  2. Log child 実行: 各 Ray Tune タスクは、エクスペリメント結果の明確な階層を維持しながら、親実行の下に子実行を作成します。

次の例は、MLflow を使用して Ray Tune タスクから認証およびログを記録する方法を示しています。

import os
import tempfile
import time

import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars

from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow


mlflow_db_creds = get_databricks_env_vars("databricks")

EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)


def evaluation_fn(step, width, height):
   return (0.1 + width * step / 100) ** (-1) + height * 0.1


def train_function_mlflow(config, run_id):
   os.environ.update(mlflow_db_creds)
   mlflow.set_experiment(EXPERIMENT_NAME)

   # Hyperparameters
   width = config["width"]
   height = config["height"]

   with mlflow.start_run(run_id=run_id, nested=True):
       for step in range(config.get("steps", 100)):
           # Iterative training function - can be any arbitrary training procedure
           intermediate_score = evaluation_fn(step, width, height)
           # Log the metrics to MLflow
           mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
           # Feed the score back to Tune.
           train.report({"iterations": step, "mean_loss": intermediate_score})
           time.sleep(0.1)


def tune_with_setup(run_id, finish_fast=True):
   os.environ.update(mlflow_db_creds)
   # Set the experiment or create a new one if it does not exist.
   mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)

   tuner = tune.Tuner(
       tune.with_parameter(train_function_mlflow, run_id),
       tune_config=tune.TuneConfig(num_samples=5),
       run_config=train.RunConfig(
           name="mlflow",
       ),
       param_space={
           "width": tune.randint(10, 100),
           "height": tune.randint(0, 100),
           "steps": 20 if finish_fast else 100,
       },
   )
   results = tuner.fit()


with mlflow.start_run() as run:
   mlflow_tracking_uri = mlflow.get_tracking_uri()
   tune_with_setup(run.info.run_id)