Databricks で Ray クラスターをスケールする

オートスケール、ヘッドノード構成、異種クラスター、リソース割り当てなど、Ray クラスターのサイズを調整して最適なパフォーマンスを得る方法を学習します。

オートスケールモードでレイクラスターを作成する

Ray 2.8.0 以降では、Ray クラスターはDatabricksで開始され、 Databricksオートスケールとの統合をサポートしています。 このオートスケール統合により、 Databricks環境内でDatabricksクラスター オートスケールが内部的にトリガーされます。

オートスケールを有効にするには、次のコマンドを実行します。

Ray バージョン 2.10 未満の場合:

from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=8,
  autoscale=True,
)

Ray バージョン 2.10 以降の場合:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

ray.util.spark.setup_ray_cluster API は、Apache Spark 上に Ray クラスターを作成します。 内部的には、バックグラウンドの Apache Spark ジョブが作成されます。 ジョブ内の各 Apache Spark タスクは Ray ワーカー ノードを作成し、Ray ヘッド ノードはドライバー上に作成されます。 引数min_worker_nodesmax_worker_nodesは、Ray ワークロード用に作成および使用する Ray ワーカー ノードの範囲を表します。 引数min_worker_nodesが未定義のままの場合、固定サイズの Ray クラスターが、使用可能なワーカー数max_worker_nodesで起動されます。 各 Ray ワーカー ノードに割り当てられる CPU または GPU コアの数を指定するには、引数num_cpus_worker_node (デフォルト値: 1) またはnum_gpus_worker_node (デフォルト値: 0) を設定します。

Ray バージョン 2.10 未満でオートスケールが有効な場合、 num_worker_nodes Ray ワーカー ノードの最大数を示します。 Ray ワーカー ノードのデフォルトの最小数は 0 です。 このデフォルト設定は、Ray クラスターがアイドル状態のときに、Ray ワーカー ノードが 0 にスケールダウンされることを意味します。 これは、すべてのシナリオで高速応答性を実現するのに理想的ではない可能性がありますが、有効にするとコストを大幅に削減できます。

オートスケールモードでは、ワーカーをray.util.spark.MAX_NUM_WORKER_NODESに設定できません。

次の引数は、アップスケーリングとダウンスケーリングの速度を設定します。

  • autoscale_upscaling_speed 保留中にできるノードの数を、現在のノード数の倍数で表します。 値が大きいほど、アップスケーリングがアグレッシブになります。 たとえば、これを 1.0 に設定すると、クラスターのサイズはいつでも最大 100% まで拡大できます。

  • autoscale_idle_timeout_minutes オートスケーラーがアイドル状態のワーカーノードを削除するまでに経過する必要がある分数を表します。 値が小さいほど、ダウンスケーリングがアグレッシブになります。

Ray 2.9.0 以降では、Ray クラスターがアイドル状態のときに Ray クラスターがワーカー 0 にスケールダウンしてクラスターが終了するのを防ぐために、 autoscale_min_worker_nodesを設定することもできます。

Rayヘッドノードが使用するリソースを構成する

デフォルトでは、Ray on Spark 構成の場合、Databricks は Ray ヘッド ノードに割り当てられるリソースを次のように制限します。

  • 0 CPU コア

  • 0 GPU

  • 128 MB のヒープ メモリ

  • 128 MB のオブジェクト ストア メモリ

これは、Ray ヘッド ノードが通常、Ray タスクの実行ではなく、グローバル調整にのみ使用されるためです。 Apache Spark ドライバー ノードのリソースは複数のユーザーと共有されるため、デフォルト設定では Apache Spark ドライバー側のリソースが節約されます。 Ray 2.8.0 以降では、Ray ヘッド ノードで使用されるリソースを構成できます。 setup_ray_cluster API では次の引数を使用します。

  • num_cpus_head_node: Rayヘッドノードで使用するCPUコアの設定

  • num_gpus_head_node:Rayヘッドノードが使用するGPUの設定

  • object_store_memory_head_node: Ray headノードによるオブジェクトストアのメモリサイズ設定

異機種クラスターのサポート

より効率的でコスト効率の高いトレーニング実行のために Ray on Spark クラスターを作成し、Ray ヘッド ノードと Ray ワーカー ノード間で異なる構成を設定できます。 ただし、すべての Ray ワーカー ノードは同じ構成である必要があります。 Databricksクラスターは異種クラスターを完全にサポートしていませんが、クラスターポリシーを設定することで、異なるドライバーおよびワーカー インスタンス タイプを持つDatabricksクラスターを作成できます。 例えば:

