Use Ray on Databricks

Preview

This feature is in Public Preview.

Ray 2.3.0 and above supports creating Ray clusters and running Ray applications on Apache Spark clusters with Databricks. For information about getting started with machine learning on Ray, including tutorials and examples, see the Ray documentation. For more information about the Ray and Apache Spark integration, see the Ray on Spark API documentation.

Requirements

  • Databricks Runtime 12.0 ML and above.

  • Databricks Runtime cluster access mode must be either “assigned” mode or “no isolation shared” mode.

Install Ray

Use the following command to install Ray. The [default] extension is required by the Ray dashboard component.

%pip install ray[default]>=2.3.0

Create a Ray cluster

To create a Ray cluster, use the ray.util.spark.setup_ray_cluster API.

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

The ray.util.spark.setup_ray_cluster API creates a Ray cluster on Spark. Internally, it creates a background Spark job. Each Spark task in the job creates a Ray worker node, and the Ray head node is created on the driver. The argument num_worker_nodes represents the number of Ray worker nodes to create. To specify the number of CPU or GPU cores assigned to each Ray worker node, set the argument num_cpus_per_node or num_gpus_per_node.
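For example, on a GPU cluster you can also assign GPUs to each Ray worker node. A minimal sketch, assuming each Spark worker node has at least four CPU cores and one GPU available:

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  num_gpus_per_node=1,  # assumes GPUs are attached to the Spark worker nodes
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)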

After a Ray cluster is created, you can run any Ray application code directly in your notebook. A link, Open Ray Cluster Dashboard in a new tab, is also displayed so that you can view the Ray dashboard for the cluster.

Tip

If you’re using a Databricks assigned mode cluster, you can set num_worker_nodes to ray.util.spark.MAX_NUM_WORKER_NODES in order to use all available resources for your Ray cluster.

import ray.util.spark  # needed to reference ray.util.spark.MAX_NUM_WORKER_NODES

setup_ray_cluster(
  # ...
  num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)

You can set the argument collect_log_to_path to specify the destination path where you want to collect the Ray cluster logs. Log collection runs after the Ray cluster is shut down. Databricks recommends that you set a path starting with /dbfs/ so that the logs are preserved even if you terminate the Spark cluster. Otherwise, your logs are not recoverable since the local storage on the cluster is deleted when the cluster is shut down.

Note

Calling ray.util.spark.setup_ray_cluster sets the RAY_ADDRESS environment variable to the address of the created Ray cluster, so your Ray application automatically uses this Ray cluster. You can specify an alternative cluster address using the address argument of the ray.init API.
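For example, to connect to a different Ray cluster than the one referenced by RAY_ADDRESS, pass the address explicitly (the address value shown is a placeholder):

import ray

# Connect to a specific Ray cluster instead of the one in RAY_ADDRESS.
# Replace the placeholder with your cluster's address.
ray.init(address="<ray-cluster-address>")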

Run a Ray application

After the Ray cluster has been created, you can run any Ray application code in a Databricks notebook.

Important

Databricks recommends that you install any necessary libraries for your application with %pip install <your-library-dependency> so that they are available to your Ray cluster and application. Specifying dependencies in the Ray init function call installs them in a location that is inaccessible to the Spark worker nodes, which results in version incompatibilities and import errors.
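For instance, a sketch of the recommended ordering in a notebook, with the package name left as a placeholder:

# First notebook cell: install dependencies with %pip before creating the Ray cluster
%pip install <your-library-dependency>

# Later notebook cell: create the Ray cluster after all %pip installs have completed
from ray.util.spark import setup_ray_cluster
setup_ray_cluster(num_worker_nodes=2, num_cpus_per_node=4)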

For example, you can run a simple Ray application in a Databricks notebook as follows:

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

Load data from a Spark DataFrame

To load a Spark DataFrame as a Ray Dataset, first save the Spark DataFrame to DBFS in Parquet or Delta format. To control DBFS access securely, Databricks recommends that you mount cloud object storage to DBFS. Then you can create a ray.data.Dataset instance from the saved path using the following helper function:

import ray
import os
from urllib.parse import urlparse


def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    # Write the Spark DataFrame to DBFS as Parquet, then read it back as a
    # Ray Dataset through the /dbfs FUSE mount.
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")

# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)
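You can then use the resulting Ray Dataset like any other, for example to inspect it (a brief sketch):

# Inspect the Ray Dataset created above
print(ray_dataset.count())
ray_dataset.show(5)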

Tune the Ray cluster configuration

The recommended configuration for each Ray worker node is:

  • Minimum 4 CPU cores per Ray worker node.

  • Minimum 10 GB of heap memory for each Ray worker node.

Therefore, when calling ray.util.spark.setup_ray_cluster, Databricks recommends setting num_cpus_per_node to a value of at least 4.

See Memory allocation for Ray worker nodes for details about tuning heap memory for each Ray worker node.

Memory allocation for Ray worker nodes

Each Ray worker node uses two types of memory: heap memory and object store memory. The allocated memory size for each type is determined as described below.

The total memory allocated to each Ray worker node is:

RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES is the maximum number of Ray worker nodes that can be launched on the Spark worker node. This is determined by the argument num_cpus_per_node or num_gpus_per_node.

If you do not set the argument object_store_memory_per_node, then the heap memory size and object store memory size allocated to each Ray worker node are:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

If you do set the argument object_store_memory_per_node:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

In addition, the object store memory size per Ray worker node is limited by the shared memory of the operating system. The maximum value is:

OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY is the /dev/shm disk size configured for the Spark worker node.
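As a worked example, the following sketch plugs hypothetical numbers into the formulas above; none of these values come from a real cluster:

# Hypothetical Spark worker node: 64 GB physical memory, 8 CPU cores,
# with num_cpus_per_node=4, so at most 2 Ray worker nodes per Spark worker node.
SPARK_WORKER_NODE_PHYSICAL_MEMORY = 64 * 1024**3      # 64 GB in bytes
MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES = 8 // 4         # 2 Ray worker nodes

RAY_WORKER_NODE_TOTAL_MEMORY = (
    SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8
)                                                     # 25.6 GB per Ray worker node

# Without object_store_memory_per_node set:
RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7    # ~17.9 GB
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3   # ~7.7 GB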

Enable stack traces and flame graphs on the Ray Dashboard Actors page

On the Ray Dashboard Actors page, you can view stack traces and flame graphs for active Ray actors. To view this information, use the following command to install “py-spy” before you start the Ray cluster:

%pip install py-spy

Shut down a Ray cluster

To shut down a Ray cluster running on Databricks, call the ray.util.spark.shutdown_ray_cluster API.
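For example:

from ray.util.spark import shutdown_ray_cluster

# Shut down the Ray on Spark cluster and release the resources held by its background Spark job.
shutdown_ray_cluster()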

Note

Ray clusters also shut down when:

  • You detach your interactive notebook from your Databricks cluster.

  • Your Databricks job completes.

  • Your Databricks cluster is restarted or terminated.

  • There’s no activity for the specified idle time.

Example notebook

The following notebook demonstrates how to create a Ray cluster and run a Ray application on Databricks.

Ray on Spark starter notebook


Limitations

  • Ray cluster autoscaling is not supported yet. The ray.util.spark.setup_ray_cluster API can only start a Ray cluster with a fixed number of Ray worker nodes.

  • Multi-user shared Databricks clusters (isolation mode enabled) are not supported.

  • When using %pip to install packages, the Ray cluster will shut down. Make sure to start Ray after you’re done installing all of your libraries with %pip.

  • Using integrations that override the configuration from ray.util.spark.setup_ray_cluster can cause the Ray cluster to become unstable and can crash the Ray context. For example, using the xgboost_ray package and setting RayParams with an actor or cpus_per_actor configuration in excess of the Ray cluster configuration can silently crash the Ray cluster.
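For example, with the two-worker, four-CPU cluster created earlier, a RayParams configuration sized within those limits might look like the following sketch (the specific values are illustrative):

from xgboost_ray import RayParams

# Keep the actor count and CPUs per actor within the Ray cluster's capacity:
# 2 Ray worker nodes x 4 CPUs each = 8 CPUs total.
ray_params = RayParams(num_actors=2, cpus_per_actor=4)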