Databricks でレイを使用する

プレビュー

この機能はパブリックプレビュー段階です。

Ray 2.3.0 以降では、Ray クラスターを作成し、Databricks を使用して Apache Spark クラスター上で Ray アプリケーションを実行できます。 チュートリアルや例など、Ray で機械学習を始める方法については、 Ray のドキュメントを参照してください。 Ray と Apache Spark の統合の詳細については、 「Ray on Spark API ドキュメント」を参照してください。

要件

  • Databricks Runtime 12.0 機械学習以上。

  • Databricks Runtime クラスターのアクセス モードは、「割り当て済み」モードまたは「分離共有なし」モードのいずれかである必要があります。

レイ のインストール

次のコマンドを使用して、Rayをインストールします。 [default] 拡張機能は、Ray ダッシュボード コンポーネントで必要です。

%pip install ray[default]>=2.3.0

Databricks クラスターにユーザー固有の Ray クラスターを作成する

レイ クラスターを作成するには、 ray.util.spark.setup_ray_cluster APIです。

Databricks クラスターに接続されている Databricks ノートブックでは、次のコマンドを実行できます。

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

ray.util.spark.setup_ray_cluster API は、Spark 上にレイ クラスターを作成します。内部的には、バックグラウンド Spark ジョブが作成されます。 ジョブ内の各 Spark タスクによって Ray ワーカー ノードが作成され、ドライバー上に Ray ヘッド ノードが作成されます。 引数 num_worker_nodes は、作成する Ray ワーカー ノードの数を表します。 各 Ray ワーカー ノードに割り当てられる CPU または GPU コアの数を指定するには、引数 num_cpus_per_node または num_gpus_per_nodeを設定します。

Ray クラスターが作成された後は、任意の Ray アプリケーション コードをノートブックで直接実行できます。 [新しいタブで Ray クラスター ダッシュボードを開く]をクリックして、クラスターの Ray ダッシュボードを表示します。

ヒント

Databricks のシングル ユーザー クラスターを使用している場合は、 num_worker_nodesray.util.spark.MAX_NUM_WORKER_NODESに設定して、Ray クラスターで使用可能なすべてのリソースを使用できます。

setup_ray_cluster(
  # ...
  num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)

引数 collect_log_to_path を設定して、Ray クラスターログを収集する宛先パスを指定できます。 Ray クラスターのシャットダウン後にログ収集を実行します。 Databricks では、Spark クラスターを終了した場合でもログが保持されるように、 /dbfs/ で始まるパスを設定することをお勧めします。 そうしないと、クラスターのシャットダウン時にクラスター上のローカルストレージが削除されるため、ログを回復できません。

「作成された Ray クラスターを Ray アプリケーションで自動的に使用するには、 ray.util.spark.setup_ray_clusterを呼び出して、 RAY_ADDRESS環境変数を Ray クラスターのアドレスに設定します。」 ray.init API のaddress引数を使用して、代替クラスター アドレスを指定できます。

レイ アプリケーション を実行する

Ray クラスターが作成されたら、Databricks ノートブックで任意の Ray アプリケーション コードを実行できます。

重要

Databricks では、アプリケーションに必要なライブラリを %pip install <your-library-dependency> を使用してインストールし、それに応じて Ray クラスターとアプリケーションで使用できるようにすることをお勧めします。 Ray init 関数呼び出しで依存関係を指定すると、Spark ワーカー ノードにアクセスできない場所に依存関係がインストールされるため、バージョンの非互換性とインポート エラーが発生します。

たとえば、次のように Databricks ノートブックで単純な Ray アプリケーションを実行できます。

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

オートスケール モードで Ray クラスターを作成する

Ray 2.8.0 以降では、Databricks で開始された Ray クラスターは、Databricks オートスケールとの統合をサポートしています。 「Databricks クラスターのオートスケール」を参照してください。

Ray 2.8.0 以降では、ワークロードに応じたスケールアップまたはスケールダウンをサポートする Ray クラスターを Databricks クラスター上に作成できます。 このオートスケール統合により、Databricks 環境内で内部的に Databricks クラスターのオートスケールがトリガーされます。

オートスケールを有効にするには、次のコマンドを実行します。

from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=8,
  autoscale=True,
  ... # other arguments
)

オートスケールが有効な場合、 num_worker_nodesは Ray ワーカー ノードの最大数を示します。 Ray ワーカー ノードのデフォルトの最小数は 0 です。 このデフォルト設定は、Ray クラスターがアイドル状態の場合、Ray ワーカー ノードがゼロになるまでスケールダウンすることを意味します。 これは、すべてのシナリオで高速応答性を実現するのに理想的ではない可能性がありますが、有効にすると、コストを大幅に削減できます。