{
  "node_type_id": {
    "type": "fixed",
    "value": "i3.xlarge"
  },
  "driver_node_type_id": {
    "type": "fixed",
    "value": "g4dn.xlarge"
  },
  "spark_version": {
    "type": "fixed",
    "value": "13.x-snapshot-gpu-ml-scala2.12"
  }
}

Rayクラスター構成の調整

各 Ray ワーカー ノードの推奨構成は次のとおりです: Ray ワーカー ノードあたり最低 4 個の CPU コア。 各 Ray ワーカー ノードに最低 10 GB のヒープ メモリ。

したがって、 ray.util.spark.setup_ray_clusterを呼び出す場合、Databricks ではnum_cpus_per_node 4 以上の値に設定することをお勧めします。

各 Ray ワーカー ノードのヒープ メモリの調整の詳細については、次のセクションを参照してください。

Rayワーカーノードのメモリ割り当て

各 Ray ワーカー ノードは、ヒープ メモリとオブジェクト ストア メモリの 2 種類のメモリを使用します。

各タイプに割り当てられるメモリサイズは、以下のように決定されます。

各 Ray ワーカー ノードに割り当てられるメモリの合計は次のとおりです。 RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES Apache Spark ワーカー ノードで起動できる Ray ワーカー ノードの最大数です。 これは、引数 num_cpus_per_node または num_gpus_per_nodeによって決定されます。

引数object_store_memory_per_nodeを設定しない場合、各 Ray ワーカー ノードに割り当てられるヒープ メモリ サイズとオブジェクト ストア メモリ サイズは次のようになります。 RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7 OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

引数を object_store_memory_per_nodeに設定すると、次のようになります。 RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

さらに、Ray ワーカー ノードごとのオブジェクト ストアのメモリ サイズは、オペレーティング システムの共有メモリによって制限されます。 最大値は次のとおりです。 OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY Apache Spark ワーカー ノード用に構成された/dev/shmディスク サイズです。

スケーリングのベストプラクティス

各レイワーカーノードのCPUとGPUの数を設定する

引数num_cpus_worker_nodeを Apache Spark ワーカー ノードあたりの CPU コアの数に設定することをお勧めします。 同様に、 num_gpus_worker_nodeを Apache Spark ワーカーノードあたりの GPU の数に設定するのが最適です。 この構成では、各 Apache Spark ワーカー ノードは 1 つの Ray ワーカー ノードを起動し、各 Apache Spark ワーカー ノードのリソースを最大限に活用します。

Apache Spark クラスターを起動するときに、Databricks クラスター構成内で環境変数RAY_memory_monitor_refresh_ms0に設定します。

Apache Spark および Ray ハイブリッド ワークロードのメモリ リソース構成

クラスターでハイブリッド および Ray ワークロードを実行する場合、SparkDatabricksDatabricks Sparkでは メモリを小さな値に減らすことを推奨しています。たとえば、Databricks クラスター構成でspark.executor.memory 4gを設定します。

Apache Sparkエグゼキューターは GC を遅延トリガーするJavaプロセスであり、 Apache Sparkキャッシュは大量のApache Sparkエグゼキューター メモリを使用します。 これにより、Ray が使用できるメモリが減少します。 メモリ不足エラーを回避するには、 spark.executor.memory 構成を減らします。

Apache Spark および Ray ハイブリッド ワークロードの計算リソース構成

Databricks クラスターでハイブリッド Spark および Ray ワークロードを実行する場合は、クラスター ノードまたは Ray ワーカー ノードのいずれかを自動スケーラブルにすることをお勧めします。 例えば:

Databricksクラスターを起動するために使用できるワーカー ノードの数が固定されている場合は、Ray-on- Sparkオートスケールを有効にすることをお勧めします。 Ray ワークロードが実行されていない場合、Ray クラスターはスケールダウンされ、リソースが解放されて Apache Spark タスクで使用できるようになります。 Apache Spark タスクが終了し、Ray が再度使用されると、Ray-on-Spark クラスターは需要を満たすために再びスケールアップします。

さらに、Databricks クラスターと Ray-on-spark クラスターを自動スケーラブルにすることができます。 たとえば、Databricks クラスターの自動スケーラブル ノードを最大 10 ノードに構成し、Ray-on-Spark ワーカー ノードを最大 4 ノードに構成し、各 Apache Spark ワーカーのリソースをフルに活用するように各 Ray ワーカー ノードを構成した場合、Ray ワークロードはそのようなクラスター構成で最大 4 つのノード リソースを使用できます。 これに対し、Apache Spark ジョブは、最大 6 ノード相当のリソースを割り当てることができます。