Use features to train models

This article describes how you can train models using Feature Engineering in Unity Catalog or the legacy Workspace Feature Store. You must first create a training dataset, which defines the features to use and how to join them. Then, when you train a model, the model retains references to the features.

When you train a model using Feature Engineering in Unity Catalog, you can view the model’s lineage in Catalog Explorer. Tables and functions that were used to create the model are automatically tracked and displayed. See Feature governance and lineage.

When you use the model for inference, you can choose to have it retrieve feature values from the feature store. Feature store models are also compatible with the MLflow pyfunc interface, so you can use MLflow to perform batch inference with feature tables.

A model can use at most 50 tables and 100 functions for training.

Create a training dataset

To select specific features from a feature table for model training, you create a training dataset using the FeatureEngineeringClient.create_training_set (for Feature Engineering in Unity Catalog) or FeatureStoreClient.create_training_set (for Workspace Feature Store) API and an object called a FeatureLookup. A FeatureLookup specifies each feature to use in the training set, including the name of the feature table, the name(s) of the features, and the key(s) to use when joining the feature table with the DataFrame passed to create_training_set. See Feature Lookup for more information.

Use the feature_names parameter when you create a FeatureLookup. feature_names takes a single feature name, a list of feature names, or None to look up all features (excluding primary keys) in the feature table at the time that the training set is created.

Note

The type and order of lookup_key columns in the DataFrame must match the type and order of the primary keys (excluding timestamp keys) of the reference feature table.

This article includes code examples for both versions of the syntax.

In this example, the DataFrame returned by trainingSet.load_df contains a column for each feature in feature_lookups. It preserves all columns of the DataFrame provided to create_training_set except those excluded using exclude_columns.

from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key='customer_id'
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fe = FeatureEngineeringClient()

# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fe.create_training_set(
  df=training_df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id', 'product_id']
)

training_df = training_set.load_df()
from databricks.feature_store import FeatureLookup, FeatureStoreClient

# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key='customer_id'
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fs = FeatureStoreClient()

# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fs.create_training_set(
  df=training_df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id', 'product_id']
)

training_df = training_set.load_df()

Create a TrainingSet when lookup keys do not match the primary keys

Use the argument lookup_key in the FeatureLookup for the column name in the training set. create_training_set performs an ordered join between the columns from the training set specified in the lookup_key argument using the order in which the primary keys were specified when the feature table was created.

In this example, recommender_system.customer_features has the following primary keys: customer_id, dt.

The recommender_system.product_features feature table has primary key product_id.

If the training_df has the following columns:

  • cid

  • transaction_dt

  • product_id

  • rating

the following code will create the correct feature lookups for the TrainingSet:

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key=['cid', 'transaction_dt']
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key=['cid', 'transaction_dt']
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

When create_training_set is called, it creates a training dataset by performing a left join, joining the tables recommender_system.customer_features and training_df using the keys (customer_id,dt) corresponding to (cid,transaction_dt), as shown in the following code:

customer_features_df = spark.sql("SELECT * FROM ml.recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM ml.recommender_system.product_features")

training_df.join(
  customer_features_df,
  on=[training_df.cid == customer_features_df.customer_id,
      training_df.transaction_dt == customer_features_df.dt],
  how="left"
).join(
  product_features_df,
  on="product_id",
  how="left"
)
customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")

training_df.join(
  customer_features_df,
  on=[training_df.cid == customer_features_df.customer_id,
      training_df.transaction_dt == customer_features_df.dt],
  how="left"
).join(
  product_features_df,
  on="product_id",
  how="left"
)

Create a TrainingSet containing two features with the same name from different feature tables

Use the optional argument output_name in the FeatureLookup. The name provided is used in place of the feature name in the DataFrame returned by TrainingSet.load_df. For example, with the following code, the DataFrame returned by training_set.load_df includes columns customer_height and product_height.

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['height'],
      lookup_key='customer_id',
      output_name='customer_height',
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['height'],
      lookup_key='product_id',
      output_name='product_height'
    ),
  ]

fe = FeatureEngineeringClient()

with mlflow.start_run():
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id']
  )
  training_df = training_set.load_df()
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['height'],
      lookup_key='customer_id',
      output_name='customer_height',
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['height'],
      lookup_key='product_id',
      output_name='product_height'
    ),
  ]

fs = FeatureStoreClient()

