特徴量を使用してモデルをトレーニングする
この記事では、 Unity Catalogでの特徴量エンジニアリングまたは従来のワークスペース Feature Store を使用してモデルをトレーニングする方法について説明します。 まず、使用する特徴とその結合方法を定義するトレーニングデータセットを作成する必要があります。 その後、モデルをトレーニングすると、モデルは特徴への参照を保持します。
Unity Catalogで特徴量エンジニアリングを使用してモデルをトレーニングすると、Catalog Explorer でモデルのリネージを表示できます。 モデルの作成に使用されたテーブルと関数は自動的に追跡され、表示されます。 特徴量のガバナンスのリネージを参照してください。
推論にモデルを使用する場合、 Feature Storeから特徴値を取得するように選択できます。 Feature StoreモデルはMLflow pyfunc インターフェイスとも互換性があるため、MLflow を使用して特徴量テーブルでバッチ推論を実行できます。
モデルはトレーニングに最大 50 個のテーブルと 100 個の関数を使用できます。
トレーニングデータセットを作成する
モデル トレーニングのために特徴量テーブルから特定の特徴を選択するには、 FeatureEngineeringClient.create_training_set
(Unity Catalogでの特徴量エンジニアリングの場合) または FeatureStoreClient.create_training_set
(ワークスペース Feature Store の場合) API と、 FeatureLookup
と呼ばれるオブジェクトを使用してトレーニング データセットを作成します。 FeatureLookup
は、特徴量テーブルの名前、特徴量テーブルと create_training_set
に渡された DataFrame を結合するときに使用するキーなど、トレーニングセットで使用する各特徴量を指定します。詳細については、「 機能ルックアップ 」を参照してください。
FeatureLookup
を作成するときに feature_names
パラメーターを使用します。feature_names
は、トレーニング セットの作成時に特徴テーブル内のすべての特徴量 (主キーを除く) を検索するために、単一の特徴名、特徴名のリスト、または None を取ります。
注
DataFrame 内の lookup_key
列の型と順序は、参照特徴量テーブルの主キー (タイムスタンプ キーを除く) の型と順序と一致する必要があります。
この記事には、両方のバージョンの構文のコード例が含まれています。
この例では、 trainingSet.load_df
によって返される DataFrame には、 feature_lookups
の各フィーチャの列が含まれています。 exclude_columns
を使用して除外された列を除き、 create_training_set
に提供された DataFrame のすべての列を保持します。
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['total_purchases_30d', 'total_purchases_7d'],
lookup_key='customer_id'
),
FeatureLookup(
table_name='ml.recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]
fe = FeatureEngineeringClient()
# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fe.create_training_set(
df=training_df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id', 'product_id']
)
training_df = training_set.load_df()
from databricks.feature_store import FeatureLookup, FeatureStoreClient
# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
FeatureLookup(
table_name='recommender_system.customer_features',
feature_names=['total_purchases_30d', 'total_purchases_7d'],
lookup_key='customer_id'
),
FeatureLookup(
table_name='recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]
fs = FeatureStoreClient()
# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fs.create_training_set(
df=training_df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id', 'product_id']
)
training_df = training_set.load_df()
ルックアップキーが主キーと一致しない場合のTrainingSetの作成
トレーニング セットの列名の FeatureLookup
の引数 lookup_key
を使用します。create_training_set
は、 lookup_key
引数で指定されたトレーニング セットの列間で、特徴テーブルの作成時に主キーが指定された順序を使用して順序付き結合を実行します。
この例では、 recommender_system.customer_features
には次の主キーがあります: customer_id
、 dt
。
recommender_system.product_features
特徴量テーブルには、主キー product_id
があります。
training_df
に次の列がある場合:
cid
transaction_dt
product_id
rating
次のコードは、 TrainingSet
の正しい機能ルックアップを作成します。
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['total_purchases_30d', 'total_purchases_7d'],
lookup_key=['cid', 'transaction_dt']
),
FeatureLookup(
table_name='ml.recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]
feature_lookups = [
FeatureLookup(
table_name='recommender_system.customer_features',
feature_names=['total_purchases_30d', 'total_purchases_7d'],
lookup_key=['cid', 'transaction_dt']
),
FeatureLookup(
table_name='recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]
create_training_set
が呼び出されると、次のコードに示すように、左結合を実行し、(cid
、transaction_dt
) に対応するキー (customer_id
、dt
) を使用してテーブルを結合し、 recommender_system.customer_features
training_df
することでトレーニングデータセットを作成します。
customer_features_df = spark.sql("SELECT * FROM ml.recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM ml.recommender_system.product_features")
training_df.join(
customer_features_df,
on=[training_df.cid == customer_features_df.customer_id,
training_df.transaction_dt == customer_features_df.dt],
how="left"
).join(
product_features_df,
on="product_id",
how="left"
)
customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")
training_df.join(
customer_features_df,
on=[training_df.cid == customer_features_df.customer_id,
training_df.transaction_dt == customer_features_df.dt],
how="left"
).join(
product_features_df,
on="product_id",
how="left"
)
異なる特徴量テーブルから同じ名前の2つの特徴量を含むトレーニングセットを作成する
オプションの引数 output_name
を FeatureLookup
で使用します。 指定された名前は、 TrainingSet.load_df
によって返される DataFrame の機能名の代わりに使用されます。 たとえば、次のコードでは、 training_set.load_df
によって返される DataFrame には、列 customer_height
と product_height
が含まれます。
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['height'],
lookup_key='customer_id',
output_name='customer_height',
),
FeatureLookup(
table_name='ml.recommender_system.product_features',
feature_names=['height'],
lookup_key='product_id',
output_name='product_height'
),
]
fe = FeatureEngineeringClient()
with mlflow.start_run():
training_set = fe.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id']
)
training_df = training_set.load_df()
feature_lookups = [
FeatureLookup(
table_name='recommender_system.customer_features',
feature_names=['height'],
lookup_key='customer_id',
output_name='customer_height',
),
FeatureLookup(
table_name='recommender_system.product_features',
feature_names=['height'],
lookup_key='product_id',
output_name='product_height'
),
]
fs = FeatureStoreClient()
with mlflow.start_run():
training_set = fs.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id']
)
training_df = training_set.load_df()
同じ特徴量を複数回使用してTrainingSetを作成する
異なるルックアップ キーで結合された同じ機能を使用してトレーニング セットを作成するには、複数の機能ルックアップを使用します。 フィーチャルックアップ出力ごとに一意の output_name
を使用します。
feature_lookups = [
FeatureLookup(
table_name='ml.taxi_data.zip_features',
feature_names=['temperature'],
lookup_key=['pickup_zip'],
output_name='pickup_temp'
),
FeatureLookup(
table_name='ml.taxi_data.zip_features',
feature_names=['temperature'],
lookup_key=['dropoff_zip'],
output_name='dropoff_temp'
)
]
feature_lookups = [
FeatureLookup(
table_name='taxi_data.zip_features',
feature_names=['temperature'],
lookup_key=['pickup_zip'],
output_name='pickup_temp'
),
FeatureLookup(
table_name='taxi_data.zip_features',
feature_names=['temperature'],
lookup_key=['dropoff_zip'],
output_name='dropoff_temp'
)
]
教師なし機械学習モデルのためのTrainingSetを作成する
教師なし学習モデルのトレーニングセットを作成するときに label=None
を設定します。 たとえば、次の TrainingSet を使用して、さまざまな顧客を関心に基づいてグループにクラスター化できます。
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['interests'],
lookup_key='customer_id',
),
]
fe = FeatureEngineeringClient()
with mlflow.start_run():
training_set = fe.create_training_set(
df=df,
feature_lookups=feature_lookups,
label=None,
exclude_columns=['customer_id']
)
training_df = training_set.load_df()
feature_lookups = [
FeatureLookup(
table_name='recommender_system.customer_features',
feature_names=['interests'],
lookup_key='customer_id',
),
]
fs = FeatureStoreClient()
with mlflow.start_run():
training_set = fs.create_training_set(
df=df,
feature_lookups=feature_lookups,
label=None,
exclude_columns=['customer_id']
)
training_df = training_set.load_df()
ビューを特徴量テーブルとして使用したTrainingSetの作成
ビューを特徴量テーブルとして使用するには、 Databricks Runtime 16.0 MLに組み込まれている 0.7.0 以降のバージョンを使用する必要がありますdatabricks-feature-engineering
。
ビューは、ソース Delta テーブルからの単純な SELECT ビューである必要があります。 単純な SELECT ビューは、特徴量テーブルとして使用でき、プライマリ・キーが JOIN、グループ BY、または DISTINCT 句なしで選択されている 内の 1 つの ・テーブルから作成されたビューとして定義されます。DeltaUnity CatalogSQL 文で使用できるキーワードは、SELECT、FROM、WHERE、ORDER BY、LIMIT、および OFFSET です。
次の例では、 ml.recommender_system.customer_table
にプライマリ・キー ( cid
と dt
があります。ここで、 dt
は時系列カラムです。 この例では、データフレームtraining_df
に列 、dt
cid
、label
があることを前提としています。
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
customer_features_df = spark.sql("CREATE OR REPLACE VIEW ml.recommender_system.customer_features AS SELECT cid, dt, pid, rating FROM ml.recommender_system.customer_table WHERE rating > 3")
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['pid', 'rating'],
lookup_key=['cid'],
timestamp_lookup_key='dt'
),
]
fe = FeatureEngineeringClient()
training_set = fe.create_training_set(
df=training_df,
feature_lookups=feature_lookups,
label='label'
)
training_df = training_set.load_df()
モデルをトレーニングし、特徴量テーブルを使用してバッチ推論を実行する
Feature Storeの特徴を使用してモデルをトレーニングすると、モデルは特徴への参照を保持します。推論にモデルを使用する場合、 Feature Storeから特徴値を取得するように選択できます。 モデルで使用されるフィーチャの主キーを指定する必要があります。 モデルは、ワークスペース内の Feature Store から必要な機能を取得します。 その後、スコアリング中に必要に応じて特徴値を結合します。
推論時の特徴検索をサポートするには:
FeatureEngineeringClient
( Unity Catalogでの特徴量エンジニアリングの場合) またはFeatureStoreClient
(ワークスペース Feature Storeの場合) のlog_model
方法を使用してモデルをログに記録する必要があります。TrainingSet.load_df
から返された DataFrame を使用してモデルをトレーニングする必要があります。 この DataFrame を使用してモデルをトレーニングする前に何らかの方法で変更した場合、推論にモデルを使用するときに変更は適用されません。 これにより、モデルのパフォーマンスが低下します。モデルの種類には、MLflow に対応する
python_flavor
が必要です。 MLflow では、次のようなほとんどの Python モデル トレーニング フレームワークがサポートされています。Scikit-Learn
Keras
PyTorch
SparkML
LightGBM
XGBoost
TensorFlow Keras (
python_flavor
mlflow.keras
を使用)
カスタム MLflow pyfunc モデル
# Train model
import mlflow
from sklearn import linear_model
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['total_purchases_30d'],
lookup_key='customer_id',
),
FeatureLookup(
table_name='ml.recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]
fe = FeatureEngineeringClient()
with mlflow.start_run():
# df has columns ['customer_id', 'product_id', 'rating']
training_set = fe.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id', 'product_id']
)
training_df = training_set.load_df().toPandas()
# "training_df" columns ['total_purchases_30d', 'category', 'rating']
X_train = training_df.drop(['rating'], axis=1)
y_train = training_df.rating
model = linear_model.LinearRegression().fit(X_train, y_train)
fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name="recommendation_model"
)
# Batch inference
# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.
fe = FeatureEngineeringClient()
# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fe.score_batch(
model_uri=model_uri,
df=batch_df
)
# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’
# Train model
import mlflow
from sklearn import linear_model
feature_lookups = [
FeatureLookup(
table_name='recommender_system.customer_features',
feature_names=['total_purchases_30d'],
lookup_key='customer_id',
),
FeatureLookup(
table_name='recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]
fs = FeatureStoreClient()
with mlflow.start_run():
# df has columns ['customer_id', 'product_id', 'rating']
training_set = fs.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id', 'product_id']
)
training_df = training_set.load_df().toPandas()
# "training_df" columns ['total_purchases_30d', 'category', 'rating']
X_train = training_df.drop(['rating'], axis=1)
y_train = training_df.rating
model = linear_model.LinearRegression().fit(X_train, y_train)
fs.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name="recommendation_model"
)
# Batch inference
# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.
fs = FeatureStoreClient()
# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fs.score_batch(
model_uri=model_uri,
df=batch_df
)
# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’
特徴量メタデータでパッケージ化されたモデルをスコアリングするときにカスタム特徴値を使用する
既定では、特徴メタデータと共にパッケージ化されたモデルは、推論時に特徴量テーブルから特徴を検索します。 スコアリングにカスタム特徴量値を使用するには、DataFrame FeatureEngineeringClient.score_batch
(Unity Catalogでの特徴量エンジニアリングの場合) またはFeatureStoreClient.score_batch
(ワークスペースFeature Store の場合) に渡される にそれらを含めます。
たとえば、次の 2 つの機能を持つモデルをパッケージ化するとします。
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['account_creation_date', 'num_lifetime_purchases'],
lookup_key='customer_id',
),
]
feature_lookups = [
FeatureLookup(
table_name='recommender_system.customer_features',
feature_names=['account_creation_date', 'num_lifetime_purchases'],
lookup_key='customer_id',
),
]
推論では、 account_creation_date
という名前の列を含む DataFrame で score_batch
を呼び出すことで、特徴 account_creation_date
のカスタム値を指定できます。この場合、API は Feature Store から num_lifetime_purchases
機能のみを検索し、指定されたカスタム account_creation_date
列の値をモデル スコアリングに使用します。
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fe.score_batch(
model_uri='models:/ban_prediction_model/1',
df=batch_df
)
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
model_uri='models:/ban_prediction_model/1',
df=batch_df
)
Feature Storeの特徴量と、Feature Storeの外部に存在するデータの組み合わせを使用してモデルをトレーニングし、スコア付けする
Feature Store フィーチャと外部からのデータを組み合わせてモデル Feature Storeトレーニングする 。特徴メタデータを使用してモデルをパッケージ化すると、モデルは推論のために Feature Store から特徴値を取得します。
モデルをトレーニングするには、DataFrame FeatureEngineeringClient.create_training_set
(Unity Catalogでの特徴量エンジニアリングの場合) またはFeatureStoreClient.create_training_set
(ワークスペースFeature Store の場合) に渡される に列として追加データを含めます。この例では、 Feature Store から特徴量total_purchases_30d
と外部列browser
を使用します。
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['total_purchases_30d'],
lookup_key='customer_id',
),
]
fe = FeatureEngineeringClient()
# df has columns ['customer_id', 'browser', 'rating']
training_set = fe.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id'] # 'browser' is not excluded
)
feature_lookups = [
FeatureLookup(
table_name='recommender_system.customer_features',
feature_names=['total_purchases_30d'],
lookup_key='customer_id',
),
]
fs = FeatureStoreClient()
# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id'] # 'browser' is not excluded
)
推論では、 FeatureStoreClient.score_batch
で使用される DataFrame には browser
列が含まれている必要があります。
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fe.score_batch(
model_uri=model_uri,
df=batch_df
)
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
model_uri=model_uri,
df=batch_df
)
MLflow を使用してモデルをロードし、バッチ推論を実行する
FeatureEngineeringClient
(Unity Catalog での特徴量エンジニアリングの場合) または FeatureStoreClient
(Workspace Feature Storeの場合) の log_model
メソッドを使用してモデルがログに記録された後、推論で MLflow を使用できるようになります。 MLflow.pyfunc.predict
Feature Store から特徴値を取得し、推論時に提供された値を結合します。 モデルで使用されるフィーチャの主キーを指定する必要があります。
注
MLflow を使用したバッチ推論には、MLflow バージョン 2.11 以降が必要です。 時系列特徴量テーブルを使用するモデルはサポートされていません。 時系列特徴量テーブルを使用してバッチ推論を行うには、 score_batch
を使用します。 トレーニングするモデルを参照し、特徴量テーブルを使用してバッチ推論を実行します。
# Train model
import mlflow
from sklearn import linear_model
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['total_purchases_30d'],
lookup_key='customer_id',
),
FeatureLookup(
table_name='ml.recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]
fe = FeatureEngineeringClient()
with mlflow.start_run():
# df has columns ['customer_id', 'product_id', 'rating']
training_set = fe.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id', 'product_id']
)
training_df = training_set.load_df().toPandas()
# "training_df" columns ['total_purchases_30d', 'category', 'rating']
X_train = training_df.drop(['rating'], axis=1)
y_train = training_df.rating
model = linear_model.LinearRegression().fit(X_train, y_train)
fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name="recommendation_model",
#refers to the default value of "result_type" if not provided at inference
params={"result_type":"double"},
)
# Batch inference with MLflow
# NOTE: the result_type parameter can only be used if a default value
# is provided in log_model. This is automatically done for all models
# logged using Databricks Runtime for ML 15.0 or above.
# For earlier Databricks Runtime versions, use set_result as shown below.
# batch_df has columns ‘customer_id’ and ‘product_id’
model = mlflow.pyfunc.load_model(model_version_uri)
# If result_type parameter is provided in log_model
predictions = model.predict(df, {"result_type":"double"})
# If result_type parameter is NOT provided in log_model
model._model_impl.set_result_type("double")
predictions = model.predict(df)