Train models using the Databricks Feature Store

This article describes how you can train models using features from the Databricks Feature Store. You must first create a training dataset, which defines the features to use and how to join them. Then, when you train a model, the model retains references to the features.

When you use the model for inference, you can choose to have it retrieve feature values from Feature Store. You can also serve the model with either Serverless Real-Time Inference or Classic MLflow Model Serving on Databricks, and it will automatically look up features published to online stores.

Create a training dataset

To select specific features from a feature table for model training, you create a training dataset using the FeatureStoreClient.create_training_set API and an object called a FeatureLookup. A FeatureLookup specifies each feature to use in the training set, including the name of the feature table, the name(s) of the features, and the key(s) to use when joining the feature table with the DataFrame passed to FeatureStoreClient.create_training_set.

Use the feature_names parameter when you create a FeatureLookup. feature_names takes a single feature name, a list of feature names, or None to look up all features (excluding primary keys) in the feature table at the time that the training set is created.
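The three accepted forms of feature_names can be illustrated with a small stand-alone sketch. It does not call the Feature Store client; resolve_feature_names, table_columns, and primary_keys are hypothetical names used only to show how a single name, a list, or None might map to the set of features that is looked up.

```python
def resolve_feature_names(feature_names, table_columns, primary_keys):
    """Illustrative only: expand feature_names into an explicit list of features."""
    if feature_names is None:
        # None means "all features", which excludes the primary key columns.
        return [col for col in table_columns if col not in primary_keys]
    if isinstance(feature_names, str):
        # A single feature name is treated as a one-element list.
        return [feature_names]
    return list(feature_names)


# Hypothetical schema for the customer_features table.
table_columns = ["customer_id", "total_purchases_30d", "total_purchases_7d"]
primary_keys = ["customer_id"]

all_features = resolve_feature_names(None, table_columns, primary_keys)
one_feature = resolve_feature_names("total_purchases_7d", table_columns, primary_keys)
```

Here all_features contains both purchase columns but not customer_id, and one_feature contains only total_purchases_7d.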

Note

The type and order of lookup_key columns in the DataFrame must match the type and order of the primary keys of the reference feature table.

This article includes code examples for both versions of the syntax.

In this example, the DataFrame returned by TrainingSet.load_df contains a column for each feature in feature_lookups. It preserves all columns of the DataFrame provided to FeatureStoreClient.create_training_set except those excluded using exclude_columns.

from databricks.feature_store import FeatureLookup, FeatureStoreClient

# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['total_purchases_30d', 'total_purchases_7d'],
      lookup_key = 'customer_id'
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_names = ['category'],
      lookup_key = 'product_id'
    )
  ]

fs = FeatureStoreClient()

# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fs.create_training_set(
  df=training_df,
  feature_lookups = feature_lookups,
  label = 'rating',
  exclude_columns = ['customer_id', 'product_id']
)

training_df = training_set.load_df()

Create a TrainingSet when lookup keys do not match the primary keys

Use the lookup_key argument of FeatureLookup to specify the column names in the training set. FeatureStoreClient.create_training_set performs an ordered join: the columns listed in lookup_key are matched, in order, to the feature table's primary keys in the order in which those keys were specified when the feature table was created.

In this example, recommender_system.customer_features has the following primary keys: customer_id, dt.

The recommender_system.product_features feature table has primary key product_id.

If the training_df has the following columns:

  • cid

  • transaction_dt

  • product_id

  • rating

the following code will create the correct feature lookups for the TrainingSet:

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['total_purchases_30d', 'total_purchases_7d'],
      lookup_key = ['cid', 'transaction_dt']
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_names = ['category'],
      lookup_key = 'product_id'
    )
  ]

When FeatureStoreClient.create_training_set is called, it creates a training dataset by left-joining training_df with recommender_system.customer_features, matching the keys (customer_id, dt) to (cid, transaction_dt), and then with recommender_system.product_features on product_id, as shown in the following code:

customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")

training_df.join(
  customer_features_df,
  on=[training_df.cid == customer_features_df.customer_id,
      training_df.transaction_dt == customer_features_df.dt],
  how="left"
).join(
  product_features_df,
  on="product_id",
  how="left"
)
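The ordered key matching can also be sketched without Spark. The following plain-Python example is an illustration under stated assumptions, not how Feature Store is implemented: left_join_on_ordered_keys is a hypothetical helper, and the sample rows are made up. It pairs lookup_key[i] with primary_keys[i] and keeps every training row, which is what makes the join a left join.

```python
def left_join_on_ordered_keys(training_rows, feature_rows, lookup_key,
                              primary_keys, feature_names):
    """Left-join feature values onto training rows.

    lookup_key[i] in the training rows is matched against primary_keys[i]
    in the feature rows.
    """
    if len(lookup_key) != len(primary_keys):
        raise ValueError("lookup_key and primary_keys must have the same length")

    # Index feature rows by their primary-key tuple.
    index = {tuple(row[k] for k in primary_keys): row for row in feature_rows}

    joined = []
    for row in training_rows:
        match = index.get(tuple(row[k] for k in lookup_key))
        out = dict(row)
        for name in feature_names:
            # Unmatched training rows are kept, with None as the feature value.
            out[name] = match[name] if match is not None else None
        joined.append(out)
    return joined


# Made-up sample data for illustration.
customer_features = [
    {"customer_id": 1, "dt": "2023-01-01", "total_purchases_30d": 12, "total_purchases_7d": 3},
    {"customer_id": 2, "dt": "2023-01-01", "total_purchases_30d": 5, "total_purchases_7d": 0},
]
training_rows = [
    {"cid": 1, "transaction_dt": "2023-01-01", "product_id": 10, "rating": 4},
    {"cid": 3, "transaction_dt": "2023-01-01", "product_id": 11, "rating": 2},
]

result = left_join_on_ordered_keys(
    training_rows,
    customer_features,
    lookup_key=["cid", "transaction_dt"],
    primary_keys=["customer_id", "dt"],
    feature_names=["total_purchases_30d", "total_purchases_7d"],
)
```

The first training row picks up the feature values for customer 1. The second row has no matching customer, so it is retained with None in both feature columns.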

Create a TrainingSet containing two features with the same name from different feature tables

Use the optional argument output_name in the FeatureLookup. The name provided is used in place of the feature name in the DataFrame returned by TrainingSet.load_df. For example, with the following code, the DataFrame returned by training_set.load_df includes columns customer_height and product_height.

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['height'],
      lookup_key = 'customer_id',
      output_name = 'customer_height',
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_names = ['height'],
      lookup_key = 'product_id',
      output_name = 'product_height'
    ),
  ]

import mlflow

fs = FeatureStoreClient()

with mlflow.start_run():
  training_set = fs.create_training_set(
    df,
    feature_lookups = feature_lookups,
    label = 'rating',
    exclude_columns = ['customer_id']
  )
  training_df = training_set.load_df()
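Because two lookups that produce the same column name would be ambiguous, it can help to check output names before building the training set. The following is a minimal sketch in plain Python; duplicate_output_columns and the dictionary layout are hypothetical and are not part of the Feature Store API.

```python
from collections import Counter


def duplicate_output_columns(lookups):
    """Return output column names that more than one lookup would produce."""
    # A lookup's output column is its output_name if set, else the feature name.
    names = [lookup.get("output_name") or lookup["feature_name"] for lookup in lookups]
    return sorted(name for name, count in Counter(names).items() if count > 1)


without_rename = [
    {"table_name": "recommender_system.customer_features", "feature_name": "height"},
    {"table_name": "recommender_system.product_features", "feature_name": "height"},
]
with_rename = [
    dict(without_rename[0], output_name="customer_height"),
    dict(without_rename[1], output_name="product_height"),
]

print(duplicate_output_columns(without_rename))  # ['height']
print(duplicate_output_columns(with_rename))     # []
```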

Create a TrainingSet using the same feature multiple times

To create a TrainingSet using the same feature joined by different lookup keys, use multiple FeatureLookups. Use a unique output_name for each FeatureLookup output.

feature_lookups = [
    FeatureLookup(
      table_name = 'taxi_data.zip_features',
      feature_names = ['temperature'],
      lookup_key = ['pickup_zip'],
      output_name = 'pickup_temp'
    ),
    FeatureLookup(
      table_name = 'taxi_data.zip_features',
      feature_names = ['temperature'],
      lookup_key = ['dropoff_zip'],
      output_name = 'dropoff_temp'
    )
  ]
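The effect of these two lookups can be sketched in plain Python. This is an illustration only: apply_lookups, the dictionary-based feature table, and the sample trip are hypothetical stand-ins for the Feature Store join. Each lookup reads the same temperature feature but joins on a different key column and writes to its own output column.

```python
def apply_lookups(rows, feature_table, lookups):
    """Add one output column per lookup, each joined on its own key column."""
    enriched = []
    for row in rows:
        out = dict(row)
        for lookup in lookups:
            # Left-join semantics: a key with no feature row yields None.
            features = feature_table.get(row[lookup["lookup_key"]], {})
            out[lookup["output_name"]] = features.get(lookup["feature_name"])
        enriched.append(out)
    return enriched


# Made-up data: the feature table is keyed by zip code.
zip_features = {"10001": {"temperature": 21.5}, "94105": {"temperature": 16.0}}
trips = [{"pickup_zip": "10001", "dropoff_zip": "94105", "fare": 52.0}]

lookups = [
    {"lookup_key": "pickup_zip", "feature_name": "temperature", "output_name": "pickup_temp"},
    {"lookup_key": "dropoff_zip", "feature_name": "temperature", "output_name": "dropoff_temp"},
]

enriched = apply_lookups(trips, zip_features, lookups)
```

The resulting row carries pickup_temp and dropoff_temp as separate columns, so the two uses of temperature do not collide.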

Create a TrainingSet for unsupervised machine learning models

Set label=None when creating a TrainingSet for unsupervised learning models. For example, the following TrainingSet can be used to cluster different customers into groups based on their interests:

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['interests'],
      lookup_key = 'customer_id',
    ),
  ]

fs = FeatureStoreClient()
with mlflow.start_run():
  training_set = fs.create_training_set(
    df,
    feature_lookups = feature_lookups,
    label = None,
    exclude_columns = ['customer_id']
  )

  training_df = training_set.load_df()

Train models and perform batch inference with feature tables

When you train a model using features from Feature Store, the model retains references to the features. When you use the model for inference, you can choose to have it retrieve feature values from Feature Store. You must provide the primary key(s) of the features used in the model. The model retrieves the features it requires from Feature Store in your workspace. It then joins the feature values as needed during scoring.

To support feature lookup at inference time:

  • You must log the model using FeatureStoreClient.log_model.

  • You must use the DataFrame returned by TrainingSet.load_df to train the model. If you modify this DataFrame in any way before using it to train the model, the modifications are not applied when you use the model for inference. This can decrease the predictive performance of the model.

  • The model type must have a corresponding python_flavor in MLflow. MLflow supports most Python model training frameworks, including:

    • scikit-learn

    • keras

    • PyTorch

    • SparkML

    • LightGBM

    • XGBoost

    • TensorFlow Keras (using the python_flavor mlflow.keras)

    • Custom MLflow pyfunc models

    # Train model
    import mlflow
    from sklearn import linear_model
    from databricks.feature_store import FeatureLookup, FeatureStoreClient
    
    feature_lookups = [
        FeatureLookup(
          table_name = 'recommender_system.customer_features',
          feature_names = ['total_purchases_30d'],
          lookup_key = 'customer_id',
        ),
        FeatureLookup(
          table_name = 'recommender_system.product_features',
          feature_names = ['category'],
          lookup_key = 'product_id'
        )
      ]
    
    
    fs = FeatureStoreClient()
    
    with mlflow.start_run():
    
      # df has columns ['customer_id', 'product_id', 'rating']
      training_set = fs.create_training_set(
        df,
        feature_lookups = feature_lookups,
        label = 'rating',
        exclude_columns = ['customer_id', 'product_id']
      )
    
      training_df = training_set.load_df().toPandas()
    
      # "training_df" columns ['total_purchases_30d', 'category', 'rating']
      X_train = training_df.drop(['rating'], axis=1)
      y_train = training_df.rating
    
      model = linear_model.LinearRegression().fit(X_train, y_train)
    
      fs.log_model(
        model,
        "recommendation_model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name="recommendation_model"
      )
    
    # Batch inference
    
    # If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
    # call automatically retrieves the required features from Feature Store before scoring the model.
    # The DataFrame returned by score_batch() augments batch_df with
    # columns containing the feature values and a column containing model predictions.
    
    fs = FeatureStoreClient()
    
    # batch_df has columns 'customer_id' and 'product_id'
    predictions = fs.score_batch(
        model_uri,
        batch_df
    )
    
    # The 'predictions' DataFrame has these columns:
    # 'customer_id', 'product_id', 'total_purchases_30d', 'category', 'prediction'
    

Use custom feature values when scoring a model packaged with feature metadata

By default, a model packaged with feature metadata looks up features from Feature Store at inference. To use custom feature values for scoring, include them in the DataFrame passed to FeatureStoreClient.score_batch().

For example, suppose you package a model with these two features:

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['account_creation_date', 'num_lifetime_purchases'],
      lookup_key = 'customer_id',
    ),
  ]

At inference, you can provide custom values for the feature account_creation_date by calling FeatureStoreClient.score_batch on a DataFrame that includes a column named account_creation_date. In this case the API looks up only the num_lifetime_purchases feature from Feature Store and uses the provided custom account_creation_date column values for model scoring.

# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
  'models:/ban_prediction_model/1',
  batch_df
)
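The override rule described above can be expressed as a small sketch: a feature is looked up only if the batch DataFrame does not already supply a column with that name. The helper features_to_look_up is hypothetical and only illustrates the rule; it is not part of the Feature Store API.

```python
def features_to_look_up(model_features, batch_columns):
    """Return the model features that the batch DataFrame does not provide."""
    provided = set(batch_columns)
    # Preserve the order in which the model declares its features.
    return [name for name in model_features if name not in provided]


model_features = ["account_creation_date", "num_lifetime_purchases"]
batch_columns = ["customer_id", "account_creation_date"]

print(features_to_look_up(model_features, batch_columns))
# ['num_lifetime_purchases']
```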

Train and score a model using a combination of Feature Store features and data residing outside Feature Store

You can train a model using a combination of Feature Store features and data from outside Feature Store. When you package the model with feature metadata, the model retrieves feature values from Feature Store for inference.

To train a model, include the extra data as columns in the DataFrame passed to FeatureStoreClient.create_training_set. This example uses the feature total_purchases_30d from Feature Store and the external column browser.

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['total_purchases_30d'],
      lookup_key = 'customer_id',
    ),
  ]

fs = FeatureStoreClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
  df,
  feature_lookups = feature_lookups,
  label = 'rating',
  exclude_columns = ['customer_id']  # 'browser' is not excluded
)

At inference, the DataFrame used in FeatureStoreClient.score_batch must include the browser column.

# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
  model_uri,
  batch_df
)
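This requirement can be checked before scoring. The following is a minimal sketch in plain Python, not part of the Feature Store API; missing_batch_columns is a hypothetical helper, and the caller is assumed to know which lookup keys and external columns the model was trained with.

```python
def missing_batch_columns(batch_columns, lookup_keys, external_columns):
    """Return required columns that are absent from the batch DataFrame."""
    # Lookup keys are needed to fetch features; external columns are model
    # inputs that Feature Store cannot supply.
    required = list(lookup_keys) + list(external_columns)
    return [col for col in required if col not in batch_columns]


lookup_keys = ["customer_id"]
external_columns = ["browser"]

print(missing_batch_columns(["customer_id", "browser"], lookup_keys, external_columns))
# []
print(missing_batch_columns(["customer_id"], lookup_keys, external_columns))
# ['browser']
```

An empty result means the batch DataFrame carries everything the model needs beyond what is retrieved from Feature Store.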