Use features to train models
This article describes how you can train models using Feature Engineering in Unity Catalog or the legacy Workspace Feature Store. You must first create a training dataset, which defines the features to use and how to join them. Then, when you train a model, the model retains references to the features.
When you train a model using Feature Engineering in Unity Catalog, you can view the model’s lineage in Catalog Explorer. Tables and functions that were used to create the model are automatically tracked and displayed. See Feature governance and lineage.
When you use the model for inference, you can choose to have it retrieve feature values from the feature store. Feature store models are also compatible with the MLflow pyfunc interface, so you can use MLflow to perform batch inference with feature tables.
A model can use at most 50 tables and 100 functions for training.
Create a training dataset
To select specific features from a feature table for model training, you create a training dataset using the FeatureEngineeringClient.create_training_set API (for Feature Engineering in Unity Catalog) or the FeatureStoreClient.create_training_set API (for Workspace Feature Store) and an object called a FeatureLookup. A FeatureLookup specifies each feature to use in the training set, including the name of the feature table, the name(s) of the features, and the key(s) to use when joining the feature table with the DataFrame passed to create_training_set. See Feature Lookup for more information.
Use the feature_names parameter when you create a FeatureLookup. feature_names takes a single feature name, a list of feature names, or None to look up all features (excluding primary keys) in the feature table at the time that the training set is created.
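For example, all three forms are valid. The following is a minimal sketch that reuses this article's example table; only the lookup definitions are shown:

from databricks.feature_engineering import FeatureLookup

# A single feature name
lookup_one = FeatureLookup(
    table_name='ml.recommender_system.customer_features',
    feature_names='total_purchases_30d',
    lookup_key='customer_id'
)

# A list of feature names
lookup_list = FeatureLookup(
    table_name='ml.recommender_system.customer_features',
    feature_names=['total_purchases_30d', 'total_purchases_7d'],
    lookup_key='customer_id'
)

# None: look up all features in the table (excluding primary keys)
# as of the time the training set is created
lookup_all = FeatureLookup(
    table_name='ml.recommender_system.customer_features',
    feature_names=None,
    lookup_key='customer_id'
)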
Note
The type and order of lookup_key columns in the DataFrame must match the type and order of the primary keys (excluding timestamp keys) of the reference feature table.
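For instance, if the feature table's primary key customer_id is an integer but the training DataFrame carries it as a string, cast the column before creating the training set. A minimal sketch (the type mismatch here is a hypothetical):

from pyspark.sql.functions import col

# Align the lookup column's type with the feature table's integer primary key
training_df = training_df.withColumn('customer_id', col('customer_id').cast('int'))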
This article includes code examples for both versions of the syntax.
In this example, the DataFrame returned by TrainingSet.load_df contains a column for each feature in feature_lookups. It preserves all columns of the DataFrame provided to create_training_set except those excluded using exclude_columns.
Feature Engineering in Unity Catalog

from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['total_purchases_30d', 'total_purchases_7d'],
        lookup_key='customer_id'
    ),
    FeatureLookup(
        table_name='ml.recommender_system.product_features',
        feature_names=['category'],
        lookup_key='product_id'
    )
]

fe = FeatureEngineeringClient()

# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fe.create_training_set(
    df=training_df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
)

training_df = training_set.load_df()
Workspace Feature Store

from databricks.feature_store import FeatureLookup, FeatureStoreClient

# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['total_purchases_30d', 'total_purchases_7d'],
        lookup_key='customer_id'
    ),
    FeatureLookup(
        table_name='recommender_system.product_features',
        feature_names=['category'],
        lookup_key='product_id'
    )
]

fs = FeatureStoreClient()

# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fs.create_training_set(
    df=training_df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
)

training_df = training_set.load_df()
Create a TrainingSet when lookup keys do not match the primary keys
Use the argument lookup_key in the FeatureLookup for the column name in the training set. create_training_set performs an ordered join between the training set columns specified in the lookup_key argument and the feature table's primary keys, using the order in which the primary keys were specified when the feature table was created.
In this example, recommender_system.customer_features has the following primary keys: customer_id, dt. The recommender_system.product_features feature table has primary key product_id.
If the training_df has the following columns:

- cid
- transaction_dt
- product_id
- rating
the following code will create the correct feature lookups for the TrainingSet:
Feature Engineering in Unity Catalog

feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['total_purchases_30d', 'total_purchases_7d'],
        lookup_key=['cid', 'transaction_dt']
    ),
    FeatureLookup(
        table_name='ml.recommender_system.product_features',
        feature_names=['category'],
        lookup_key='product_id'
    )
]
Workspace Feature Store

feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['total_purchases_30d', 'total_purchases_7d'],
        lookup_key=['cid', 'transaction_dt']
    ),
    FeatureLookup(
        table_name='recommender_system.product_features',
        feature_names=['category'],
        lookup_key='product_id'
    )
]
When create_training_set is called, it creates a training dataset by performing a left join, joining the tables recommender_system.customer_features and training_df using the keys (customer_id, dt) corresponding to (cid, transaction_dt), as shown in the following code:
Feature Engineering in Unity Catalog

customer_features_df = spark.sql("SELECT * FROM ml.recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM ml.recommender_system.product_features")

training_df.join(
    customer_features_df,
    on=[training_df.cid == customer_features_df.customer_id,
        training_df.transaction_dt == customer_features_df.dt],
    how="left"
).join(
    product_features_df,
    on="product_id",
    how="left"
)
Workspace Feature Store

customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")

training_df.join(
    customer_features_df,
    on=[training_df.cid == customer_features_df.customer_id,
        training_df.transaction_dt == customer_features_df.dt],
    how="left"
).join(
    product_features_df,
    on="product_id",
    how="left"
)
Create a TrainingSet containing two features with the same name from different feature tables
Use the optional argument output_name in the FeatureLookup. The name provided is used in place of the feature name in the DataFrame returned by TrainingSet.load_df. For example, with the following code, the DataFrame returned by training_set.load_df includes columns customer_height and product_height.
Feature Engineering in Unity Catalog

feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['height'],
        lookup_key='customer_id',
        output_name='customer_height',
    ),
    FeatureLookup(
        table_name='ml.recommender_system.product_features',
        feature_names=['height'],
        lookup_key='product_id',
        output_name='product_height'
    ),
]

fe = FeatureEngineeringClient()

with mlflow.start_run():
    training_set = fe.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label='rating',
        exclude_columns=['customer_id']
    )
    training_df = training_set.load_df()
Workspace Feature Store

feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['height'],
        lookup_key='customer_id',
        output_name='customer_height',
    ),
    FeatureLookup(
        table_name='recommender_system.product_features',
        feature_names=['height'],
        lookup_key='product_id',
        output_name='product_height'
    ),
]

fs = FeatureStoreClient()

with mlflow.start_run():
    training_set = fs.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label='rating',
        exclude_columns=['customer_id']
    )
    training_df = training_set.load_df()
Create a TrainingSet using the same feature multiple times
To create a TrainingSet using the same feature joined by different lookup keys, use multiple FeatureLookups. Use a unique output_name for each FeatureLookup output.
Feature Engineering in Unity Catalog

feature_lookups = [
    FeatureLookup(
        table_name='ml.taxi_data.zip_features',
        feature_names=['temperature'],
        lookup_key=['pickup_zip'],
        output_name='pickup_temp'
    ),
    FeatureLookup(
        table_name='ml.taxi_data.zip_features',
        feature_names=['temperature'],
        lookup_key=['dropoff_zip'],
        output_name='dropoff_temp'
    )
]
Workspace Feature Store

feature_lookups = [
    FeatureLookup(
        table_name='taxi_data.zip_features',
        feature_names=['temperature'],
        lookup_key=['pickup_zip'],
        output_name='pickup_temp'
    ),
    FeatureLookup(
        table_name='taxi_data.zip_features',
        feature_names=['temperature'],
        lookup_key=['dropoff_zip'],
        output_name='dropoff_temp'
    )
]
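The lookups can then be passed to create_training_set as usual. The following is a minimal usage sketch; the training DataFrame columns pickup_zip and dropoff_zip and the fare label are assumptions for illustration:

from databricks.feature_engineering import FeatureEngineeringClient

fe = FeatureEngineeringClient()

# training_df is assumed to have columns ['pickup_zip', 'dropoff_zip', 'fare'];
# feature_lookups is the list defined above
training_set = fe.create_training_set(
    df=training_df,
    feature_lookups=feature_lookups,
    label='fare'
)

# The loaded DataFrame contains one temperature column per lookup,
# named by output_name: 'pickup_temp' and 'dropoff_temp'
training_df = training_set.load_df()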
Create a TrainingSet for unsupervised machine learning models
Set label=None when creating a TrainingSet for unsupervised learning models. For example, the following TrainingSet can be used to cluster different customers into groups based on their interests:
Feature Engineering in Unity Catalog

feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['interests'],
        lookup_key='customer_id',
    ),
]

fe = FeatureEngineeringClient()

with mlflow.start_run():
    training_set = fe.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label=None,
        exclude_columns=['customer_id']
    )
    training_df = training_set.load_df()
Workspace Feature Store

feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['interests'],
        lookup_key='customer_id',
    ),
]

fs = FeatureStoreClient()

with mlflow.start_run():
    training_set = fs.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label=None,
        exclude_columns=['customer_id']
    )
    training_df = training_set.load_df()
Create a TrainingSet when using a view as a feature table
To use a view as a feature table, you must use databricks-feature-engineering version 0.7.0 or above, which is built into Databricks Runtime 16.0 ML.
The view must be a simple SELECT view from the source Delta table. A simple SELECT view is defined as a view created from a single Delta table in Unity Catalog that can be used as a feature table, and whose primary keys are selected without JOIN, GROUP BY, or DISTINCT clauses. Acceptable keywords in the SQL statement are SELECT, FROM, WHERE, ORDER BY, LIMIT, and OFFSET.
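For example, the first view below qualifies, while a view that aggregates, joins, or deduplicates does not. This is a sketch; the view names are illustrative and the table follows this article's later example:

# Qualifies: a simple SELECT over a single Delta table, with the primary keys
# (cid, dt) selected directly and only a WHERE filter applied
spark.sql("""
    CREATE OR REPLACE VIEW ml.recommender_system.high_rated_customers AS
    SELECT cid, dt, pid, rating
    FROM ml.recommender_system.customer_table
    WHERE rating > 3
""")

# Does not qualify (sketch): a GROUP BY (likewise JOIN or DISTINCT) means the
# primary keys are no longer selected directly from the source table
# spark.sql("CREATE VIEW ... AS SELECT cid, MAX(rating) FROM ... GROUP BY cid")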
In the following example, ml.recommender_system.customer_table has primary keys cid and dt, where dt is a timeseries column. The example assumes that the DataFrame training_df has columns cid, dt, and label:
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

# Create a simple SELECT view over the source Delta table to use as a feature table
spark.sql("CREATE OR REPLACE VIEW ml.recommender_system.customer_features AS SELECT cid, dt, pid, rating FROM ml.recommender_system.customer_table WHERE rating > 3")

feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['pid', 'rating'],
        lookup_key=['cid'],
        timestamp_lookup_key='dt'
    ),
]

fe = FeatureEngineeringClient()

training_set = fe.create_training_set(
    df=training_df,
    feature_lookups=feature_lookups,
    label='label'
)

training_df = training_set.load_df()
Train models and perform batch inference with feature tables
When you train a model using features from Feature Store, the model retains references to the features. When you use the model for inference, you can choose to have it retrieve feature values from Feature Store. You must provide the primary key(s) of the features used in the model. The model retrieves the features it requires from Feature Store in your workspace. It then joins the feature values as needed during scoring.
To support feature lookup at inference time:

- You must log the model using the log_model method of FeatureEngineeringClient (for Feature Engineering in Unity Catalog) or FeatureStoreClient (for Workspace Feature Store).
- You must use the DataFrame returned by TrainingSet.load_df to train the model. If you modify this DataFrame in any way before using it to train the model, the modifications are not applied when you use the model for inference. This decreases the performance of the model.
- The model type must have a corresponding python_flavor in MLflow. MLflow supports most Python model training frameworks, including:
  - scikit-learn
  - keras
  - PyTorch
  - SparkML
  - LightGBM
  - XGBoost
  - TensorFlow Keras (using the python_flavor mlflow.keras)
  - Custom MLflow pyfunc models
Feature Engineering in Unity Catalog

# Train model
import mlflow
from sklearn import linear_model

from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['total_purchases_30d'],
        lookup_key='customer_id',
    ),
    FeatureLookup(
        table_name='ml.recommender_system.product_features',
        feature_names=['category'],
        lookup_key='product_id'
    )
]

fe = FeatureEngineeringClient()

with mlflow.start_run():
    # df has columns ['customer_id', 'product_id', 'rating']
    training_set = fe.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label='rating',
        exclude_columns=['customer_id', 'product_id']
    )

    training_df = training_set.load_df().toPandas()

    # "training_df" columns ['total_purchases_30d', 'category', 'rating']
    X_train = training_df.drop(['rating'], axis=1)
    y_train = training_df.rating

    model = linear_model.LinearRegression().fit(X_train, y_train)

    fe.log_model(
        model=model,
        artifact_path="recommendation_model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name="recommendation_model"
    )

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureEngineeringClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fe = FeatureEngineeringClient()

# batch_df has columns 'customer_id' and 'product_id'
predictions = fe.score_batch(
    model_uri=model_uri,
    df=batch_df
)

# The 'predictions' DataFrame has these columns:
# 'customer_id', 'product_id', 'total_purchases_30d', 'category', 'prediction'
Workspace Feature Store

# Train model
import mlflow
from sklearn import linear_model

from databricks.feature_store import FeatureLookup, FeatureStoreClient

feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['total_purchases_30d'],
        lookup_key='customer_id',
    ),
    FeatureLookup(
        table_name='recommender_system.product_features',
        feature_names=['category'],
        lookup_key='product_id'
    )
]

fs = FeatureStoreClient()

with mlflow.start_run():
    # df has columns ['customer_id', 'product_id', 'rating']
    training_set = fs.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label='rating',
        exclude_columns=['customer_id', 'product_id']
    )

    training_df = training_set.load_df().toPandas()

    # "training_df" columns ['total_purchases_30d', 'category', 'rating']
    X_train = training_df.drop(['rating'], axis=1)
    y_train = training_df.rating

    model = linear_model.LinearRegression().fit(X_train, y_train)

    fs.log_model(
        model=model,
        artifact_path="recommendation_model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name="recommendation_model"
    )

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fs = FeatureStoreClient()

# batch_df has columns 'customer_id' and 'product_id'
predictions = fs.score_batch(
    model_uri=model_uri,
    df=batch_df
)

# The 'predictions' DataFrame has these columns:
# 'customer_id', 'product_id', 'total_purchases_30d', 'category', 'prediction'
Use custom feature values when scoring a model packaged with feature metadata
By default, a model packaged with feature metadata looks up features from feature tables at inference. To use custom feature values for scoring, include them in the DataFrame passed to FeatureEngineeringClient.score_batch (for Feature Engineering in Unity Catalog) or FeatureStoreClient.score_batch (for Workspace Feature Store).
For example, suppose you package a model with these two features:
Feature Engineering in Unity Catalog

feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['account_creation_date', 'num_lifetime_purchases'],
        lookup_key='customer_id',
    ),
]
Workspace Feature Store

feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['account_creation_date', 'num_lifetime_purchases'],
        lookup_key='customer_id',
    ),
]
At inference, you can provide custom values for the feature account_creation_date by calling score_batch on a DataFrame that includes a column named account_creation_date. In this case, the API looks up only the num_lifetime_purchases feature from Feature Store and uses the provided custom account_creation_date column values for model scoring.
Feature Engineering in Unity Catalog

# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fe.score_batch(
    model_uri='models:/ban_prediction_model/1',
    df=batch_df
)
Workspace Feature Store

# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
    model_uri='models:/ban_prediction_model/1',
    df=batch_df
)
Train and score a model using a combination of Feature Store features and data residing outside Feature Store
You can train a model using a combination of Feature Store features and data from outside Feature Store. When you package the model with feature metadata, the model retrieves feature values from Feature Store for inference.
To train a model, include the extra data as columns in the DataFrame passed to FeatureEngineeringClient.create_training_set (for Feature Engineering in Unity Catalog) or FeatureStoreClient.create_training_set (for Workspace Feature Store). This example uses the feature total_purchases_30d from Feature Store and the external column browser.
Feature Engineering in Unity Catalog

feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['total_purchases_30d'],
        lookup_key='customer_id',
    ),
]

fe = FeatureEngineeringClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id']  # 'browser' is not excluded
)
Workspace Feature Store

feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['total_purchases_30d'],
        lookup_key='customer_id',
    ),
]

fs = FeatureStoreClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id']  # 'browser' is not excluded
)
At inference, the DataFrame passed to score_batch must include the browser column.
Feature Engineering in Unity Catalog

# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fe.score_batch(
    model_uri=model_uri,
    df=batch_df
)
Workspace Feature Store

# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
    model_uri=model_uri,
    df=batch_df
)
Load models and perform batch inference using MLflow
After a model has been logged using the log_model method of FeatureEngineeringClient (for Feature Engineering in Unity Catalog) or FeatureStoreClient (for Workspace Feature Store), MLflow can be used at inference. The predict method of a model loaded with mlflow.pyfunc.load_model retrieves feature values from Feature Store and also joins any values provided at inference time. You must provide the primary key(s) of the features used in the model.
Note
Batch inference with MLflow requires MLflow version 2.11 and above. Models that use time series feature tables are not supported. To do batch inference with time series feature tables, use score_batch. See Train models and perform batch inference with feature tables.
# Train model
import mlflow
from sklearn import linear_model

from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

feature_lookups = [
    FeatureLookup(
        table_name='ml.recommender_system.customer_features',
        feature_names=['total_purchases_30d'],
        lookup_key='customer_id',
    ),
    FeatureLookup(
        table_name='ml.recommender_system.product_features',
        feature_names=['category'],
        lookup_key='product_id'
    )
]

fe = FeatureEngineeringClient()

with mlflow.start_run():
    # df has columns ['customer_id', 'product_id', 'rating']
    training_set = fe.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label='rating',
        exclude_columns=['customer_id', 'product_id']
    )

    training_df = training_set.load_df().toPandas()

    # "training_df" columns ['total_purchases_30d', 'category', 'rating']
    X_train = training_df.drop(['rating'], axis=1)
    y_train = training_df.rating

    model = linear_model.LinearRegression().fit(X_train, y_train)

    fe.log_model(
        model=model,
        artifact_path="recommendation_model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name="recommendation_model",
        # Sets the default value of "result_type" used if none is provided at inference
        params={"result_type": "double"},
    )

# Batch inference with MLflow

# NOTE: the result_type parameter can only be used if a default value
# is provided in log_model. This is automatically done for all models
# logged using Databricks Runtime for ML 15.0 or above.
# For earlier Databricks Runtime versions, use set_result_type as shown below.

# batch_df has columns 'customer_id' and 'product_id'
model = mlflow.pyfunc.load_model(model_version_uri)

# If the result_type parameter is provided in log_model
predictions = model.predict(batch_df, {"result_type": "double"})

# If the result_type parameter is NOT provided in log_model
model._model_impl.set_result_type("double")
predictions = model.predict(batch_df)