自然言語処理 (NLP) のための Hugging Face トランスフォーマーを使用したモデル推論

この記事では、自然言語処理 (NLP) モデルの推論に Hugging Face トランスフォーマーを使用する方法について説明します。

Hugging Face トランスフォーマーには、事前トレーニング済みのモデルを推論に使用する ための pipelines クラスが用意されています。 🤗 トランスフォーマー パイプラインは、Databricks で簡単に使用できる 幅広い NLP タスク をサポートしています。

要件

  • MLflow 2.3

  • Hugging Face transformers ライブラリがインストールされているクラスターは、バッチ推論に使用できます。 transformers ライブラリは、 Databricks Runtime LTS ML以降にプレインストールされています。 人気のあるNLPモデルの多くはGPUハードウェアで最適に機能するため、CPUでの使用に特別に最適化されたモデルを使用しない限り、最新のGPUハードウェアを使用して最高のパフォーマンスを得ることができます。

Pandas UDF を使用して Spark クラスター でモデル計算を分散する

事前トレーニング済みモデルを使用してエクスペリメントを使用すると、 Pandas UDF を使用してモデルをラップし、ワーカー CPU または GPU で計算を実行できます。 Pandas UDF は、モデルを各ワーカーに配布します。

機械翻訳用の Hugging Face Transformers パイプラインを作成し、Pandas UDF を使用して Spark クラスターのワーカーでパイプラインを実行することもできます。

import pandas as pd
from transformers import pipeline
import torch
from pyspark.sql.functions import pandas_udf

device = 0 if torch.cuda.is_available() else -1
translation_pipeline = pipeline(task="translation_en_to_fr", model="t5-base", device=device)

@pandas_udf('string')
def translation_udf(texts: pd.Series) -> pd.Series:
  translations = [result['translation_text'] for result in translation_pipeline(texts.to_list(), batch_size=1)]
  return pd.Series(translations)

この方法で device を設定すると、クラスターで使用可能な場合は GPU が使用されるようになります。

翻訳用の Hugging Face パイプラインは、Python dict オブジェクトのリストを返し、それぞれに 1 つのキー translation_text と翻訳されたテキストを含む値を返します。 この UDF は、結果から翻訳を抽出して、翻訳されたテキストのみを含む Pandas シリーズを返します。 device=0を設定して GPU を使用するようにパイプラインを構築した場合、クラスターに複数の GPU を持つインスタンスがある場合、Spark はワーカー ノードに GPU を自動的に再割り当てします。

UDF を使用してテキスト列を変換するには、 select ステートメントで UDF を呼び出します。

texts = ["Hugging Face is a French company based in New York City.", "Databricks is based in San Francisco."]
df = spark.createDataFrame(pd.DataFrame(texts, columns=["texts"]))
display(df.select(df.texts, translation_udf(df.texts).alias('translation')))

複雑な結果の種類 を返す

Pandas UDF を使用すると、より構造化された出力を返すこともできます。 たとえば、名前付きエンティティ認識では、パイプラインは、エンティティ、そのスパン、型、および関連付けられたスコアを含む dict オブジェクトの一覧を返します。 翻訳の例に似ていますが、 @pandas_udf 注釈の戻り値の型は、名前付きエンティティ認識の場合はより複雑です。

パイプラインの結果を調べる (ドライバーでパイプラインを実行するなど) を通じて、使用する戻り値の型を把握できます。

この例では、次のコードを使用します。

from transformers import pipeline
import torch
device = 0 if torch.cuda.is_available() else -1
ner_pipeline = pipeline(task="ner", model="Davlan/bert-base-multilingual-cased-ner-hrl", aggregation_strategy="simple", device=device)
ner_pipeline(texts)

注釈を生成するには:

[[{'entity_group': 'ORG',
   'score': 0.99933606,
   'word': 'Hugging Face',
   'start': 0,
   'end': 12},
  {'entity_group': 'LOC',
   'score': 0.99967843,
   'word': 'New York City',
   'start': 42,
   'end': 55}],
 [{'entity_group': 'ORG',
   'score': 0.9996372,
   'word': 'Databricks',
   'start': 0,
   'end': 10},
  {'entity_group': 'LOC',
   'score': 0.999588,
   'word': 'San Francisco',
   'start': 23,
   'end': 36}]]

