Unity Catalog上のモデルの例

この例では、Unity Catalog のモデルを使用して、風力発電所の毎日の電力出力を予測する機械学習アプリケーションを構築する方法を示します。 この例では、次の方法を示します。

  • MLflow.

  • 登録する models to Unity Catalog.

  • モデルを記述し、エイリアスを使用して推論のためにデプロイします。

  • 登録したモデルを本番運用アプリケーションと統合します。

  • Unity Catalog でモデルを検索して発見します。

  • モデルを削除します。

この記事では、MLflow トラッキングと Unity Catalog UI と APIsのモデルを使用してこれらのステップを実行する方法について説明します。

要件

「要件」のすべての要件を満たしていることを確認します。さらに、この記事のコード例では、次の特権があることを前提としています。

  • USE CATALOG main カタログに対する特権。

  • CREATE MODEL main.default スキーマに対する USE SCHEMA 権限。

ノートブック

この記事のすべてのコードは、次のノートブックで提供されています。

Unity Catalog上のモデルのサンプルノートブック

ノートブックを新しいタブで開く

MLflow Python クライアントをインストールする

この例では、MLflow Python クライアント バージョン 2.5.0 以降と TensorFlow が必要です。 ノートブックの上部に次のコマンドを追加して、これらの依存関係をインストールします。

%pip install --upgrade "mlflow-skinny[databricks]>=2.5.0" tensorflow
dbutils.library.restartPython()

データセットの読み込み、モデルのトレーニング、Unity Catalogへの登録

このセクションでは、風力発電所のデータセットを読み込み、モデルをトレーニングし、モデルを Unity Catalogに登録する方法について説明します。 モデル トレーニングの実行とメトリックは、 エクスペリメントの実行で追跡されます。

データセットの読み込み

次のコードは、米国の風力発電所の気象データと電力出力情報を含むデータセットを読み込みます。 データセットには、6 時間ごとにサンプリングされた wind directionwind speed、および air temperature フィーチャ ( 00:00に 1 回、 08:00に 1 回、 16:00に 1 回)、および数年にわたる毎日の総出力 (power) が含まれています。

import pandas as pd
wind_farm_data = pd.read_csv("https://github.com/dbczumar/model-registry-demo-notebook/raw/master/dataset/windfarm_data.csv", index_col=0)

def get_training_data():
  training_data = pd.DataFrame(wind_farm_data["2014-01-01":"2018-01-01"])
  X = training_data.drop(columns="power")
  y = training_data["power"]
  return X, y

def get_validation_data():
  validation_data = pd.DataFrame(wind_farm_data["2018-01-01":"2019-01-01"])
  X = validation_data.drop(columns="power")
  y = validation_data["power"]
  return X, y

def get_weather_and_forecast():
  format_date = lambda pd_date : pd_date.date().strftime("%Y-%m-%d")
  today = pd.Timestamp('today').normalize()
  week_ago = today - pd.Timedelta(days=5)
  week_later = today + pd.Timedelta(days=5)

  past_power_output = pd.DataFrame(wind_farm_data)[format_date(week_ago):format_date(today)]
  weather_and_forecast = pd.DataFrame(wind_farm_data)[format_date(week_ago):format_date(week_later)]
  if len(weather_and_forecast) < 10:
    past_power_output = pd.DataFrame(wind_farm_data).iloc[-10:-5]
    weather_and_forecast = pd.DataFrame(wind_farm_data).iloc[-10:]

  return weather_and_forecast.drop(columns="power"), past_power_output["power"]

Unity Catalog上のモデルにアクセスできるようにMLflowクライアントを構成する

デフォルトにより、MLflow Python クライアントは、Databricks のワークスペース モデルレジストリにモデルを作成します。 Unity Catalogのモデルにアップグレードするには、 Unity Catalogのモデルにアクセスするようにクライアントを構成します。

import mlflow
mlflow.set_registry_uri("databricks-uc")

モデルをトレーニングし登録する

次のコード トレーニングする TensorFlow Keras を使用してデータセット内の気象特徴に基づいて電力出力を予測し、MLflow APIs を使用して適合モデルを Unity Catalogに登録します。

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

MODEL_NAME = "main.default.wind_forecasting"

