特徴を使用してモデルをトレーニングする

この記事では、 Unity Catalogでの特徴量エンジニアリングまたはローカル ワークスペース Feature Storeを使用してモデルをトレーニングする方法について説明します。 最初に、使用するフィーチャとその結合方法を定義するトレーニング データセットを作成する必要があります。 その後、モデルをトレーニングする と、モデルはフィーチャへの参照を保持します。

Unity Catalog で特徴量エンジニアリングを使用してモデルをトレーニングすると、カタログ エクスプローラーでモデルのリネージを表示できます。 モデルの作成に使用されたテーブルと関数は、自動的に追跡され、表示されます。 「 リネージ Feature Store 表示」を参照してください。

推論にモデルを使用する場合、 Feature Storeから特徴値を取得するように選択できます。 Feature StoreモデルはMLflow pyfunc インターフェイスとも互換性があるため、MLflow を使用して特徴量テーブルでバッチ推論を実行できます。

トレーニング データセット を作成する

モデル トレーニングのために特徴量テーブルから特定の特徴を選択するには、 FeatureEngineeringClient.create_training_set (Unity Catalogでの特徴量エンジニアリングの場合) または FeatureStoreClient.create_training_set (ワークスペース Feature Store の場合) API と、 FeatureLookupと呼ばれるオブジェクトを使用してトレーニング データセットを作成します。 FeatureLookupは、特徴量テーブルの名前、特徴量テーブルと create_training_setに渡された DataFrame を結合するときに使用するキーなど、トレーニングセットで使用する各特徴量を指定します。詳細については、「 機能ルックアップ 」を参照してください。

FeatureLookupを作成するときに feature_names パラメーターを使用します。feature_names は、トレーニング セットの作成時に特徴テーブル内のすべての特徴量 (主キーを除く) を検索するために、単一の特徴名、特徴名のリスト、または None を取ります。

DataFrame 内の lookup_key 列の型と順序は、参照特徴量テーブルの主キー (タイムスタンプ キーを除く) の型と順序と一致する必要があります。

この記事には、両方のバージョンの構文のコード例が含まれています。

この例では、 trainingSet.load_df によって返される DataFrame には、 feature_lookupsの各フィーチャの列が含まれています。 exclude_columnsを使用して除外された列を除き、 create_training_set に提供された DataFrame のすべての列を保持します。

from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key='customer_id'
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fe = FeatureEngineeringClient()

# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fe.create_training_set(
  df=training_df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id', 'product_id']
)

training_df = training_set.load_df()
from databricks.feature_store import FeatureLookup, FeatureStoreClient

# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key='customer_id'
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fs = FeatureStoreClient()

# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fs.create_training_set(
  df=training_df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id', 'product_id']
)

training_df = training_set.load_df()

ルックアップキーが主キー と一致しない場合のトレーニングセットの作成

トレーニング セットの列名の FeatureLookup の引数 lookup_key を使用します。create_training_set は、 lookup_key 引数で指定されたトレーニング セットの列間で、特徴テーブルの作成時に主キーが指定された順序を使用して順序付き結合を実行します。

この例では、 recommender_system.customer_features には次の主キーがあります: customer_iddt

recommender_system.product_features 特徴量テーブルには、主キー product_idがあります。

training_df に次の列がある場合:

  • cid

  • transaction_dt

  • product_id

  • rating

次のコードは、 TrainingSetの正しい機能ルックアップを作成します。

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key=['cid', 'transaction_dt']
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key=['cid', 'transaction_dt']
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

create_training_set が呼び出されると、次のコードに示すように、左結合を実行し、(cidtransaction_dt) に対応するキー (customer_iddt) を使用してテーブルを結合し、 recommender_system.customer_features training_df することでトレーニングデータセットを作成します。

customer_features_df = spark.sql("SELECT * FROM ml.recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM ml.recommender_system.product_features")

training_df.join(
  customer_features_df,
  on=[training_df.cid == customer_features_df.customer_id,
      training_df.transaction_dt == customer_features_df.dt],
  how="left"
).join(
  product_features_df,
  on="product_id",
  how="left"
)
customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")

training_df.join(
  customer_features_df,
  on=[training_df.cid == customer_features_df.customer_id,
      training_df.transaction_dt == customer_features_df.dt],
  how="left"
).join(
  product_features_df,
  on="product_id",
  how="left"
)

異なる特徴テーブル から同じ名前の2つの特徴を含むトレーニングセットを作成する

