Unity Catalogでの特徴量エンジニアリング

このページでは、 Unity Catalogで特徴量テーブルを作成および操作する方法について説明します。

このページは、 Unity Catalogが有効になっているワークスペースにのみ適用されます。 ワークスペースで Unity Catalogが有効になっていない場合は、 ワークスペース Feature Storeの機能の操作を参照してください。

要件

Unity Catalog での特徴量エンジニアリングにはDatabricks Runtime 13.2 以上が必要です。

Unity Catalogでの特徴量エンジニアリングをインストールする

Unity Catalogでの特徴量エンジニアリングでは、Python クライアント FeatureEngineeringClientが提供されます。 このクラスは、PyPI で databricks-feature-engineering パッケージを使用して使用でき、Databricks Runtime 13.2 機械学習以降にプレインストールされています。 ML 以外の Databricks Runtime を使用している場合は、クライアントを手動でインストールする必要があります。 互換性マトリックスを使用して、Databricks Runtime のバージョンに適したバージョンを見つけます。

%pip install databricks-feature-engineering

dbutils.library.restartPython()

Unity Catalogで特徴量テーブルのカタログとスキーマを作成する

新しいカタログを作成するか、既存の カタログ を 特徴量テーブルに使用する必要があります。

新しいカタログを作成するには、メタストアに対するCREATE CATALOG特権が必要です。

CREATE CATALOG IF NOT EXISTS <catalog-name>

既存のカタログを使用するには、カタログに対する USE CATALOG 特権が必要です。

USE CATALOG <catalog-name>

Unity Catalog の 特徴量テーブルは 、スキーマに格納する必要があります。 カタログに新しいスキーマを作成するには、カタログに対する CREATE SCHEMA 特権が必要です。

CREATE SCHEMA IF NOT EXISTS <schema-name>

Unity Catalog で特徴量テーブルを作成する

Unity Catalog の特徴量テーブルは、Unity Catalog によって管理されるDeltaテーブルまたはDelta Live Tablesです。 特徴量テーブルには主キーが必要です。 Unity Catalogの特徴量テーブルの名前は、<catalog-name>.<schema-name>.<table-name> という 3 レベルの構造になっています。

Databricks SQL または PythonFeatureEngineeringClient DeltaUnity Catalogを使用して、 に機能 テーブルを作成できます。Delta Live Tablesパイプラインを使用して、 に機能Delta Live Tables Unity Catalogを作成できます。

主キー制約 を持つ任意の Delta テーブルを特徴量テーブルとして使用できます。次のコードは、主キーを持つテーブルを作成する方法を示しています。

CREATE TABLE ml.recommender_system.customer_features (
  customer_id int NOT NULL,
  feat1 long,
  feat2 varchar(100),
  CONSTRAINT customer_features_pk PRIMARY KEY (customer_id)
);

時系列特徴テーブルを作成するには、時間列を主キー列として追加し、TIMESERIES キーワードを指定します。キーワードには 13.3 LTS 以上 Databricks Runtime が必要です。

CREATE TABLE ml.recommender_system.customer_features (
  customer_id int NOT NULL,
  ts timestamp NOT NULL,
  feat1 long,
  feat2 varchar(100),
  CONSTRAINT customer_features_pk PRIMARY KEY (customer_id, ts TIMESERIES)
);

テーブルを作成したら、他の Delta テーブルを書き込むのと同じ方法でデータを書き込むことができ、特徴量テーブルとして使用できます。

次の例で使用されるコマンドとパラメーターの詳細については、 「フィーチャー エンジニアリング Python API リファレンス」を参照してください。

  1. Python 関数を記述して、フィーチャをコンピュートします。 各関数の出力は、一意の主キーを持つ Apache Spark DataFrame である必要があります。 主キーは、1 つ以上の列で構成できます。

  2. 特徴量テーブルを作成するには、 FeatureEngineeringClient をインスタンス化し、 create_tableを使用します。

  3. write_tableを使用して特徴量テーブルを設定します。

from databricks.feature_engineering import FeatureEngineeringClient

fe = FeatureEngineeringClient()

# Prepare feature DataFrame
def compute_customer_features(data):
  ''' Feature computation code returns a DataFrame with 'customer_id' as primary key'''
  pass

customer_features_df = compute_customer_features(df)

# Create feature table with `customer_id` as the primary key.
# Take schema from DataFrame output by compute_customer_features
customer_feature_table = fe.create_table(
  name='ml.recommender_system.customer_features',
  primary_keys='customer_id',
  schema=customer_features_df.schema,
  description='Customer features'
)