def train_and_register_keras_model(X, y):
  with mlflow.start_run():
    model = Sequential()
    model.add(Dense(100, input_shape=(X.shape[-1],), activation="relu", name="hidden_layer"))
    model.add(Dense(1))
    model.compile(loss="mse", optimizer="adam")

    model.fit(X, y, epochs=100, batch_size=64, validation_split=.2)
    example_input = X[:10].to_numpy()
    mlflow.tensorflow.log_model(
        model,
        artifact_path="model",
        input_example=example_input,
        registered_model_name=MODEL_NAME
    )
  return model

X_train, y_train = get_training_data()
model = train_and_register_keras_model(X_train, y_train)

UIでモデルを表示する

カタログ エクスプローラーを使用して、Unity Catalog で登録済みのモデルとモデル バージョンを表示および管理できます。作成したモデルを main カタログと default スキーマで探します。

登録済みモデルページ

推論用のモデルバージョンのデプロイ

Unity Catalogのモデルは、モデルのデプロイメントのエイリアスをサポートしています。 エイリアスは、登録済みモデルの特定のバージョンへの変更可能な名前付き参照 ("Champion" や "Challenger" など) を提供します。 下流の推論ワークフローでこれらのエイリアスを使用して、モデルのバージョンを参照およびターゲットにすることができます。

カタログエクスプローラで登録済みのモデルに移動したら、「 エイリアス 」列の下をクリックして「チャンピオン」エイリアスを最新のモデルバージョンに割り当て、「続行」を押して変更を保存します。

登録済みモデルのエイリアスを設定する

APIを使用したモデルバージョンの読み込み

MLflow モデル コンポーネントは、複数の機械学習フレームワークからモデルを読み込むための関数を定義します。 たとえば、mlflow.tensorflow.load_model() は MLflow 形式で保存された TensorFlow モデルをロードするために使用され、 mlflow.sklearn.load_model() は MLflow 形式で保存された Scikit-Learn モデルをロードするために使用されます。

これらの関数は、 Unity Catalogのモデルからモデルを読み込むことができます。

import mlflow.pyfunc

model_version_uri = "models:/{model_name}/1".format(model_name=MODEL_NAME)

print("Loading registered model version from URI: '{model_uri}'".format(model_uri=model_version_uri))
model_version_1 = mlflow.pyfunc.load_model(model_version_uri)

model_champion_uri = "models:/{model_name}@Champion".format(model_name=MODEL_NAME)

print("Loading registered model version from URI: '{model_uri}'".format(model_uri=model_champion_uri))
champion_model = mlflow.pyfunc.load_model(model_champion_uri)

チャンピオンモデルで出力を予測

このセクションでは、チャンピオン モデルを使用して、風力発電所の天気予報データを評価します。 forecast_power() アプリケーションは、指定されたステージから最新バージョンの予測モデルを読み込み、それを使用して今後 5 日間の電力本番運用を予測します。

from mlflow.tracking import MlflowClient

def plot(model_name, model_alias, model_version, power_predictions, past_power_output):
  import matplotlib.dates as mdates
  from matplotlib import pyplot as plt
  index = power_predictions.index
  fig = plt.figure(figsize=(11, 7))
  ax = fig.add_subplot(111)
  ax.set_xlabel("Date", size=20, labelpad=20)
  ax.set_ylabel("Power\noutput\n(MW)", size=20, labelpad=60, rotation=0)
  ax.tick_params(axis='both', which='major', labelsize=17)
  ax.xaxis.set_major_formatter(mdates.DateFormatter('%m/%d'))
  ax.plot(index[:len(past_power_output)], past_power_output, label="True", color="red", alpha=0.5, linewidth=4)
  ax.plot(index, power_predictions.squeeze(), "--", label="Predicted by '%s'\nwith alias '%s' (Version %d)" % (model_name, model_alias, model_version), color="blue", linewidth=3)
  ax.set_ylim(ymin=0, ymax=max(3500, int(max(power_predictions.values) * 1.3)))
  ax.legend(fontsize=14)
  plt.title("Wind farm power output and projections", size=24, pad=20)
  plt.tight_layout()
  display(plt.show())

def forecast_power(model_name, model_alias):
  import pandas as pd
  client = MlflowClient()
  model_version = client.get_model_version_by_alias(model_name, model_alias).version
  model_uri = "models:/{model_name}@{model_alias}".format(model_name=MODEL_NAME, model_alias=model_alias)
  model = mlflow.pyfunc.load_model(model_uri)
  weather_data, past_power_output = get_weather_and_forecast()
  power_predictions = pd.DataFrame(model.predict(weather_data))
  power_predictions.index = pd.to_datetime(weather_data.index)
  print(power_predictions)
  plot(model_name, model_alias, int(model_version), power_predictions, past_power_output)