オプションの引数 output_nameFeatureLookupで使用します。 指定された名前は、 TrainingSet.load_dfによって返される DataFrame の機能名の代わりに使用されます。 たとえば、次のコードでは、 training_set.load_df によって返される DataFrame には、列 customer_heightproduct_heightが含まれます。

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['height'],
      lookup_key='customer_id',
      output_name='customer_height',
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['height'],
      lookup_key='product_id',
      output_name='product_height'
    ),
  ]

fe = FeatureEngineeringClient()

with mlflow.start_run():
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id']
  )
  training_df = training_set.load_df()
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['height'],
      lookup_key='customer_id',
      output_name='customer_height',
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['height'],
      lookup_key='product_id',
      output_name='product_height'
    ),
  ]

fs = FeatureStoreClient()

with mlflow.start_run():
  training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id']
  )
  training_df = training_set.load_df()

同じ特徴量を複数回 使用してトレーニングセットを作成する

異なるルックアップ キーで結合された同じ機能を使用してトレーニング セットを作成するには、複数の機能ルックアップを使用します。 フィーチャルックアップ出力ごとに一意の output_name を使用します。

feature_lookups = [
    FeatureLookup(
      table_name='ml.taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['pickup_zip'],
      output_name='pickup_temp'
    ),
    FeatureLookup(
      table_name='ml.taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['dropoff_zip'],
      output_name='dropoff_temp'
    )
  ]
feature_lookups = [
    FeatureLookup(
      table_name='taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['pickup_zip'],
      output_name='pickup_temp'
    ),
    FeatureLookup(
      table_name='taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['dropoff_zip'],
      output_name='dropoff_temp'
    )
  ]

教師なし機械学習モデルの トレーニングセットを作成する

教師なし学習モデルのトレーニングセットを作成するときに label=None を設定します。 たとえば、次の TrainingSet を使用して、さまざまな顧客を関心に基づいてグループにクラスター化できます。

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['interests'],
      lookup_key='customer_id',
    ),
  ]

fe = FeatureEngineeringClient()
with mlflow.start_run():
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label=None,
    exclude_columns=['customer_id']
  )

  training_df = training_set.load_df()
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['interests'],
      lookup_key='customer_id',
    ),
  ]

fs = FeatureStoreClient()
with mlflow.start_run():
  training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label=None,
    exclude_columns=['customer_id']
  )

  training_df = training_set.load_df()

モデルをトレーニングし、特徴テーブル を使用してバッチ推論を実行する

Feature Storeの特徴を使用してモデルをトレーニングすると、モデルは特徴への参照を保持します。推論にモデルを使用する場合、 Feature Storeから特徴値を取得するように選択できます。 モデルで使用されるフィーチャの主キーを指定する必要があります。 モデルは、ワークスペース内の Feature Store から必要な機能を取得します。 その後、スコアリング中に必要に応じて特徴値を結合します。

推論時の特徴検索をサポートするには:

  • FeatureEngineeringClient ( Unity Catalogでの特徴量エンジニアリングの場合) またはFeatureStoreClient (ワークスペース Feature Storeの場合) のlog_model方法を使用してモデルをログに記録する必要があります。

  • TrainingSet.load_df から返された DataFrame を使用してモデルをトレーニングする必要があります。 この DataFrame を使用してモデルをトレーニングする前に何らかの方法で変更した場合、推論にモデルを使用するときに変更は適用されません。 これにより、モデルのパフォーマンスが低下します。

  • モデルの種類には、MLflow に対応する python_flavor が必要です。 MLflow では、次のようなほとんどの Python モデル トレーニング フレームワークがサポートされています。

    • Scikit-Learn

    • Keras

    • PyTorch

    • スパークML

    • ライトGBM

    • XGBoost

    • TensorFlow Keras ( python_flavor mlflow.kerasを使用)

  • カスタム MLflow pyfunc モデル

# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fe = FeatureEngineeringClient()

with mlflow.start_run():

  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
  )

  training_df = training_set.load_df().toPandas()

  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating

  model = linear_model.LinearRegression().fit(X_train, y_train)

  fe.log_model(
    model=model,
    artifact_path="recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model"
  )

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fe = FeatureEngineeringClient()

# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fe.score_batch(
    model_uri=model_uri,
    df=batch_df
)

# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’
# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fs = FeatureStoreClient()

with mlflow.start_run():

  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
  )

  training_df = training_set.load_df().toPandas()

  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating

  model = linear_model.LinearRegression().fit(X_train, y_train)

  fs.log_model(
    model=model,
    artifact_path="recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model"
  )

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fs = FeatureStoreClient()

# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fs.score_batch(
    model_uri=model_uri,
    df=batch_df
)

# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’

特徴メタデータ でパッケージ化されたモデルをスコアリングするときにカスタム特徴値を使用する

既定では、特徴メタデータと共にパッケージ化されたモデルは、推論時に特徴量テーブルから特徴を検索します。 スコアリングにカスタム特徴量値を使用するには、DataFrame FeatureEngineeringClient.score_batch(Unity Catalogでの特徴量エンジニアリングの場合) またはFeatureStoreClient.score_batch (ワークスペースFeature Store の場合) に渡される にそれらを含めます。

たとえば、次の 2 つの機能を持つモデルをパッケージ化するとします。

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['account_creation_date', 'num_lifetime_purchases'],
      lookup_key='customer_id',
    ),
  ]
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['account_creation_date', 'num_lifetime_purchases'],
      lookup_key='customer_id',
    ),
  ]

推論では、 account_creation_dateという名前の列を含む DataFrame で score_batch を呼び出すことで、特徴 account_creation_date のカスタム値を指定できます。この場合、API は Feature Store から num_lifetime_purchases 機能のみを検索し、指定されたカスタム account_creation_date 列の値をモデル スコアリングに使用します。

# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fe.score_batch(
  model_uri='models:/ban_prediction_model/1',
  df=batch_df
)
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
  model_uri='models:/ban_prediction_model/1',
  df=batch_df
)

Feature Store 特徴と外部に存在するデータの組み合わせを使用してモデルをトレーニングし、スコア付け Feature Store

Feature Store フィーチャと外部からのデータを組み合わせてモデル Feature Storeトレーニングする 。特徴メタデータを使用してモデルをパッケージ化すると、モデルは推論のために Feature Store から特徴値を取得します。

モデルをトレーニングするには、DataFrame FeatureEngineeringClient.create_training_set(Unity Catalogでの特徴量エンジニアリングの場合) またはFeatureStoreClient.create_training_set (ワークスペースFeature Store の場合) に渡される に列として追加データを含めます。この例では、 Feature Store から特徴量total_purchases_30dと外部列browserを使用します。

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
  ]

fe = FeatureEngineeringClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fe.create_training_set(
  df=df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id']  # 'browser' is not excluded
)
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
  ]

fs = FeatureStoreClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
  df=df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id']  # 'browser' is not excluded
)

推論では、 FeatureStoreClient.score_batch で使用される DataFrame には browser 列が含まれている必要があります。

# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fe.score_batch(
  model_uri=model_uri,
  df=batch_df
)
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
  model_uri=model_uri,
  df=batch_df
)

MLflow を使用してモデルをロードし、バッチ推論を実行する

FeatureEngineeringClient (Unity Catalog での特徴量エンジニアリングの場合) または FeatureStoreClient (Workspace Feature Storeの場合) の log_model メソッドを使用してモデルがログに記録された後、推論で MLflow を使用できるようになります。 MLflow.pyfunc.predict Feature Store から特徴値を取得し、推論時に提供された値を結合します。 モデルで使用されるフィーチャの主キーを指定する必要があります。

MLflow によるバッチ推論には、MLflow バージョン 2.11 以降が必要です。

# Train model
import mlflow
from sklearn import linear_model


feature_lookups = [
  FeatureLookup(
    table_name='ml.recommender_system.customer_features',
    feature_names=['total_purchases_30d'],
    lookup_key='customer_id',
  ),
  FeatureLookup(
    table_name='ml.recommender_system.product_features',
    feature_names=['category'],
    lookup_key='product_id'
  )
]


fe = FeatureEngineeringClient()


with mlflow.start_run():


  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
  )


  training_df = training_set.load_df().toPandas()


  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating


  model = linear_model.LinearRegression().fit(X_train, y_train)


  fe.log_model(
    model=model,
    artifact_path="recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model",
    #refers to the default value of "result_type" if not provided at inference
    params={"result_type":"double"},
  )


# Batch inference with MLflow


# NOTE: the result_type parameter can only be used if a default value
# is provided in log_model. This is automatically done for all models
# logged using Databricks Runtime for ML 15.0 or above.
# For earlier Databricks Runtime versions, use set_result as shown below.


# batch_df has columns ‘customer_id’ and ‘product_id’
model = mlflow.pyfunc.load_model(model_version_uri)


# If result_type parameter is provided in log_model
predictions = model.predict(df, {"result_type":"double"})


# If result_type parameter is NOT provided in log_model
model._model_impl.set_result_type("double")
predictions = model.predict(df)