Databricks で Ray クラスターを起動する

Databricks は、Apache Spark ジョブと同じ方法でクラスターとジョブの構成を処理することにより、Ray クラスターの起動プロセスを簡素化します。 これは、Ray クラスターが実際には管理された Apache Spark クラスター上で起動されるためです。

ローカルマシンでRayを実行する

import ray

ray.init()

DatabricksでRayを実行する

from ray.util.spark import setup_ray_cluster
import ray

# If the cluster has four workers with 8 CPUs each as an example
setup_ray_cluster(num_worker_nodes=4, num_cpus_per_worker=8)

# Pass any custom configuration to ray.init
ray.init(ignore_reinit_error=True)

このアプローチは、数個のノードから数百個のノードまでのあらゆるクラスター規模で機能します。 Databricksの Ray クラスターもオートスケールをサポートしています。

Ray クラスターを作成したら、Databricks ノートブックで任意の Ray アプリケーション コードを実行できます。

重要

Databricks では、Ray クラスターとアプリケーションで適切に使用できるように、 %pip install <your-library-dependency>を使用してアプリケーションに必要なライブラリをインストールすることをお勧めします。 Ray init 関数呼び出しで依存関係を指定すると、Apache Spark ワーカー ノードがアクセスできない場所に依存関係がインストールされ、バージョンの非互換性やインポート エラーが発生します。

たとえば、次のようにして、Databricks ノートブックで単純な Ray アプリケーションを実行できます。

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

Ray クラスターをシャットダウンする

レイ クラスターは、次の状況下では自動的にシャットダウンします。

  • 対話型ノートブックを Databricks クラスターから切り離します。

  • Databricks ジョブが完了しました。

  • Databricks クラスターが再起動または終了しました。

  • 指定したアイドル時間にはアクティビティがありません。

Databricks 上で実行されている Ray クラスターをシャットダウンするには、 ray.utils.spark.shutdown_ray_cluster API を呼び出します。

from ray.utils.spark import shutdown_ray_cluster
import ray

shutdown_ray_cluster()
ray.shutdown()