forecast_power(MODEL_NAME, "Champion")

APIを使用したモデルとモデル バージョンの説明の追加

このセクションのコードは、MLflow API を使用してモデルとモデル バージョンの説明を追加する方法を示しています。

client = MlflowClient()
client.update_registered_model(
  name=MODEL_NAME,
  description="This model forecasts the power output of a wind farm based on weather data. The weather data consists of three features: wind speed, wind direction, and air temperature."
)

client.update_model_version(
  name=MODEL_NAME,
  version=1,
  description="This model version was built using TensorFlow Keras. It is a feed-forward neural network with one hidden layer."
)

新しいモデルバージョンを作成する

従来の機械学習手法は、電力予測にも効果的です。 次のコードは、 Scikit-Learn を使用してランダムフォレストモデルをトレーニングし、 mlflow.sklearn.log_model() 関数を使用して Unity Catalog に登録します。

import mlflow.sklearn
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

with mlflow.start_run():
  n_estimators = 300
  mlflow.log_param("n_estimators", n_estimators)

  rand_forest = RandomForestRegressor(n_estimators=n_estimators)
  rand_forest.fit(X_train, y_train)

  val_x, val_y = get_validation_data()
  mse = mean_squared_error(rand_forest.predict(val_x), val_y)
  print("Validation MSE: %d" % mse)
  mlflow.log_metric("mse", mse)

  example_input = val_x.iloc[[0]]

  # Specify the `registered_model_name` parameter of the `mlflow.sklearn.log_model()`
  # function to register the model to <UC>. This automatically
  # creates a new model version
  mlflow.sklearn.log_model(
    sk_model=rand_forest,
    artifact_path="sklearn-model",
    input_example=example_input,
    registered_model_name=MODEL_NAME
  )

新しいモデルのバージョン番号を取得する

次のコードは、モデル名の最新のモデル バージョン番号を取得する方法を示しています。

client = MlflowClient()
model_version_infos = client.search_model_versions("name = '%s'" % MODEL_NAME)
new_model_version = max([model_version_info.version for model_version_info in model_version_infos])

新しいモデルバージョンに説明を追加する

client.update_model_version(
  name=MODEL_NAME,
  version=new_model_version,
  description="This model version is a random forest containing 100 decision trees that was trained in scikit-learn."
)

新しいモデルバージョンをチャレンジャーとしてマークし、モデルをテストする

運用トラフィックを処理するモデルをデプロイする前に、運用データのサンプルでテストすることをお勧めします。 以前は、"Champion" エイリアスを使用して、運用環境のワークロードの大部分に対応するモデル バージョンを示していました。 次のコードは、"Challenger" エイリアスを新しいモデル バージョンに割り当て、そのパフォーマンスを評価します。

client.set_registered_model_alias(
  name=MODEL_NAME,
  alias="Challenger",
  version=new_model_version
)

forecast_power(MODEL_NAME, "Challenger")

新しいモデル バージョンをチャンピオンモデルバージョンとしてデプロイする

次のコードでは、新しいモデル バージョンがテストで適切に動作することを確認した後、"チャンピオン" エイリアスを新しいモデル バージョンに割り当て、「 チャンピオン モデルで電力出力を予測する 」セクションのまったく同じアプリケーション コードを使用して、電力予測を生成します。

client.set_registered_model_alias(
  name=MODEL_NAME,
  alias="Champion",
  version=new_model_version
)

forecast_power(MODEL_NAME, "Champion")

予測モデルには、Keras モデルでトレーニングされたモデル バージョンと Scikit-Learnでトレーニングされたバージョンの 2 つのモデル バージョンがあります。 "Challenger" エイリアスは新しい Scikit-Learn モデル バージョンに割り当てられたままになるため、"Challenger" モデル バージョンを対象とするダウンストリーム ワークロードは引き続き正常に実行されることに注意してください。

製品モデルのバージョン

モデルの削除

モデル バージョンが使用されなくなったら、削除できます。 登録済みのモデル全体を削除することもできます。これにより、関連付けられているすべてのモデル バージョンが削除されます。 モデルバージョンを削除すると、モデルバージョンに割り当てられているエイリアスがすべてクリアされることに注意してください。

MLflow APIを使用したVersion 1の削除

client.delete_model_version(
   name=MODEL_NAME,
   version=1,
)

MLflow APIを使用してモデルを削除する

client = MlflowClient()
client.delete_registered_model(name=MODEL_NAME)