with mlflow.start_run():
  training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id']
  )
  training_df = training_set.load_df()

Create a TrainingSet using the same feature multiple times

To create a TrainingSet using the same feature joined by different lookup keys, use multiple FeatureLookups. Use a unique output_name for each FeatureLookup output.

feature_lookups = [
    FeatureLookup(
      table_name='ml.taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['pickup_zip'],
      output_name='pickup_temp'
    ),
    FeatureLookup(
      table_name='ml.taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['dropoff_zip'],
      output_name='dropoff_temp'
    )
  ]
feature_lookups = [
    FeatureLookup(
      table_name='taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['pickup_zip'],
      output_name='pickup_temp'
    ),
    FeatureLookup(
      table_name='taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['dropoff_zip'],
      output_name='dropoff_temp'
    )
  ]

Create a TrainingSet for unsupervised machine learning models

Set label=None when creating a TrainingSet for unsupervised learning models. For example, the following TrainingSet can be used to cluster different customers into groups based on their interests:

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['interests'],
      lookup_key='customer_id',
    ),
  ]

fe = FeatureEngineeringClient()
with mlflow.start_run():
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label=None,
    exclude_columns=['customer_id']
  )

  training_df = training_set.load_df()
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['interests'],
      lookup_key='customer_id',
    ),
  ]

fs = FeatureStoreClient()
with mlflow.start_run():
  training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label=None,
    exclude_columns=['customer_id']
  )

  training_df = training_set.load_df()

Create a TrainingSet when using a view as a feature table

To use a view as a feature table, you must use databricks-feature-engineering version 0.7.0 or above, which is built into Databricks Runtime 16.0 ML.

The view must be a simple SELECT view from the source Delta table. A simple SELECT view is defined as a view created from a single Delta table in Unity Catalog that can be used as a feature table, and whose primary keys are selected without JOIN, GROUP BY, or DISTINCT clauses. Acceptable keywords in the SQL statement are SELECT, FROM, WHERE, ORDER BY, LIMIT, and OFFSET.

In the following example, ml.recommender_system.customer_table has primary keys cid and dt, where dt is a timeseries column. The example assumes that the dataframe training_df has columns cid, dt, and label:

from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

customer_features_df = spark.sql("CREATE OR REPLACE VIEW ml.recommender_system.customer_features AS SELECT cid, dt, pid, rating FROM ml.recommender_system.customer_table WHERE rating > 3")

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['pid', 'rating'],
      lookup_key=['cid'],
      timestamp_lookup_key='dt'
    ),
]

fe = FeatureEngineeringClient()

training_set = fe.create_training_set(
  df=training_df,
  feature_lookups=feature_lookups,
  label='label'
)

training_df = training_set.load_df()

Train models and perform batch inference with feature tables

When you train a model using features from Feature Store, the model retains references to the features. When you use the model for inference, you can choose to have it retrieve feature values from Feature Store. You must provide the primary key(s) of the features used in the model. The model retrieves the features it requires from Feature Store in your workspace. It then joins the feature values as needed during scoring.

To support feature lookup at inference time:

  • You must log the model using the log_model method of FeatureEngineeringClient (for Feature Engineering in Unity Catalog) or FeatureStoreClient (for Workspace Feature Store).

  • You must use the DataFrame returned by TrainingSet.load_df to train the model. If you modify this DataFrame in any way before using it to train the model, the modifications are not applied when you use the model for inference. This decreases the performance of the model.

  • The model type must have a corresponding python_flavor in MLflow. MLflow supports most Python model training frameworks, including:

    • scikit-learn

    • keras

    • PyTorch

    • SparkML

    • LightGBM

    • XGBoost

    • TensorFlow Keras (using the python_flavor mlflow.keras)

  • Custom MLflow pyfunc models

# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fe = FeatureEngineeringClient()

with mlflow.start_run():

  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
  )

  training_df = training_set.load_df().toPandas()

  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating

  model = linear_model.LinearRegression().fit(X_train, y_train)

  fe.log_model(
    model=model,
    artifact_path="recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model"
  )

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fe = FeatureEngineeringClient()

# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fe.score_batch(
    model_uri=model_uri,
    df=batch_df
)

# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’
# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fs = FeatureStoreClient()