これを戻り値の型として表すには、 struct フィールドの array を使用して、 dict エントリを structのフィールドとしてリストします。

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('array<struct<word string, entity_group string, score float, start integer, end integer>>')
def ner_udf(texts: pd.Series) -> pd.Series:
  return pd.Series(ner_pipeline(texts.to_list(), batch_size=1))

display(df.select(df.texts, ner_udf(df.texts).alias('entities')))

パフォーマンス を調整する

UDF のパフォーマンスのチューニングには、いくつかの重要な側面があります。 1 つ目は、各 GPU を効果的に使用することであり、Transformers パイプラインによって GPU に送信されるバッチのサイズを変更することで調整できます。 2 つ目は、クラスター全体を利用するために DataFrame が適切にパーティション分割されていることを確認することです。

最後に、 Hugging Face モデルをキャッシュして、モデルの読み込み時間やイングレス コストを節約することができます。

バッチサイズ を選択する

上記の UDF は、 batch_size 1 ですぐに使用できるはずですが、ワーカーが使用できるリソースを効率的に使用しない可能性があります。 パフォーマンスを向上させるには、クラスター内のモデルとハードウェアに合わせてバッチ サイズを調整します。 Databricks では、最適なパフォーマンスを見つけるために、クラスター上のパイプラインに対してさまざまなバッチ サイズを試すことをお勧めします。 パイプラインのバッチ処理 とその他の パフォーマンス オプション の詳細については、Hugging Face のドキュメントを参照してください。

GPU使用率を最大限に高めるのに十分な大きさのバッチサイズを見つけてくださいが、 CUDA out of memory エラーが発生しません。 チューニング中に CUDA out of memory エラーが発生した場合は、ノートブックをデタッチして再接続し、モデルで使用されているメモリと GPU 内のデータを解放する必要があります。

ステージレベルのスケジューリング による並列処理の調整

デフォルトにより、Spark は各マシンの GPU ごとに 1 つのタスクをスケジュールします。 並列処理を増やすには、ステージレベルのスケジューリングを使用して、GPU ごとに実行するタスクの数を Spark に指示できます。 たとえば、Spark で GPU ごとに 2 つのタスクを実行する場合は、次の方法でこれを指定できます。

from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

task_requests = TaskResourceRequests().resource("gpu", 0.5)

builder = ResourceProfileBuilder()
resource_profile = builder.require(task_requests).build

rdd = df.withColumn('predictions', loaded_model(struct(*map(col, df.columns)))).rdd.withResources(resource_profile)

使用可能なすべてのハードウェア を使用するようにデータを再パーティション化する

パフォーマンスに関する 2 つ目の考慮事項は、クラスター内のハードウェアを最大限に活用することです。 一般に、ワーカー上の GPU の数 (GPU クラスターの場合) またはクラスター内のワーカー全体のコア数 (CPU クラスターの場合) の小さな倍数が適切に機能します。 入力 DataFrame には、クラスターの並列処理を利用するのに十分なパーティションが既にある場合があります。 DataFrame に含まれるパーティションの数を確認するには、 df.rdd.getNumPartitions() を使用します。repartitioned_df = df.repartition(desired_partition_count)を使用して DataFrame を再パーティション化できます。

モデルを DBFS またはマウント ポイント にキャッシュする

別のクラスターまたは再起動されたクラスターからモデルを頻繁に読み込む場合は、Hugging Face モデルを DBFSルート ボリューム または マウント ポイントにキャッシュすることもできます。 これにより、イングレス コストを削減し、新しいクラスターまたは再起動されたクラスターにモデルを読み込む時間を短縮できます。 これを行うには、パイプラインを読み込む前に、コードで TRANSFORMERS_CACHE 環境変数を設定します。

例:

import os
os.environ['TRANSFORMERS_CACHE'] = '/dbfs/hugging_face_transformers_cache/'

または、 MLflow の "トランスフォーマー" フレーバーを使用してモデルを MLflow に記録することで、同様の結果を得ることができます。

ノートブック: Hugging Face トランスフォーマーの推論と MLflow ロギング

サンプル コードをすばやく使用するために、このノートブックは、 Hugging Face Transformers パイプライン推論と MLflow ログを使用したテキスト概要作成のエンドツーエンドの例です。

Hugging Face トランスフォーマーパイプライン推論ノートブック

ノートブックを新しいタブで開く