Use Ray on Databricks

Preview

This feature is in Public Preview.

With Ray 2.3.0 and above, you can create Ray clusters and run Ray applications on Apache Spark clusters with Databricks. For information about getting started with machine learning on Ray, including tutorials and examples, see the Ray documentation. For more information about the Ray and Apache Spark integration, see the Ray on Spark API documentation.

Requirements

  • Databricks Runtime 12.0 ML and above.

  • Databricks Runtime cluster access mode must be either “Assigned” mode or “No isolation shared” mode.

Install Ray

Use the following command to install Ray. The [default] extension is required by the Ray dashboard component.

%pip install "ray[default]>=2.3.0"

Create a user-specific Ray cluster in a Databricks cluster

To create a Ray cluster, use the ray.util.spark.setup_ray_cluster API.

In any Databricks notebook that is attached to a Databricks cluster, you can run the following command:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

The ray.util.spark.setup_ray_cluster API creates a Ray cluster on Spark. Internally, it creates a background Spark job. Each Spark task in the job creates a Ray worker node, and the Ray head node is created on the driver. The argument num_worker_nodes represents the number of Ray worker nodes to create. To specify the number of CPU or GPU cores assigned to each Ray worker node, set the argument num_cpus_per_node or num_gpus_per_node.

After a Ray cluster is created, you can run any Ray application code directly in your notebook. To view the Ray dashboard for the cluster, click "Open Ray Cluster Dashboard in a new tab".

Tip

If you’re using a Databricks single user cluster, you can set num_worker_nodes to ray.util.spark.MAX_NUM_WORKER_NODES to use all available resources for your Ray cluster.

from ray.util.spark import setup_ray_cluster, MAX_NUM_WORKER_NODES

setup_ray_cluster(
  # ...
  num_worker_nodes=MAX_NUM_WORKER_NODES,
)

You can set the argument collect_log_to_path to specify the destination path where you want to collect the Ray cluster logs. Log collection runs after the Ray cluster is shut down. Databricks recommends that you set a path starting with /dbfs/ so that the logs are preserved even if you terminate the Spark cluster. Otherwise, your logs are not recoverable since the local storage on the cluster is deleted when the cluster is shut down.

Note

When you call ray.util.spark.setup_ray_cluster, it sets the RAY_ADDRESS environment variable to the address of the Ray cluster, so your Ray application automatically uses the Ray cluster that was created. You can specify an alternative cluster address using the address argument of the ray.init API.
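The lookup order described in this note can be sketched in plain Python. The helper name resolve_ray_address is hypothetical and is not part of the Ray API; it only mirrors the behavior described above, where an explicit address takes precedence and the RAY_ADDRESS variable is used otherwise.

```python
import os


def resolve_ray_address(explicit_address=None, environ=None):
    """Return the address a Ray application would connect to.

    Hypothetical helper for illustration; it is not part of the Ray API.
    """
    env = os.environ if environ is None else environ
    # An explicitly passed address wins over the environment variable.
    if explicit_address:
        return explicit_address
    # Otherwise fall back to the variable that setup_ray_cluster sets.
    return env.get("RAY_ADDRESS")


# In a notebook, after setup_ray_cluster has run, the two cases look like:
#   ray.init()                            # uses RAY_ADDRESS
#   ray.init(address="<other-address>")   # overrides RAY_ADDRESS
```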

Run a Ray application

After the Ray cluster has been created, you can run any Ray application code in a Databricks notebook.

Important

Databricks recommends that you install any libraries your application needs with %pip install <your-library-dependency> so that they are available to both your Ray cluster and your application. Specifying dependencies in the ray.init function call installs them in a location that is inaccessible to the Spark worker nodes, which results in version incompatibilities and import errors.

For example, you can run a simple Ray application in a Databricks notebook as follows:

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """Run sample_count experiments and return the fraction of
    sampled points that fell inside the unit circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

Create a Ray cluster in autoscaling mode

With Ray 2.8.0 and above, you can create a Ray cluster on a Databricks cluster that scales up or down according to workload. This autoscaling integration triggers Databricks cluster autoscaling internally within the Databricks environment. See Databricks cluster autoscaling.

To enable autoscaling, run the following command:

from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=8,
  autoscale=True,
  # ... other arguments
)

If autoscaling is enabled, num_worker_nodes indicates the maximum number of Ray worker nodes. The default minimum number of Ray worker nodes is zero, which means that an idle Ray cluster scales down to zero Ray worker nodes. Scaling to zero can greatly reduce costs, but it may slow the response to the next workload, because worker nodes must be started again.

In autoscaling mode, num_worker_nodes cannot be set to ray.util.spark.MAX_NUM_WORKER_NODES.

The following arguments configure the upscaling and downscaling speed:

  • autoscale_upscaling_speed represents the number of nodes allowed to be pending as a multiple of the current number of nodes. The higher the value, the more aggressive the upscaling. For example, if this is set to 1.0, the cluster can grow in size by at most 100% at any time.

  • autoscale_idle_timeout_minutes represents the number of minutes that need to pass before an idle worker node is removed by the autoscaler. The smaller the value, the more aggressive the downscaling.

With Ray 2.9.0 and above, you can also set autoscale_min_worker_nodes to prevent the Ray cluster from scaling down to zero workers when the Ray cluster is idle.
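The following is a minimal sketch of how these arguments fit together. The helper build_autoscale_kwargs and its default values are hypothetical and chosen for illustration; only the argument names come from the text above. It returns keyword arguments that could be passed on as setup_ray_cluster(**kwargs).

```python
def build_autoscale_kwargs(
    max_worker_nodes,
    min_worker_nodes=0,
    upscaling_speed=1.0,
    idle_timeout_minutes=1.0,
):
    """Collect autoscaling arguments for setup_ray_cluster.

    Hypothetical helper; the default values here are illustrative only.
    """
    if min_worker_nodes < 0 or min_worker_nodes > max_worker_nodes:
        raise ValueError("min_worker_nodes must be between 0 and max_worker_nodes")

    kwargs = {
        # With autoscale=True this is the maximum number of Ray worker nodes.
        "num_worker_nodes": max_worker_nodes,
        "autoscale": True,
        "autoscale_upscaling_speed": upscaling_speed,
        "autoscale_idle_timeout_minutes": idle_timeout_minutes,
    }
    # autoscale_min_worker_nodes needs Ray 2.9.0 or above, so it is only
    # included when a non-zero minimum is requested.
    if min_worker_nodes > 0:
        kwargs["autoscale_min_worker_nodes"] = min_worker_nodes
    return kwargs


kwargs = build_autoscale_kwargs(max_worker_nodes=8, min_worker_nodes=2)
```

Leaving the minimum out when it is zero keeps the same call usable on Ray 2.8.x, where that argument is not available.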

Connect to remote Ray cluster using Ray client

In Ray 2.9.0, you can create a Ray cluster using the setup_ray_cluster API and, in the same notebook, call the ray.init() API to connect to this Ray cluster.

To get the remote connection string, use the following:

from ray.util.spark import setup_ray_cluster

remote_conn_str = setup_ray_cluster(
  num_worker_nodes=2,
  # ... other arguments
)

Then, connect to the remote cluster using this remote connection string:

import ray
ray.init(remote_conn_str)

The Ray client does not support the Ray dataset API defined in the ray.data module. As a workaround, you can wrap your code that calls the Ray dataset API inside a remote Ray task, as shown in the following code:

import ray
import pandas as pd
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())

Load data from a Spark DataFrame

To load a Spark DataFrame as a Ray Dataset, first, you must save the Spark DataFrame to UC volumes or Databricks Filesystem (deprecated) using Parquet format. To control Databricks Filesystem access securely, Databricks recommends that you mount cloud object storage to DBFS. Then, you can create a ray.data.Dataset instance from the saved Spark DataFrame path using the following helper method:

import ray
import os
from urllib.parse import urlparse


def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")

# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)
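The path conversion inside the helper above can be looked at on its own. In this sketch, to_fuse_path is a hypothetical name for the single step that turns the dbfs:/ URI that Spark writes to into the /dbfs/ path that Ray reads from.

```python
from urllib.parse import urlparse


def to_fuse_path(dbfs_uri):
    """Convert a dbfs:/ URI into the corresponding /dbfs/ local path."""
    # urlparse splits off the "dbfs" scheme and keeps the rest as the path.
    return "/dbfs" + urlparse(dbfs_uri).path


fuse_path = to_fuse_path(
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)
print(fuse_path)
# /dbfs/home/example.user@databricks.com/data/ray_test/test_data_2
```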

Load data from a Unity Catalog table through Databricks SQL warehouse

For Ray 2.8.0 and above, you can call the ray.data.read_databricks_tables API to load data from a Databricks Unity Catalog table.

First, you need to set the DATABRICKS_TOKEN environment variable to your Databricks warehouse access token. If you’re not running your program on Databricks Runtime, also set the DATABRICKS_HOST environment variable to the Databricks workspace URL, as shown in the following:

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

Then, call ray.data.read_databricks_tables() to read from the Databricks SQL warehouse.

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)
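Before calling ray.data.read_databricks_tables, it can help to check that the environment variables described above are set. The following is a small sketch; the helper missing_databricks_env and its on_databricks_runtime flag are hypothetical and not part of Ray or Databricks.

```python
import os


def missing_databricks_env(on_databricks_runtime, environ=None):
    """Return the names of required environment variables that are not set.

    Hypothetical helper for illustration only.
    """
    env = os.environ if environ is None else environ
    required = ["DATABRICKS_TOKEN"]
    # The workspace URL is only needed outside Databricks Runtime.
    if not on_databricks_runtime:
        required.append("DATABRICKS_HOST")
    return [name for name in required if not env.get(name)]


missing = missing_databricks_env(on_databricks_runtime=False)
if missing:
    print("Set these variables before reading tables:", ", ".join(missing))
```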

Configure resources used by Ray head node

By default, for the Ray on Spark configuration, Databricks restricts resources allocated to the Ray head node to:

  • 0 CPU cores

  • 0 GPUs

  • 128 MB heap memory

  • 128 MB object store memory

This is because the Ray head node is usually used only for global coordination, not for executing Ray tasks. The Spark driver node resources are shared with multiple users, so the default setting saves resources on the Spark driver side.

With Ray 2.8.0 and above, you can configure resources used by the Ray head node. Use the following arguments in the setup_ray_cluster API:

  • num_cpus_head_node: the number of CPU cores used by the Ray head node

  • num_gpus_head_node: the number of GPUs used by the Ray head node

  • object_store_memory_head_node: the object store memory size used by the Ray head node
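As a sketch of how the head node arguments listed above might be passed, the following wraps the call in a function so that Ray is only imported when the function runs. The function name and all values are illustrative, and the assumption that object_store_memory_head_node is given in bytes should be checked against the Ray on Spark API documentation.

```python
# Illustrative values only. object_store_memory_head_node is assumed
# to be specified in bytes.
HEAD_NODE_RESOURCES = {
    "num_cpus_head_node": 2,
    "num_gpus_head_node": 0,
    "object_store_memory_head_node": 2 * 1024**3,
}


def start_cluster_with_head_resources(num_worker_nodes=2, num_cpus_per_node=4):
    """Start a Ray cluster whose head node gets the resources above."""
    # Imported lazily so that defining this function does not require Ray.
    from ray.util.spark import setup_ray_cluster

    return setup_ray_cluster(
        num_worker_nodes=num_worker_nodes,
        num_cpus_per_node=num_cpus_per_node,
        **HEAD_NODE_RESOURCES,
    )
```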

Support for heterogeneous clusters

For more efficient and cost-effective training runs, you can create a Ray on Spark cluster and set different configurations between the Ray head node and Ray worker nodes. However, all Ray worker nodes must have the same configuration. Databricks clusters do not fully support heterogeneous clusters, but you can create a Databricks cluster with different driver and worker instance types by setting a cluster policy.

For example:

{
  "node_type_id": {
    "type": "fixed",
    "value": "i3.xlarge"
  },
  "driver_node_type_id": {
    "type": "fixed",
    "value": "g4dn.xlarge"
  },
  "spark_version": {
    "type": "fixed",
    "value": "13.x-snapshot-gpu-ml-scala2.12"
  }
}

Tune the Ray cluster configuration

The recommended configuration for each Ray worker node is:

  • Minimum 4 CPU cores per Ray worker node.

  • Minimum 10GB heap memory for each Ray worker node.

So, when calling ray.util.spark.setup_ray_cluster, Databricks recommends setting num_cpus_per_node to a value >=4.

See Memory allocation for Ray worker nodes for details about tuning heap memory for each Ray worker node.

Memory allocation for Ray worker nodes

Each Ray worker node uses two types of memory: heap memory and object store memory. The allocated memory size for each type is determined as described below.

The total memory allocated to each Ray worker node is:

RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES is the maximum number of Ray worker nodes that can be launched on the Spark worker node. This is determined by the argument num_cpus_per_node or num_gpus_per_node.

If you do not set the argument object_store_memory_per_node, then the heap memory size and object store memory size allocated to each Ray worker node are:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

If you do set the argument object_store_memory_per_node:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

In addition, the object store memory size per Ray worker node is limited by the shared memory of the operating system. The maximum value is:

OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY is the /dev/shm disk size configured for the Spark worker node.
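The formulas in this section can be combined into a small calculator. This is a sketch that follows the formulas literally; the function name is hypothetical, and applying the cap with min() is one reading of "limited by" rather than a confirmed implementation detail. Any consistent memory unit can be used.

```python
def ray_worker_memory(
    spark_worker_physical_memory,
    spark_worker_os_shared_memory,
    max_local_ray_worker_nodes,
    object_store_memory_per_node=None,
):
    """Estimate per-node memory sizes from the formulas in this section."""
    total = spark_worker_physical_memory / max_local_ray_worker_nodes * 0.8
    cap = spark_worker_os_shared_memory / max_local_ray_worker_nodes * 0.8

    if object_store_memory_per_node is None:
        # Default split: 70% heap, 30% object store.
        heap = total * 0.7
        object_store = total * 0.3
    else:
        heap = total - object_store_memory_per_node
        object_store = object_store_memory_per_node

    return {
        "total": total,
        "heap": heap,
        # Assumption: the shared-memory limit is applied as a simple cap.
        "object_store": min(object_store, cap),
        "object_store_cap": cap,
    }


# Example: a 64 GB Spark worker with 32 GB of /dev/shm that hosts
# 2 Ray worker nodes.
sizes = ray_worker_memory(64, 32, 2)
print({name: round(value, 2) for name, value in sizes.items()})
```

In this example each Ray worker node gets about 25.6 GB in total, split into about 17.92 GB of heap memory and 7.68 GB of object store memory, which is above the recommended minimum of 10 GB of heap memory.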

Tips for Spark cluster configuration

The Ray cluster runs on top of a Databricks Spark cluster. A common scenario is to use a Spark job and Spark UDF to do simple data preprocessing tasks that do not need GPU resources. Then, use Ray to execute complicated machine learning tasks that benefit from GPUs. In this case, Databricks recommends setting the Spark cluster level configuration parameter spark.task.resource.gpu.amount to 0, so that all Spark DataFrame transformations and Spark UDF executions do not use GPU resources.

The benefits of this configuration are the following:

  • It increases Spark job parallelism, because the GPU instance type usually has many more CPU cores than GPU devices.

  • If the Spark cluster is shared with multiple users, this configuration prevents Spark jobs from competing for GPU resources with concurrently running Ray workloads.

Enable stack traces and flame graphs on the Ray Dashboard Actors page

On the Ray Dashboard Actors page, you can view stack traces and flame graphs for active Ray actors. To view this information, use the following command to install py-spy before you start the Ray cluster:

%pip install py-spy

Shut down a Ray cluster

To shut down a Ray cluster running on Databricks, you can call the ray.util.spark.shutdown_ray_cluster API.

Note

Ray clusters also shut down when:

  • You detach your interactive notebook from your Databricks cluster.

  • Your Databricks job completes.

  • Your Databricks cluster is restarted or terminated.

  • There’s no activity for the specified idle time.
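To shut the cluster down explicitly rather than relying on the conditions listed above, one option is to pair setup and shutdown in a context manager. This is a sketch; managed_ray_cluster is a hypothetical helper that takes the setup and shutdown functions as arguments, so it can be defined without importing Ray.

```python
from contextlib import contextmanager


@contextmanager
def managed_ray_cluster(setup, shutdown, **setup_kwargs):
    """Start a Ray cluster and shut it down when the block exits.

    `setup` and `shutdown` are expected to be
    ray.util.spark.setup_ray_cluster and ray.util.spark.shutdown_ray_cluster.
    """
    result = setup(**setup_kwargs)
    try:
        yield result
    finally:
        # Runs even if the body raises, so the cluster is not left running.
        shutdown()


# Usage in a notebook (requires Ray on Databricks):
#   from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
#   with managed_ray_cluster(setup_ray_cluster, shutdown_ray_cluster,
#                            num_worker_nodes=2, num_cpus_per_node=4):
#       ...  # run Ray application code here
```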

Example notebook

The following notebook demonstrates how to create a Ray cluster and run a Ray application on Databricks.

Ray on Spark starter notebook

Limitations

  • Multi-user shared Databricks clusters (isolation mode enabled) are not supported.

  • Running %pip to install packages shuts down the Ray cluster. Start Ray only after you have finished installing all of your libraries with %pip.

  • Using integrations that override the configuration from ray.util.spark.setup_ray_cluster can cause the Ray cluster to become unstable and can crash the Ray context. For example, using the xgboost_ray package and setting RayParams with an actor or cpus_per_actor configuration in excess of the Ray cluster configuration can silently crash the Ray cluster.
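For the xgboost_ray case in the last limitation, a simple pre-flight check can catch a request that exceeds the cluster's CPU budget. This is a rough heuristic under stated assumptions: fits_ray_cluster is a hypothetical helper, it compares only CPU counts, and it does not account for other workloads that share the Ray cluster.

```python
def fits_ray_cluster(num_actors, cpus_per_actor, num_worker_nodes, num_cpus_per_node):
    """Return True if the requested actor CPUs fit in the Ray cluster.

    num_actors and cpus_per_actor are the values intended for RayParams;
    num_worker_nodes and num_cpus_per_node are the values passed to
    setup_ray_cluster.
    """
    requested = num_actors * cpus_per_actor
    available = num_worker_nodes * num_cpus_per_node
    return requested <= available


# A cluster created with num_worker_nodes=2 and num_cpus_per_node=4 has 8 CPUs.
ok = fits_ray_cluster(num_actors=2, cpus_per_actor=4, num_worker_nodes=2, num_cpus_per_node=4)
too_large = fits_ray_cluster(num_actors=4, cpus_per_actor=4, num_worker_nodes=2, num_cpus_per_node=4)
```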