オートスケールモードでは、 num_worker_nodesray.util.spark.MAX_NUM_WORKER_NODESに設定できません。

次の引数は、アップスケーリングとダウンスケーリングの速度を設定します。

  • autoscale_upscaling_speed 保留中にできるノードの数を、現在のノード数の倍数で表します。 値が大きいほど、アップスケーリングがアグレッシブになります。 たとえば、これが 1.0 に設定されている場合、クラスターのサイズはいつでも最大 100% 増加する可能性があります。

  • autoscale_idle_timeout_minutes アイドル状態のワーカー ノードがオートスケーラーによって削除されるまでに経過する必要がある分数を表します。 値が小さいほど、ダウンスケーリングがアグレッシブになります。

Ray 2.9.0 以降では、 autoscale_min_worker_nodesを設定して、Ray クラスターがアイドル状態のときに、Ray クラスターがワーカー数ゼロにスケールダウンしないようにすることもできます。

Ray クライアントを使用してリモート Ray クラスターに接続する

Ray 2.9.0 では、 setup_ray_cluster API を使用して Ray クラスターを作成でき、同じノートブックでray.init() API を呼び出してこの Ray クラスターに接続できます。

次のコマンドを使用してリモート接続文字列を取得します。

from ray.util.spark import setup_ray_cluster

remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

次に、上記のリモート接続文字列を使用してリモート クラスターに接続できます。

import ray
ray.init(remote_conn_str)

Ray クライアントは、 ray.dataモジュールで定義された Ray データセット API をサポートしていません。 回避策として、次のコードに示すように、Ray データセット API を呼び出すコードをリモート Ray タスク内にラップできます。

import ray
import pandas as pd
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())

Spark DataFrame からデータを読み込む

Spark DataFrame Ray データセットとしてロードするには、まず、Parquet 形式を使用して Spark DataFrameを UC ボリュームまたはDatabricks Filesystem (非推奨) に保存する必要があります。 Databricks Filesystemアクセスを安全に制御するために、Databricks ではクラウド オブジェクト ストレージを DBFS にマウントすることをお勧めします。 次に、次のヘルパー メソッドを使用して、保存された Spark DataFrame パスからray.data.Datasetインスタンスを作成できます。

import ray
import os
from urllib.parse import urlparse


def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")

# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)

Databricks SQL ウェアハウスを介してUnity Catalogテーブルからデータをロードする

Ray 2.8.0 以降の場合、 ray.data.read_databricks_tables API を呼び出して Databricks Unity Catalog テーブルからデータをロードできます。

まず、 DATABRICKS_TOKEN環境変数を Databricks ウェアハウスへのアクセスに設定する必要があります。 Databricks Runtimeでプログラムを実行していない場合は、次のように、DATABRICKS_HOST 環境変数を Databricks ワークスペース URL に設定します。

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

次に、 ray.data.read_databricks_tables()を呼び出して Databricks SQL ウェアハウスから読み取ります。

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

Ray ヘッド ノードによって使用されるリソースを構成する

デフォルトでは、Ray on Spark 構成の場合、Databricks は Ray ヘッド ノードに割り当てられるリソースを次のように制限します。

  • 0 CPU コア

  • 0 GPU

  • 128 MB のヒープ メモリ

  • 128 MB のオブジェクト ストア メモリ

これは、Ray ヘッド ノードは通常、グローバル調整のみに使用され、Ray タスクの実行には使用されないためです。 Spark ドライバー ノードのリソースは複数のユーザーで共有されるため、デフォルト設定では Spark ドライバー側のリソースが節約されます。

Ray 2.8.0 以降では、Ray ヘッド ノードによって使用されるリソースを構成できます。 setup_ray_cluster API で次の引数を使用します。

  • num_cpus_head_node: Rayヘッドノードで使用するCPUコアの設定

  • num_gpus_head_node:レイヘッドノードが使用するGPUの設定

  • object_store_memory_head_node: Ray headノードによるオブジェクトストアのメモリサイズ設定

異種クラスターのサポート

トレーニングをより効率的かつコスト効率よく実行するには、Ray on Spark クラスターを作成し、Ray ヘッド ノードと Ray ワーカー ノード間で異なる構成を設定できます。 ただし、すべての Ray ワーカー ノードは同じ構成である必要があります。 Databricks クラスターは異種クラスターを完全にはサポートしませんが、クラスター ポリシーを設定することで、異なるドライバー インスタンス タイプとワーカー インスタンス タイプを使用して Databricks クラスターを作成できます。

例:

{
  "node_type_id": {
    "type": "fixed",
    "value": "i3.xlarge"
  },
  "driver_node_type_id": {
    "type": "fixed",
    "value": "g4dn.xlarge"
  },
  "spark_version": {
    "type": "fixed",
    "value": "13.x-snapshot-gpu-ml-scala2.12"
  }
}

レイ クラスター構成 の調整

各 Ray ワーカー ノードの推奨構成は次のとおりです。

  • レイ ワーカー ノードあたり最低 4 つの CPU コア。

  • 各 Ray ワーカー ノードの最小 10 GB ヒープ メモリ。

そのため、 ray.util.spark.setup_ray_clusterを呼び出すときは、 num_cpus_per_node を値 >=4 に設定 Databricks ことをお勧めします。

各 Ray ワーカーノードのヒープメモリの調整の詳細については、「 Ray ワーカーノードのメモリ割り当て 」を参照してください。

レイワーカーノードの メモリ割り当て

各 Ray ワーカー ノードは、ヒープ メモリとオブジェクト ストア メモリの 2 種類のメモリを使用します。 各タイプに割り当てられたメモリサイズは、以下のように決定されます。

各 Ray ワーカー ノードに割り当てられる合計メモリは次のとおりです。

RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES は、Spark ワーカー ノードで起動できる Ray ワーカー ノードの最大数です。 これは、引数 num_cpus_per_node または num_gpus_per_nodeによって決定されます。

引数 object_store_memory_per_nodeを設定しない場合、各 Ray ワーカー ノードに割り当てられるヒープ メモリ サイズとオブジェクト ストア メモリ サイズは次のようになります。

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

引数を設定すると、次の object_store_memory_per_nodeになります。

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

さらに、Ray ワーカー ノードあたりのオブジェクト ストア メモリ サイズは、オペレーティング システムの共有メモリによって制限されます。 最大値は次のとおりです。

OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY は、Spark ワーカー ノード用に構成された /dev/shm ディスク サイズです。

Spark クラスター構成のヒント

Ray クラスターは Databricks Spark クラスター上で実行されます。 一般的なシナリオは、Spark ジョブと Spark UDF を使用して、GPU リソースを必要としない単純なデータ前処理タスクを実行することです。 次に、Ray を使用して、GPU の恩恵を受ける複雑な機械学習タスクを実行します。 この場合、Databricks では、すべての Spark DataFrame 変換と Spark UDF の実行で GPU リソースが使用されないように、Spark クラスター レベルの構成パラメーターspark.task.resource.gpu.amount0に設定することをお勧めします。

この構成の利点は次のとおりです。

  • 通常、GPU インスタンス タイプには GPU デバイスよりも多くの CPU コアが搭載されているため、Spark ジョブの並列処理が増加します。

  • Spark クラスターが複数のユーザーと共有されている場合、この構成により、Spark ジョブが同時に実行されている Ray ワークロードと GPU リソースをめぐって競合することがなくなります。

スタック トレースとフレーム グラフを Ray ダッシュボード アクタ ページで 有効にする

[Ray ダッシュボード アクタ] ページでは、アクティブな Ray アクタ のスタック トレースとフレーム グラフを表示できます。 この情報を表示するには、レイ クラスターを起動する前に、次のコマンドを使用して「py-spy」をインストールします。

%pip install py-spy

Ray クラスター をシャットダウンする

Databricks で実行されている Ray クラスターをシャットダウンするには、 ray.utils.spark.shutdown_ray_cluster APIです。

Ray クラスターは、次の場合にもシャットダウンします。

  • 対話型ノートブックを Databricks クラスターからデタッチします。

  • Databricks ジョブ が完了します。

  • Databricks クラスターが再起動または終了します。

  • 指定されたアイドル時間のアクティビティはありません。

ノートブック の例

次のノートブックは、Ray クラスターを作成し、Databricks で Ray アプリケーションを実行する方法を示しています。

Spark スターターノートブック のレイ

ノートブックを新しいタブで開く

制限

  • マルチユーザー共有 Databricks クラスター (分離モードが有効) はサポートされていません。

  • %pip を使用してパッケージをインストールすると、レイ クラスターがシャットダウンします。 %pipを使用してすべてのライブラリのインストールが完了したら、必ずRayを起動してください。

  • ray.util.spark.setup_ray_cluster から設定を上書きする統合を使用すると、Ray クラスターが不安定になり、Ray コンテキストがクラッシュする可能性があります。たとえば、 xgboost_ray パッケージを使用し、アクターまたは cpus_per_actor 構成で RayParams をレイ クラスター構成を超えて設定すると、レイ クラスターがサイレントにクラッシュする可能性があります。