# An alternative is to use `create_table` and specify the `df` argument.
# This code automatically saves the features to the underlying Delta table.

# customer_feature_table = fe.create_table(
#  ...
#  df=customer_features_df,
#  ...
# )

# To use a composite primary key, pass all primary key columns in the create_table call

# customer_feature_table = fe.create_table(
#   ...
#   primary_keys=['customer_id', 'date'],
#   ...
# )

# To create a time series table, set the timeseries_columns argument

# customer_feature_table = fe.create_table(
#   ...
#   primary_keys=['customer_id', 'date'],
#   timeseries_columns='date',
#   ...
# )

Delta Live Tablesのテーブル制約はパブリック プレビュー段階にあります。 次のコード例は、Delta Live Tables プレビュー チャンネルを使用して実行する必要があります。

主キー制約を 持つ任意のDelta ライブ テーブルを特徴量テーブルとして使用できます。次のコードは、主キーを使用してDeltaライブ テーブルを作成する方法を示しています。

CREATE LIVE TABLE customer_features (
  customer_id int NOT NULL,
  feat1 long,
  feat2 varchar(100),
  CONSTRAINT customer_features_pk PRIMARY KEY (customer_id)
) AS SELECT * FROM ...;

時系列特徴量テーブルを作成するには、時間列を主キー列として追加し、 TIMESERIESキーワードを指定します。

CREATE LIVE TABLE customer_features (
  customer_id int NOT NULL,
  ts timestamp NOT NULL,
  feat1 long,
  feat2 varchar(100),
  CONSTRAINT customer_features_pk PRIMARY KEY (customer_id, ts TIMESERIES)
) AS SELECT * FROM ...;

テーブルの作成後は、他のDelta Live Tables同じ方法でデータを書き込むことができ、特徴量テーブルとして使用できます。

Delta Live Tables のテーブル制約は SQL でのみサポートされます。 で宣言されたDelta Live Tables の主キーを設定Delta Live Tables Pythonするには、「既存のストリーミング テーブルまたは パイプラインによって作成されたマテリアライズド ビューを特徴量テーブルとして使用する」を参照してください 。

Unity Catalog 内の既存の Delta テーブルを特徴量テーブル として使用する

DeltaUnity Catalog内の主キーを持つ テーブルは、 の特徴量テーブルにすることができ、そのテーブルで機能Unity Catalog UI とAPI を使用できます。

  • テーブルの所有者のみが主キー制約を宣言できます。 所有者の名前は、カタログ エクスプローラのテーブル詳細ページに表示されます。

  • Deltaテーブルのデータ型が Unity Catalog での特徴量エンジニアリングでサポートされていることを確認します。 「サポートされているデータ型」を参照してください。

  • TIMESERIES キーワードには 13.3 LTS 以上Databricks Runtimeが必要です。

既存のDeltaテーブルに主キー制約がない場合は、次のように作成できます。

  1. 主キー列を NOT NULLに設定します。 主キー列ごとに、実行:

    ALTER TABLE <full_table_name> ALTER COLUMN <pk_col_name> SET NOT NULL
    
  2. テーブルを変更して、主キー制約を追加します。

    ALTER TABLE <full_table_name> ADD CONSTRAINT <pk_name> PRIMARY KEY(pk_col1, pk_col2, ...)
    

    pk_name は主キー制約の名前です。 慣例により、テーブル名 (スキーマとカタログなし) に _pk サフィックスを付けることができます。 たとえば、 "ml.recommender_system.customer_features" という名前のテーブルは、主キー制約の名前として customer_features_pk になります。

    テーブルを 時系列特徴テーブルにするには、次のように、主キー列の 1 つに TIMESERIES キーワードを指定します。

    ALTER TABLE <full_table_name> ADD CONSTRAINT <pk_name> PRIMARY KEY(pk_col1 TIMESERIES, pk_col2, ...)
    

    テーブルに主キー制約を追加すると、テーブルが機能 UI に表示され、特徴量テーブルとして使用できるようになります。

Delta Live Tablesパイプラインによって作成された既存のストリーミング テーブルまたはマテリアライズド ビューを特徴量テーブルとして使用します。

主キーを持つUnity Catalogのストリーミング テーブルまたはマテリアライズド ビューは、 Unity Catalogの特徴量テーブルにすることができ、そのテーブルで機能 UI とAPIを使用できます。

  • Delta Live Tablesのテーブル制約はパブリック プレビュー段階にあります。 次のコード例は、Delta Live Tables プレビュー チャンネルを使用して実行する必要があります。

  • テーブルの所有者のみが主キー制約を宣言できます。 所有者の名前は、カタログ エクスプローラのテーブル詳細ページに表示されます。

  • Deltaテーブルのデータ型が Unity Catalog での特徴量エンジニアリングでサポートされていることを確認します。 「サポートされているデータ型」を参照してください。

