Databricks Feature Store

Preview

This feature is in Public Preview.

Databricks Feature Store is a centralized repository of features. It enables feature sharing and discovery across your organization and also ensures that the same feature computation code is used for model training and inference.

Requirements

Databricks Runtime 8.3 ML or above.

Introduction

Raw data needs to be processed and transformed before it can be used in machine learning. This process is called “feature engineering” and includes transformations such as aggregating data (for example, the number of purchases by a user in a given time window) and more complex calculations that may themselves be the result of machine learning algorithms such as word embeddings.

Converting raw data into features for model training is time-consuming. Creating and maintaining feature definition pipelines requires significant effort. Teams often want to explore and leverage features created by other data scientists in the organization.

Another challenge is maintaining consistency between training and serving. A feature pipeline might be created by a data scientist as a prototype and then reimplemented by data engineers for production use. If the model is deployed for low-latency online serving, machine learning engineers might rebuild the same feature computation to optimize for serving. This slows the process of moving models to production and can introduce errors or inconsistencies, sometimes called “skew”, between the code used for training the model and the code used for inference.

These problems can be addressed using a feature store—a centralized repository of features that enables feature sharing and discovery across an organization and also ensures that the same feature computation code is used for model training and inference.

You can use Databricks Feature Store to create new features, explore and re-use existing features, and select features for training and scoring machine learning models.

Databricks Feature Store is fully integrated with other Databricks components. Feature tables are stored as Delta tables. Deployed MLflow models can automatically retrieve features from Feature Store. The Databricks Feature Store UI, accessible from the Databricks workspace, lets you browse and search for existing features and displays information about feature lineage—including data sources used to compute features and the models, notebooks, and jobs that use a specific feature.

Concepts

This section describes concepts to help you use Databricks Feature Store and feature tables.

Feature table

Features are organized as feature tables. Each table is backed by a Delta table and additional metadata.

A feature table must have a primary key. Features in a feature table are typically computed and updated using a common computation function.

Feature table metadata tracks the data sources from which a table was generated and the notebooks and jobs that created or updated the table.

Offline store

The offline feature store is used for feature discovery, model training, and batch inference. It contains feature tables materialized as Delta tables.

Streaming

In addition to batch writes, Databricks Feature Store supports streaming. You can write feature values to a feature table from a streaming source, and feature computation code can use Structured Streaming to transform raw data streams into features.

Training set

A training set consists of a list of features and a DataFrame containing raw training data, labels, and primary keys by which to look up features. You create the training set by specifying features to extract from Feature Store, and provide the training set as input during model training.

See Create a training dataset for an example of how to create and use a training set.

Model packaging

A machine learning model trained using features from Databricks Feature Store retains references to these features. At inference time, the model can optionally retrieve feature values from Feature Store. The caller only needs to provide the primary key of the features used in the model (for example, user_id), and the model retrieves all required feature values from Feature Store.

In batch inference, feature values are retrieved from the offline store and joined with new data prior to scoring. Real-time inference is not supported.

To package a model with feature metadata, use FeatureStoreClient.log_model().
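
For example, a minimal sketch of the packaging call. Here model and training_set are placeholders for a trained model and the TrainingSet used to train it; the complete flow appears in Train models and perform batch inference with feature tables.

from databricks.feature_store import FeatureStoreClient
import mlflow

fs = FeatureStoreClient()

# Log the trained model together with feature lookup metadata so that
# score_batch() can retrieve the required features at inference time
fs.log_model(
  model,                            # trained model (placeholder)
  "recommendation_model",           # artifact path within the MLflow run
  flavor=mlflow.sklearn,            # MLflow flavor matching the model type
  training_set=training_set,        # TrainingSet used to train the model (placeholder)
  registered_model_name="recommendation_model"
)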

Workflow overview

The typical machine learning workflow using Feature Store follows this path:

  1. Write code to convert raw data into features and create a Spark DataFrame containing the desired features.
  2. Write the DataFrame as a feature table in Feature Store.
  3. Create a training set based on features from feature tables.
  4. Train a model.
  5. Log the model as an MLflow model.
  6. Perform batch inference on new data. The model automatically retrieves the features it needs from Feature Store.

Example notebook

The Feature Store taxi example notebook illustrates the process of creating features, updating them, and using them for model training and batch inference.

Before you run the taxi example notebook, you must run the Feature Store taxi example dataset notebook, which performs some data preprocessing.

Feature Store taxi example dataset notebook

Feature Store taxi example notebook

Feature Store Python API reference

To work with feature tables, you use the Feature Store Python API.

Download the Feature Store Python API reference.

Work with feature tables

Note

Database and feature table names cannot contain hyphens (-).

Create a database for feature tables

Before creating any feature tables, you must create a database to store them.

%sql CREATE DATABASE IF NOT EXISTS <database_name>

Feature tables are stored as Delta tables. When you create a feature table with create_feature_table(), you must specify the database name. For example, this argument creates a Delta table named customer_features in the database recommender_system.

name='recommender_system.customer_features'

Create a feature table in Databricks Feature Store

The basic steps to creating a feature table are:

  1. Write the Python functions to compute the features. The output of each function should be an Apache Spark DataFrame with a unique primary key. The primary key can consist of one or more columns.
  2. Create a feature table by instantiating a FeatureStoreClient and using the create_feature_table() function.
  3. Populate the feature table using write_table().
from databricks.feature_store import feature_table

def compute_customer_features(data):
  ''' Feature computation code returns a DataFrame with 'customer_id' as primary key'''
  pass

# create feature table keyed by customer_id
# take schema from DataFrame output by compute_customer_features
from databricks.feature_store import FeatureStoreClient

customer_features_df = compute_customer_features(df)

fs = FeatureStoreClient()

customer_feature_table = fs.create_feature_table(
  name='recommender_system.customer_features',
  keys='customer_id',
  schema=customer_features_df.schema,
  description='Customer features'
)

# An alternative is to use `create_feature_table()` and specify the `features_df` argument.
# This code automatically saves the features to the underlying Delta table.

# customer_feature_table = fs.create_feature_table(
#  ...
#  features_df=customer_features_df,
#  ...
# )

# To use a composite key, pass all keys in the create_feature_table() call

# customer_feature_table = fs.create_feature_table(
#   ...
#   keys=['customer_id', 'date'],
#   ...
# )

# Use write_table() to write data to the feature table
# Overwrite mode does a full refresh of the feature table

fs.write_table(
  name='recommender_system.customer_features',
  df = customer_features_df,
  mode = 'overwrite'
)

Update a feature table

You can update a feature table by adding new features or by modifying specific rows based on the primary key.

Add new features to an existing feature table

You can add new features to an existing feature table in one of two ways:

  • Update the existing feature computation function and run write_table() with the returned DataFrame. This updates the feature table schema and merges new feature values based on the primary key.
  • Create a new feature computation function to calculate the new feature values. The DataFrame returned by this new computation function must contain the feature table’s primary keys and partition keys (if defined). Run write_table() with the DataFrame to write the new features to the existing feature table, using the same primary key, as shown in the sketch below.
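
For example, a minimal sketch of the second approach, assuming a hypothetical compute_new_customer_features() function that returns a DataFrame keyed by customer_id:

def compute_new_customer_features(data):
  ''' Hypothetical computation function; returns a DataFrame with primary key
      'customer_id' and the new feature columns '''
  pass  # not shown

new_customer_features_df = compute_new_customer_features(df)

# Merge the new feature columns into the existing feature table by primary key
fs.write_table(
  name='recommender_system.customer_features',
  df=new_customer_features_df,
  mode='merge'
)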

Update only specific rows in a feature table

Use mode = "merge" in write_table(). Rows whose primary key does not exist in the DataFrame sent in the write_table() call remain unchanged.

fs.write_table(
  name='recommender_system.customer_features',
  df = customer_features_df,
  mode = 'merge'
)

Schedule a job to update a feature table

You can create a job to run a notebook with code similar to the following, and schedule the job to run on a regular basis, such as every day.

from databricks.feature_store import FeatureStoreClient

fs = FeatureStoreClient()

customer_features_df = compute_customer_features(data)

fs.write_table(
  df=customer_features_df,
  name='recommender_system.customer_features',
  mode='merge'
)

Store past values of daily features

Define a feature table with a composite primary key. Include the date in the primary key. For example, for a feature table store_purchases, you might use a composite primary key (date, user_id) and partition key date for efficient reads.

You can then create code to read from the feature table, filtering on date for the time period of interest. To keep the feature table up to date, set up a regularly scheduled job to write features, or stream new feature values into the feature table.
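
For example, a minimal sketch, assuming a hypothetical purchase_features_df DataFrame keyed by (date, user_id); the partition_columns argument and the example date literal are illustrative and may vary by client version:

from databricks.feature_store import FeatureStoreClient
from pyspark.sql import functions as F

fs = FeatureStoreClient()

# Composite primary key (date, user_id); partition by date for efficient reads
fs.create_feature_table(
  name='recommender_system.store_purchases',
  keys=['date', 'user_id'],
  partition_columns=['date'],
  schema=purchase_features_df.schema,
  description='Daily store purchase features'
)

# Append each day's feature values by primary key
fs.write_table(
  name='recommender_system.store_purchases',
  df=purchase_features_df,
  mode='merge'
)

# Read only the time period of interest
purchases_df = fs.read_table(name='recommender_system.store_purchases')
recent_purchases_df = purchases_df.filter(F.col('date') >= '2021-06-01')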

Create a streaming feature computation pipeline to update features

To create a streaming feature computation pipeline, pass a streaming DataFrame as the argument to write_table().

def compute_additional_customer_features(data):
  ''' Returns Streaming DataFrame
  '''
  pass  # not shown

customer_transactions = spark.readStream.load("dbfs:/events/customer_transactions")
stream_df = compute_additional_customer_features(customer_transactions)

fs.write_table(
  df=stream_df,
  name='recommender_system.customer_features',
  mode='merge'
)

Read from a feature table

Use read_table() to read feature values, and get_feature_table() to read feature table metadata.
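
For example, a minimal sketch; the description attribute on the returned metadata object is an assumption, so see the Feature Store Python API reference for the exact fields.

from databricks.feature_store import FeatureStoreClient

fs = FeatureStoreClient()

# Read the current feature values as a Spark DataFrame
customer_features_df = fs.read_table(name='recommender_system.customer_features')

# Read the feature table metadata
customer_features_table = fs.get_feature_table('recommender_system.customer_features')
print(customer_features_table.description)  # assumed attribute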

Read data from a specific timestamp

Databricks feature tables are Delta tables, so you can retrieve feature values as of any timestamp.

import datetime
yesterday = datetime.date.today() - datetime.timedelta(days=1)

# read customer_features values from 1 day ago
customer_features_df = fs.read_table(
  name='recommender_system.customer_features',
  as_of_delta_timestamp=str(yesterday)
)

Create a training dataset

To select specific features from a feature table for model training, you create a training dataset.

To create a training dataset:

  1. Create a FeatureLookup to specify each feature you want to use in the training set. The lookup_key argument specifies the columns of the provided training DataFrame (training_df) to join against each feature table’s primary keys.
  2. Call create_training_set() to define the training dataset.

In this example, the DataFrame returned by TrainingSet.load_df() contains a column for each feature in feature_lookups. It preserves all columns of the DataFrame provided to FeatureStoreClient.create_training_set() except those excluded using exclude_columns.

from databricks.feature_store import FeatureLookup

# Model training flow uses these features from Feature Store
# 'total_purchases_30d' from feature table 'recommender_system.customer_features' and
# 'category' from 'recommender_system.product_features'
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'total_purchases_30d',
      lookup_key = 'customer_id'
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_name = 'category',
      lookup_key = 'product_id'
    )
  ]

fs = FeatureStoreClient()

# Create a training set using training DataFrame and features from Feature Store
# Training DataFrame must have lookup keys 'customer_id' and 'product_id' and label 'rating'
training_set = fs.create_training_set(
  df=training_df,
  feature_lookups = feature_lookups,
  label = 'rating',
  exclude_columns = ['customer_id', 'product_id']
)

training_df = training_set.load_df()

Create a TrainingSet when lookup keys do not match the primary keys

Use the lookup_key argument in the FeatureLookup to specify the column names from the training set. FeatureStoreClient.create_training_set() performs an ordered join between the columns listed in lookup_key and the feature table’s primary keys, using the order in which the primary keys were specified when the feature table was created.

In this example, recommender_system.customer_features has primary keys (customer_id, dt), specified in that order in FeatureStoreClient.create_feature_table(), and recommender_system.product_features has primary key product_id. If training_df has columns cid, transaction_dt, product_id, and rating, the following code creates the correct feature lookups for the `TrainingSet`:

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'total_purchases_30d',
      lookup_key = ['cid', 'transaction_dt']
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_name = 'category',
      lookup_key = 'product_id'
    )
  ]

When FeatureStoreClient.create_training_set() is called, it creates the training dataset by performing a left join, joining recommender_system.customer_features with training_df using the keys (customer_id, dt) matched to (cid, transaction_dt), as shown in the following code:

customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")

training_df.join(
  customer_features_df,
  on=[training_df.cid == customer_features_df.customer_id,
      training_df.transaction_dt == customer_features_df.dt],
  how="left"
).join(
  product_features_df,
  on="product_id",
  how="left"
)

Create a TrainingSet containing two features with the same name from different feature tables

Use the optional argument output_name in the FeatureLookup. The name provided is used in place of the feature name in the DataFrame returned by TrainingSet.load_df(). For example, with the following code, the DataFrame returned by training_set.load_df() includes columns customer_height and product_height.

Note

The type of the lookup_key columns in that DataFrame must match the type of the primary keys of the referenced feature table.

import mlflow
from databricks.feature_store import FeatureLookup, FeatureStoreClient

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'height',
      lookup_key = 'customer_id',
      output_name = 'customer_height',
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_name = 'height',
      lookup_key = 'product_id',
      output_name = 'product_height'
    ),
  ]

fs = FeatureStoreClient()

with mlflow.start_run():
  training_set = fs.create_training_set(
    df,
    feature_lookups = feature_lookups,
    label = 'rating',
    exclude_columns = ['customer_id']
  )
  training_df = training_set.load_df()

Train models and perform batch inference with feature tables

When you train a model using features from Feature Store, the model retains references to the features. When you use the model for inference, you can choose to have it retrieve feature values from Feature Store. You must provide the primary key(s) of the features used in the model. The model retrieves the features it requires from Feature Store in your workspace. It then joins the feature values as needed during scoring.

To support feature lookup at inference time:

  • You must log the model using FeatureStoreClient.log_model().
  • You must use the DataFrame returned by TrainingSet.load_df() to train the model. If you modify this DataFrame in any way before using it to train the model, the modifications are not applied when you use the model for inference. This decreases the performance of the model.
  • The model type must have a corresponding python_flavor in MLflow. MLflow supports most Python model training frameworks, including:
    • scikit-learn
    • keras
    • PyTorch
    • SparkML
    • LightGBM
    • XGBoost
    • TensorFlow Keras (using the python_flavor mlflow.keras)
    • Custom MLflow pyfunc models
# Train model
import mlflow
from sklearn import linear_model
from databricks.feature_store import FeatureLookup, FeatureStoreClient

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'total_purchases_30d',
      lookup_key = 'customer_id',
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_name = 'category',
      lookup_key = 'product_id'
    )
  ]


fs = FeatureStoreClient()

with mlflow.start_run():

  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fs.create_training_set(
    df,
    feature_lookups = feature_lookups,
    label = 'rating',
    exclude_columns = ['customer_id', 'product_id']
  )

  training_df = training_set.load_df().toPandas()

  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating

  model = linear_model.LinearRegression().fit(X_train, y_train)

  fs.log_model(
    model,
    "recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model"
  )

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fs = FeatureStoreClient()

# batch_df has columns 'customer_id' and 'product_id'
predictions = fs.score_batch(
    model_uri,
    batch_df
)

# The 'predictions' DataFrame has these columns:
# 'customer_id', 'product_id', 'total_purchases_30d', 'category', 'prediction'

Use custom feature values when scoring a model packaged with feature metadata

By default, a model packaged with feature metadata looks up features from Feature Store at inference. To use custom feature values for scoring, include them in the DataFrame passed to FeatureStoreClient.score_batch().

For example, suppose you package a model with these two features:

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'account_creation_date',
      lookup_key = 'customer_id',
    ),
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'num_lifetime_purchases',
      lookup_key = 'customer_id'
    ),
  ]

At inference, you can provide custom values for the feature account_creation_date by calling FeatureStoreClient.score_batch() on a DataFrame that includes a column named account_creation_date. In this case the API looks up only the num_lifetime_purchases feature from Feature Store and uses the provided custom account_creation_date column values for model scoring.

# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
  'models:/ban_prediction_model/1',
  batch_df
)

Train and score a model using a combination of Feature Store features and data residing outside Feature Store

You can train a model using a combination of Feature Store features and data from outside Feature Store. When you package the model with feature metadata, the model retrieves feature values from Feature Store for inference.

To train a model, include the extra data as columns in the DataFrame passed to FeatureStoreClient.create_training_set(). This example uses the feature total_purchases_30d from Feature Store and the external column browser.

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'total_purchases_30d',
      lookup_key = 'customer_id',
    ),
  ]

fs = FeatureStoreClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
  df,
  feature_lookups = feature_lookups,
  label = 'rating',
  exclude_columns = ['customer_id']  # 'browser' is not excluded
)

At inference, the DataFrame used in FeatureStoreClient.score_batch() must include the browser column.

# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
  model_uri,
  batch_df
)

Use the Feature Store UI

You can use the Feature Store UI to:

  • Search for feature tables by feature table name, feature, or data source
  • Identify the data sources used to create a feature table
  • Identify models that use a particular feature
  • Control access to feature tables

To use the Feature Store UI, click Feature Store in the sidebar. This option appears only when you are in the machine learning persona. The Feature Store UI appears.

The Feature Store UI lists all of the available feature tables, along with metadata including the creator, last modified date, data sources used to compute the feature table, and any online stores the feature table has been pushed to.

Search and browse for feature tables

Use the search box to search for feature tables. You can enter all or part of the name of a feature table, a feature, or a data source used for feature computation.

Track feature lineage

In the UI you can track both how a feature was created (the raw data sources and the notebooks that were used to compute the features) and where it is used (the models trained with it and the notebooks and jobs that read the feature).

Click the name of any feature table to display the feature table page. This page shows more details, including a link to the notebooks that wrote to the feature table (Producer Links) and a list of features in the table along with any models, endpoints, jobs, and notebooks that use the feature.

To return to the main Feature Store UI page, click Feature Store near the top of the page.

Control access to feature tables

You can configure Feature Store access control to grant fine-grained permissions on feature table metadata. You can control a user’s ability to view a feature table in the UI, edit its description, or manage other users’ permissions on the table.

Note

Feature Store access control does not govern access to the underlying Delta table, which is governed by table access control.

You can assign four permission levels to feature table metadata: No Permissions, View Metadata, Edit Metadata, and Manage. The following table lists the abilities for each permission level.

| Ability | No Permissions | View Metadata | Edit Metadata | Manage |
| --- | --- | --- | --- | --- |
| Create feature table | X | X | X | X |
| Read feature table |  | X | X | X |
| Search feature table |  | X | X | X |
| Write features to feature table |  |  | X | X |
| Update description of feature table |  |  | X | X |
| Modify permissions on feature table |  |  |  | X |

By default, when a feature table is created:

  • The creator has Manage permission
  • Workspace admins have Manage permission
  • Other users have No Permissions

Manage Feature Store permissions

  1. On the feature table page, click the arrow to the right of the name of the feature table and select Permissions. If you do not have Can Manage permission for the feature table, you will not see this option.

  2. Edit the permissions and click Save.

Known limitations

  1. Feature Store APIs support batch scoring of models packaged with Feature Store. Online inference is not supported.
  2. Feature Store does not support deleting feature tables or features.
  3. The Feature Store library is accessible only in Databricks Runtime notebooks and notebook jobs.
  4. APIs can log and access feature tables only in the current workspace.
  5. Feature Store is accessible using the UI (read) and Python APIs (read/write).

Troubleshooting

Error message: Database recommender_system does not exist in the Hive metastore.

A feature table is stored in a Hive database. The database is specified by the table name prefix, so a feature table recommender_system.customer_features will be stored in the recommender_system database.

To create the database, run:

%sql CREATE DATABASE IF NOT EXISTS recommender_system;

Error message: ModuleNotFoundError: No module named 'databricks.feature_store'

This error occurs when the Feature Store Python client is not available on the Databricks Runtime you are using. For a list of supported Databricks Runtime versions, see Requirements.