Work with feature tables

Note

Database and feature table names cannot contain hyphens (-).

Create a database for feature tables

Before creating any feature tables, you must create a database to store them.

%sql CREATE DATABASE IF NOT EXISTS <database_name>

Feature tables are stored as Delta tables. When you create a feature table with create_feature_table(), you must specify the database name. For example, this argument creates a Delta table named customer_features in the database recommender_system.

name='recommender_system.customer_features'

Create a feature table in Databricks Feature Store

The basic steps to creating a feature table are:

  1. Write the Python functions to compute the features. The output of each function should be an Apache Spark DataFrame with a unique primary key. The primary key can consist of one or more columns.
  2. Create a feature table by instantiating a FeatureStoreClient and using the create_feature_table() function.
  3. Populate the feature table using write_table().
from databricks.feature_store import feature_table

def compute_customer_features(data):
  ''' Feature computation code returns a DataFrame with 'customer_id' as primary key'''
  pass

# create feature table keyed by customer_id
# take schema from DataFrame output by compute_customer_features
from databricks.feature_store import FeatureStoreClient

customer_features_df = compute_customer_features(df)

fs = FeatureStoreClient()

customer_feature_table = fs.create_feature_table(
  name='recommender_system.customer_features',
  keys='customer_id',
  schema=customer_features_df.schema,
  description='Customer features'
)

# An alternative is to use `create_feature_table()` and specify the `features_df` argument.
# This code automatically saves the features to the underlying Delta table.

# customer_feature_table = fs.create_feature_table(
#  ...
#  features_df=customer_features_df,
#  ...
# )

# To use a composite key, pass all keys in the create_feature_table() call

# customer_feature_table = fs.create_feature_table(
#   ...
#   keys=['customer_id', 'date'],
#   ...
# )

# Use write_table() to write data to the feature table
# Overwrite mode does a full refresh of the feature table

fs.write_table(
  name='recommender_system.customer_features',
  df = customer_features_df,
  mode = 'overwrite'
)

Update a feature table

You can update a feature table by adding new features or by modifying specific rows based on the primary key.

The following feature table metadata cannot be updated:

  • Primary key
  • Partition key
  • Name or type of an existing feature

Add new features to an existing feature table

You can add new features to an existing feature table in one of two ways:

  • Update the existing feature computation function and run write_table() with the returned DataFrame. This updates the feature table schema and merges new feature values based on the primary key.
  • Create a new feature computation function to calculate the new feature values. The DataFrame returned by this new computation function must contain the feature table's primary keys and partition keys (if defined). Run write_table() with the DataFrame to write the new features to the existing feature table, using the same primary key. See the sketch after this list.
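
The following is a minimal sketch of the second approach. The function compute_customer_loyalty_features and the loyalty_score column are hypothetical; the returned DataFrame only needs to contain the primary key ('customer_id') and the new feature columns.

def compute_customer_loyalty_features(data):
  ''' Returns a DataFrame with 'customer_id' as primary key and a new 'loyalty_score' column '''
  pass  # not shown

new_features_df = compute_customer_loyalty_features(df)

# Merge the new feature column into the existing table by primary key.
# Existing features and rows not present in new_features_df remain unchanged.
fs.write_table(
  name='recommender_system.customer_features',
  df=new_features_df,
  mode='merge'
)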

Update only specific rows in a feature table

Use mode = "merge" in write_table(). Rows whose primary key does not exist in the DataFrame sent in the write_table() call remain unchanged.

fs.write_table(
  name='recommender_system.customer_features',
  df = customer_features_df,
  mode = 'merge'
)

Schedule a job to update a feature table

To ensure that features in feature tables always have the most recent values, Databricks recommends that you create a job that runs a notebook to update your feature table on a regular basis, such as every day. If you already have a non-scheduled job created, you can convert it to a scheduled job to make sure the feature values are always up to date.

Code to update a feature table uses mode='merge', as shown in the following example.

fs = FeatureStoreClient()

customer_features_df = compute_customer_features(data)

fs.write_table(
  df=customer_features_df,
  name='recommender_system.customer_features',
  mode='merge'
)

Store past values of daily features

Define a feature table with a composite primary key. Include the date in the primary key. For example, for a feature table store_purchases, you might use a composite primary key (date, user_id) and partition key date for efficient reads.

You can then create code to read from the feature table filtering date to the time period of interest. To keep the feature table up to date, set up a regularly scheduled job to write features, or stream new feature values into the feature table.
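
A minimal sketch of this pattern follows, assuming a hypothetical table recommender_system.store_purchases with a purchase_amount feature, and assuming the partition_columns argument of create_feature_table() is used to set the partition key.

fs = FeatureStoreClient()

# store_purchases_df has columns ['date', 'user_id', 'purchase_amount']
store_purchases_table = fs.create_feature_table(
  name='recommender_system.store_purchases',
  keys=['date', 'user_id'],        # composite primary key includes the date
  partition_columns=['date'],      # partition by date for efficient reads
  schema=store_purchases_df.schema,
  description='Daily store purchase features'
)

fs.write_table(
  name='recommender_system.store_purchases',
  df=store_purchases_df,
  mode='merge'
)

# Read only the time period of interest by filtering on the date column
recent_purchases_df = fs.read_table(
  name='recommender_system.store_purchases'
).filter("date >= '2021-01-01'")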

Create a streaming feature computation pipeline to update features

To create a streaming feature computation pipeline, pass a streaming DataFrame as an argument to write_table(). When called with a streaming DataFrame, write_table() returns a StreamingQuery object.

def compute_additional_customer_features(data):
  ''' Returns Streaming DataFrame
  '''
  pass  # not shown

customer_transactions = spark.readStream.load("dbfs:/events/customer_transactions")
stream_df = compute_additional_customer_features(customer_transactions)

fs.write_table(
  df=stream_df,
  name='recommender_system.customer_features',
  mode='merge'
)

Read from a feature table

Use read_table() to read feature values, and get_feature_table() to read feature table metadata.
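
For example (a minimal sketch, using the recommender_system.customer_features table created above):

fs = FeatureStoreClient()

# Read the latest feature values as a DataFrame
customer_features_df = fs.read_table(
  name='recommender_system.customer_features'
)

# Read feature table metadata, such as the description and keys
customer_features_metadata = fs.get_feature_table('recommender_system.customer_features')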

Read data from a specific timestamp

Databricks feature tables are Delta tables, so you can retrieve feature values as of any timestamp.

import datetime
yesterday = datetime.date.today() - datetime.timedelta(days=1)

# read customer_features values from 1 day ago
customer_features_df = fs.read_table(
  name='recommender_system.customer_features',
  as_of_delta_timestamp=str(yesterday)
)

Create a training dataset

To select specific features from a feature table for model training, you create a training dataset.

To create a training dataset:

  1. Create a FeatureLookup to specify each feature you want to use in the training set. The lookup_key argument names the columns in the provided training DataFrame that are joined with each feature table's primary keys.
  2. Call create_training_set() to define the training dataset.

In this example, the DataFrame returned by TrainingSet.load_df() contains a column for each feature in feature_lookups. It preserves all columns of the DataFrame provided to FeatureStoreClient.create_training_set() except those excluded using exclude_columns.

from databricks.feature_store import FeatureLookup

# Model training flow uses these features from Feature Store
# 'total_purchases_30d' from feature table 'recommender_system.customer_features' and
# 'category' from 'recommender_system.product_features'
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'total_purchases_30d',
      lookup_key = 'customer_id'
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_name = 'category',
      lookup_key = 'product_id'
    )
  ]

fs = FeatureStoreClient()

# Create a training set using training DataFrame and features from Feature Store
# Training DataFrame must have lookup keys 'customer_id' and 'product_id' and label 'rating'
training_set = fs.create_training_set(
  df=training_df,
  feature_lookups = feature_lookups,
  label = 'rating',
  exclude_columns = ['customer_id', 'product_id']
)

training_df = training_set.load_df()

Create a TrainingSet when lookup keys do not match the primary keys

Use the lookup_key argument in the FeatureLookup to specify the corresponding column names in the training set. FeatureStoreClient.create_training_set() performs an ordered join between the columns from the training set specified in the lookup_key argument and the feature table's primary keys, using the order in which the primary keys were specified when the feature table was created.

In this example, recommender_system.customer_features has primary keys (customer_id, dt) specified in that order in FeatureStoreClient.create_feature_table(), and the recommender_system.product_features feature table has primary key product_id. If the training_df has columns cid, transaction_dt, product_id, and rating, the following code creates the correct feature lookups for the TrainingSet:

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'total_purchases_30d',
      lookup_key = ['cid', 'transaction_dt']
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_name = 'category',
      lookup_key = 'product_id'
    )
  ]

When FeatureStoreClient.create_training_set() is called, it creates a training dataset by performing a left join, joining recommender_system.customer_features and training_df using the keys (customer_id, dt) corresponding to (cid, transaction_dt), as shown in the following code:

customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")

training_df.join(
  customer_features_df,
  on=[training_df.cid == customer_features_df.customer_id,
      training_df.transaction_dt == customer_features_df.dt],
  how="left"
).join(
  product_features_df,
  on="product_id",
  how="left"
)

Create a TrainingSet containing two features with the same name from different feature tables

Use the optional argument output_name in the FeatureLookup. The name provided is used in place of the feature name in the DataFrame returned by TrainingSet.load_df(). For example, with the following code, the DataFrame returned by training_set.load_df() includes columns customer_height and product_height.

Note

The type of the lookup_key columns in that DataFrame must match the type of the primary keys of the referenced feature table.

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'height',
      lookup_key = 'customer_id',
      output_name = 'customer_height',
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_name = 'height',
      lookup_key = 'product_id',
      output_name = 'product_height'
    ),
  ]

fs = FeatureStoreClient()

with mlflow.start_run():
  training_set = fs.create_training_set(
    df,
    feature_lookups = feature_lookups,
    label = 'rating',
    exclude_columns = ['customer_id']
  )
  training_df = training_set.load_df()

Create a TrainingSet for unsupervised machine learning models

Set label=None when creating a TrainingSet for unsupervised learning models. For example, the following TrainingSet can be used to cluster different customers into groups based on their interests:

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'interests',
      lookup_key = 'customer_id',
    ),
  ]
fs = FeatureStoreClient()
with mlflow.start_run():
  training_set = fs.create_training_set(
    df,
    feature_lookups = feature_lookups,
    label = None,
    exclude_columns = ['customer_id']
  )
  training_df = training_set.load_df()

Train models and perform batch inference with feature tables

When you train a model using features from Feature Store, the model retains references to the features. When you use the model for inference, you can choose to have it retrieve feature values from Feature Store. You must provide the primary key(s) of the features used in the model. The model retrieves the features it requires from Feature Store in your workspace. It then joins the feature values as needed during scoring.

To support feature lookup at inference time:

  • You must log the model using FeatureStoreClient.log_model().
  • You must use the DataFrame returned by TrainingSet.load_df() to train the model. If you modify this DataFrame in any way before using it to train the model, the modifications are not applied when you use the model for inference. This decreases the performance of the model.
  • The model type must have a corresponding python_flavor in MLflow. MLflow supports most Python model training frameworks, including:
    • scikit-learn
    • keras
    • PyTorch
    • SparkML
    • LightGBM
    • XGBoost
    • TensorFlow Keras (using the python_flavor mlflow.keras)
    • Custom MLflow pyfunc models
# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'total_purchases_30d',
      lookup_key = 'customer_id',
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_name = 'category',
      lookup_key = 'product_id'
    )
  ]


fs = FeatureStoreClient()

with mlflow.start_run():

  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fs.create_training_set(
    df,
    feature_lookups = feature_lookups,
    label = 'rating',
    exclude_columns = ['customer_id', 'product_id']
  )

  training_df = training_set.load_df().toPandas()

  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating

  model = linear_model.LinearRegression().fit(X_train, y_train)

  fs.log_model(
    model,
    "recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model"
  )

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fs = FeatureStoreClient()

# batch_df has columns 'customer_id' and 'product_id'
predictions = fs.score_batch(
    model_uri,
    batch_df
)

# The 'predictions' DataFrame has these columns:
# 'customer_id', 'product_id', 'total_purchases_30d', 'category', 'prediction'

Use custom feature values when scoring a model packaged with feature metadata

By default, a model packaged with feature metadata looks up features from Feature Store at inference. To use custom feature values for scoring, include them in the DataFrame passed to FeatureStoreClient.score_batch().

For example, suppose you package a model with these two features:

# This syntax is deprecated in Databricks Runtime 9.1 LTS ML and above.
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'account_creation_date',
      lookup_key = 'customer_id',
    ),
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'num_lifetime_purchases',
      lookup_key = 'customer_id'
    ),
  ]

# This syntax is available only in Databricks Runtime 9.1 LTS ML and above.
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['account_creation_date', 'num_lifetime_purchases'],
      lookup_key = 'customer_id',
    ),
  ]

At inference, you can provide custom values for the feature account_creation_date by calling FeatureStoreClient.score_batch() on a DataFrame that includes a column named account_creation_date. In this case the API looks up only the num_lifetime_purchases feature from Feature Store and uses the provided custom account_creation_date column values for model scoring.

# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
  'models:/ban_prediction_model/1',
  batch_df
)

Train and score a model using a combination of Feature Store features and data residing outside Feature Store

You can train a model using a combination of Feature Store features and data from outside Feature Store. When you package the model with feature metadata, the model retrieves feature values from Feature Store for inference.

To train a model, include the extra data as columns in the DataFrame passed to FeatureStoreClient.create_training_set(). This example uses the feature total_purchases_30d from Feature Store and the external column browser.

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'total_purchases_30d',
      lookup_key = 'customer_id',
    ),
  ]

fs = FeatureStoreClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
  df,
  feature_lookups = feature_lookups,
  label = 'rating',
  exclude_columns = ['customer_id']  # 'browser' is not excluded
)

At inference, the DataFrame used in FeatureStoreClient.score_batch() must include the browser column.

# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
  model_uri,
  batch_df
)

Supported data types

Feature Store supports the following PySpark data types:

  • IntegerType
  • FloatType
  • BooleanType
  • StringType
  • DoubleType
  • LongType
  • TimestampType
  • DateType

In Databricks Runtime 9.1 LTS ML and above, the following PySpark data types are also supported:

  • ShortType
  • ArrayType

In online stores, complex data types such as ArrayType are stored in JSON format.
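
For example, a minimal sketch of a feature table that includes an ArrayType column (the table name and columns here are hypothetical):

from pyspark.sql.types import ArrayType, StringType, StructField, StructType

schema = StructType([
  StructField('customer_id', StringType(), False),
  StructField('recent_categories', ArrayType(StringType()), True),  # stored as JSON in online stores
])

customer_array_features_df = spark.createDataFrame(
  [('c1', ['books', 'toys'])], schema=schema
)

fs = FeatureStoreClient()

fs.create_feature_table(
  name='recommender_system.customer_array_features',
  keys='customer_id',
  schema=schema,
  description='Customer features including an array column'
)

fs.write_table(
  name='recommender_system.customer_array_features',
  df=customer_array_features_df,
  mode='overwrite'
)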