SQLを使用して作成されたストリーミングテーブルまたはマテリアライズドビューに主キーを追加する

SQL を使用して作成した既存のストリーミング テーブルまたはマテリアライズド ビューの主キーを設定するには、テーブルを管理するノートブックでストリーミング テーブルまたはマテリアライズド ビューのスキーマを更新します。 次に、Unity Catalog で制約を適用するようにテーブルを更新します。

コードは次のようになります。

CREATE OR REFRESH MATERIALIZED VIEW existing_live_table(
  id int NOT NULL PRIMARY KEY,
  ...
) AS SELECT ...

Python を使用して作成されたストリーミング テーブルまたはマテリアライズド ビューに主キーを追加する

Delta Live Tablesパイプラインによって作成された既存のストリーミングSQL テーブルまたはマテリアライズド ビューの主キーを作成するには、ストリーミング テーブルまたはマテリアライズド ビューが を使用して作成された場合でも、Python を使用する必要があります。新しい SQL ノートブックを作成して、既存のテーブルから読み取る新しいストリーミング テーブルまたはマテリアライズド ビューを定義します。 次に、既存の Delta Live Tables パイプラインのステップとして、または新しいパイプラインでノートブックを実行します。

新しい SQL ノートブックには、次のようなコードが含まれている必要があります。

CREATE OR REFRESH MATERIALIZED VIEW new_live_table_with_constraint(
  id int NOT NULL PRIMARY KEY,
  ...
) AS SELECT * FROM existing_live_table

Unity Catalog の特徴量テーブルへのアクセスを制御する

Unity Catalog の特徴量テーブルのアクセス制御は、 Unity Catalogによって管理されます。Unity Catalog 権限を参照してください。

Unity Catalog で特徴量テーブルを更新する

Unity Catalog で特徴量テーブルを更新するには、新しい特徴量テーブルを追加するか、主キーに基づいて特定の行を変更します。

次の特徴量テーブルのメタデータは更新しないでください。

  • 主キー。

  • パーティション キー。

  • 既存のフィーチャの名前またはデータ タイプ。

これらを変更すると、トレーニングとサービング モデルの機能を使用するダウンストリーム パイプラインが壊れます。

Unity Catalog の既存の特徴量テーブルに新しい特徴量テーブルを追加する

既存の特徴量テーブルに新しい特徴量テーブルを追加するには、次の 2 つの方法があります。

  • 既存の特徴量計算関数を更新し、返された DataFrameで write_table を実行します。 これにより、特徴量テーブル スキーマが更新され、主キーに基づいて新しい特徴量テーブル値がマージされます。

  • 新しい特徴量計算関数を作成して、新しい特徴量を計算します。 この新しい計算関数によって返される DataFrame には、特徴量テーブルの主キーとパーティション キー (定義されている場合) が含まれている必要があります。 実行 write_table DataFrame を使用して、同じ主キーを使用して、新しい特徴量テーブルを既存の特徴量テーブルに書き込みます。

特徴量テーブル 内の特定の行のみを更新する

write_tablemode = "merge" を使用します。write_table 呼び出しで送信された DataFrame に主キーが存在しない行は変更されません。

from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
fe.write_table(
  name='ml.recommender_system.customer_features',
  df = customer_features_df,
  mode = 'merge'
)

特徴量テーブル を更新するジョブをスケジュールする

特徴量テーブルの特徴が常に最新の値を持つようにするために、Databricks では、特徴量テーブルを毎日など定期的に更新するノートブックを実行する ジョブを作成する ことをお勧めします。 スケジュールされていないジョブをすでに作成している場合は、それを スケジュールされたジョブ に変換して、機能値が常に最新であることを確認できます。

特徴量テーブルを更新するコードでは、次の例に示すように、 mode='merge'を使用します。

from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()

customer_features_df = compute_customer_features(data)

fe.write_table(
  df=customer_features_df,
  name='ml.recommender_system.customer_features',
  mode='merge'
)

日次特徴 量の過去の値を保存する

複合主キーを持つ特徴量テーブルを定義します。 主キーに日付を含めます。 たとえば、特徴量テーブル customer_featuresの場合、効率的な読み取りのために複合主キー (datecustomer_id) とパーティション キー date を使用できます。