with mlflow.start_run():

  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
  )

  training_df = training_set.load_df().toPandas()

  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating

  model = linear_model.LinearRegression().fit(X_train, y_train)

  fs.log_model(
    model=model,
    artifact_path="recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model"
  )

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fs = FeatureStoreClient()

# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fs.score_batch(
    model_uri=model_uri,
    df=batch_df
)

# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’

Use custom feature values when scoring a model packaged with feature metadata

By default, a model packaged with feature metadata looks up features from feature tables at inference. To use custom feature values for scoring, include them in the DataFrame passed to FeatureEngineeringClient.score_batch (for Feature Engineering in Unity Catalog) or FeatureStoreClient.score_batch (for Workspace Feature Store).

For example, suppose you package a model with these two features:

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['account_creation_date', 'num_lifetime_purchases'],
      lookup_key='customer_id',
    ),
  ]
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['account_creation_date', 'num_lifetime_purchases'],
      lookup_key='customer_id',
    ),
  ]

At inference, you can provide custom values for the feature account_creation_date by calling score_batch on a DataFrame that includes a column named account_creation_date. In this case the API looks up only the num_lifetime_purchases feature from Feature Store and uses the provided custom account_creation_date column values for model scoring.

# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fe.score_batch(
  model_uri='models:/ban_prediction_model/1',
  df=batch_df
)
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
  model_uri='models:/ban_prediction_model/1',
  df=batch_df
)

Train and score a model using a combination of Feature Store features and data residing outside Feature Store

You can train a model using a combination of Feature Store features and data from outside Feature Store. When you package the model with feature metadata, the model retrieves feature values from Feature Store for inference.

To train a model, include the extra data as columns in the DataFrame passed to FeatureEngineeringClient.create_training_set (for Feature Engineering in Unity Catalog) or FeatureStoreClient.create_training_set (for Workspace Feature Store). This example uses the feature total_purchases_30d from Feature Store and the external column browser.

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
  ]

fe = FeatureEngineeringClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fe.create_training_set(
  df=df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id']  # 'browser' is not excluded
)
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
  ]

fs = FeatureStoreClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
  df=df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id']  # 'browser' is not excluded
)

At inference, the DataFrame used in FeatureStoreClient.score_batch must include the browser column.

# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fe.score_batch(
  model_uri=model_uri,
  df=batch_df
)
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
  model_uri=model_uri,
  df=batch_df
)

Load models and perform batch inference using MLflow

After a model has been logged using the log_model method of FeatureEngineeringClient (for Feature Engineering in Unity Catalog) or FeatureStoreClient (for Workspace Feature Store), MLflow can be used at inference. MLflow.pyfunc.predict retrieves feature values from Feature Store and also joins any values provided at inference time. You must provide the primary key(s) of the features used in the model.

Note

Batch inference with MLflow requires MLflow version 2.11 and above. Models that use time series feature tables are not supported. To do batch inference with time series feature tables, use score_batch. See Train models and perform batch inference with feature tables.

# Train model
import mlflow
from sklearn import linear_model


feature_lookups = [
  FeatureLookup(
    table_name='ml.recommender_system.customer_features',
    feature_names=['total_purchases_30d'],
    lookup_key='customer_id',
  ),
  FeatureLookup(
    table_name='ml.recommender_system.product_features',
    feature_names=['category'],
    lookup_key='product_id'
  )
]


fe = FeatureEngineeringClient()


with mlflow.start_run():


  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
  )


  training_df = training_set.load_df().toPandas()


  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating


  model = linear_model.LinearRegression().fit(X_train, y_train)


  fe.log_model(
    model=model,
    artifact_path="recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model",
    #refers to the default value of "result_type" if not provided at inference
    params={"result_type":"double"},
  )


# Batch inference with MLflow


# NOTE: the result_type parameter can only be used if a default value
# is provided in log_model. This is automatically done for all models
# logged using Databricks Runtime for ML 15.0 or above.
# For earlier Databricks Runtime versions, use set_result as shown below.


# batch_df has columns ‘customer_id’ and ‘product_id’
model = mlflow.pyfunc.load_model(model_version_uri)


# If result_type parameter is provided in log_model
predictions = model.predict(df, {"result_type":"double"})


# If result_type parameter is NOT provided in log_model
model._model_impl.set_result_type("double")
predictions = model.predict(df)

Handle missing feature values

When a non-existent lookup key is passed to the model for prediction, the feature value fetched by FeatureLookup is NaN.