CREATE TABLE ml.recommender_system.customer_features (
  customer_id int NOT NULL,
  `date` date NOT NULL,
  feat1 long,
  feat2 varchar(100),
  CONSTRAINT customer_features_pk PRIMARY KEY (`date`, customer_id)
)
PARTITIONED BY (`date`)
COMMENT "Customer features";
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
fe.create_table(
  name='ml.recommender_system.customer_features',
  primary_keys=['date', 'customer_id'],
  partition_columns=['date'],
  schema=customer_features_df.schema,
  description='Customer features'
)

その後、特徴量テーブルから date をフィルター処理して対象期間に読み取るコードを作成できます。

また、 時系列特徴量テーブル を作成して、 create_training_set または score_batchを使用するときに特定の時点のルックアップを有効にすることもできます。 「 Unity Catalog で特徴量テーブルを作成する」を参照してください。

特徴テーブルを最新の状態に保つには、定期的にスケジュールされたジョブを設定して特徴を書き込むか、新しい特徴値を特徴テーブルにストリームします。

ストリーミング特徴量計算パイプラインを作成して特徴 を更新する

ストリーミング特徴量計算パイプラインを作成するには、ストリーミング DataFrame を引数として write_tableに渡します。 このメソッドは、 StreamingQuery オブジェクトを返します。

def compute_additional_customer_features(data):
  ''' Returns Streaming DataFrame
  '''
  pass

from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()

customer_transactions = spark.readStream.load("dbfs:/events/customer_transactions")
stream_df = compute_additional_customer_features(customer_transactions)

fe.write_table(
  df=stream_df,
  name='ml.recommender_system.customer_features',
  mode='merge'
)

Unity Catalog の特徴量テーブルから読み取る

read_table を使用してフィーチャ値を読み取ります。

from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
customer_features_df = fe.read_table(
  name='ml.recommender_system.customer_features',
)

Unity Catalog の特徴量テーブルを検索および参照する

フィーチャー UI を使用して、 Unity Catalogでフィーチャー テーブルを検索または参照します。

  1. サイドバーの [ 機能 ]Feature Store アイコン をクリックして 、機能 UI を表示します。

  2. カタログセレクタでカタログを選択すると、そのカタログで使用可能なすべての特徴量テーブルが表示されます。 検索ボックスに、特徴量テーブル、機能、またはコメントの名前の全部または一部を入力します。 また、タグのキーまたは値の全部または一部を入力することもできます。検索テキストでは、大文字と小文字が区別されません。

    フィーチャ検索の例

Unity Catalog で特徴量テーブルのメタデータを取得する

get_table を使用して、特徴量テーブルのメタデータを取得します。

from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
ft = fe.get_table(name="ml.recommender_system.user_feature_table")
print(ft.features)

Unity Catalogの特徴量テーブルと機能でタグを使用する

シンプルなキーと値のペアであるタグを使用して、特徴量テーブルと機能を分類して管理できます。

特徴量テーブルの場合、カタログ エクスプローラー UIDatabricks SQL 、またはフィーチャー エンジニアリングPython APIを使用してタグを作成、編集、削除できます。

機能については、カタログ エクスプローラー UIまたはDatabricks SQLを使用してタグを作成、編集、削除できます。

次の例は、Feature エンジニアリングPython API使用して特徴量テーブル タグを作成、更新、および削除する方法を示しています。

from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()

# Create feature table with tags
customer_feature_table = fe.create_table(
  # ...
  tags={"tag_key_1": "tag_value_1", "tag_key_2": "tag_value_2", ...},
  # ...
)

# Upsert a tag
fe.set_feature_table_tag(name="customer_feature_table", key="tag_key_1", value="new_key_value")

# Delete a tag
fe.delete_feature_table_tag(name="customer_feature_table", key="tag_key_2")

Unity Catalog で特徴量テーブルを削除する

の特徴量テーブルを削除するには、カタログUnity Catalog DeltaUnity Catalogエクスプローラーまたは フィーチャー エンジニアリングPythonAPI を使用して の テーブルを直接削除します。

  • 特徴量テーブルを削除すると、上流のプロデューサーと下流のコンシューマー (モデル、エンドポイント、スケジュール済みジョブ) で予想外のエラーが発生する恐れがあります。

  • Unity Catalogで特徴量テーブルを削除すると、基になる Delta テーブルも削除されます。

  • drop_table は、 Databricks Runtime 13.1 機械学習以前ではサポートされていません。 SQL コマンドを使用してテーブルを削除します。

Databricks SQL または FeatureEngineeringClient.drop_table を使用して、 Unity Catalogで特徴量テーブルを削除できます。

DROP TABLE ml.recommender_system.customer_features;
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
fe.drop_table(
  name='ml.recommender_system.customer_features'
)