Create and connect to Ray clusters on Databricks
Learn how to create, configure, and run Ray compute clusters on Databricks
Requirements
To create a Ray cluster, you must have access to a Databricks all-purpose compute resource with the following settings:
Databricks Runtime 12.2 LTS ML and above.
Access mode must be either Single user or No isolation shared.
Note
Ray clusters are currently not supported on serverless compute.
Install Ray
Starting with Databricks Runtime 15.0 ML, Ray is preinstalled on Databricks clusters.
For runtimes released prior to 15.0, use pip to install Ray on your cluster:
%pip install ray[default]>=2.3.0
Create a user-specific Ray cluster in a Databricks cluster
To create a Ray cluster, use the ray.util.spark.setup_ray_cluster API.
Note
When you create a Ray cluster in a notebook, it is only available to the current notebook user. The Ray cluster is automatically shut down after the notebook is detached from the cluster or after 30 minutes of inactivity (no tasks have been submitted to Ray). If you want to create a Ray cluster that is shared with all users and is not tied to an actively running notebook, use the ray.util.spark.setup_global_ray_cluster API instead.
Fixed-size Ray cluster
In any Databricks notebook that is attached to a Databricks cluster, you can run the following command to start a fixed-size Ray cluster:
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
import ray

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)
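When your workload finishes, you can shut the cluster down explicitly instead of waiting for the inactivity timeout. This is a minimal sketch using the shutdown_ray_cluster API imported above; log collection to collect_log_to_path runs after the cluster shuts down:
from ray.util.spark import shutdown_ray_cluster

# Shut down the user-specific Ray cluster attached to this notebook.
shutdown_ray_cluster()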
Auto-scaling Ray cluster
To learn how to start an auto-scaling Ray cluster, see Scale Ray clusters on Databricks.
Starting a global mode Ray cluster
Using Ray 2.9.0 and above, you can create a global mode Ray cluster on a Databricks cluster. A global mode Ray cluster allows all users attached to the Databricks cluster to also use the Ray cluster. This mode of running a Ray cluster does not have the inactivity timeout that a user-specific Ray cluster has.
To start a global Ray cluster that multiple users can attach to and run Ray tasks on, start by creating a Databricks notebook job and attaching it to a shared mode Databricks cluster, then run the following command:
from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
  max_worker_nodes=2,
  ...
  # other arguments are the same as with the `setup_ray_cluster` API.
)
This is a blocking call that will remain active until you interrupt the call by clicking the “Interrupt” button on the notebook command cell, detaching the notebook from the Databricks cluster, or terminating the Databricks cluster. Otherwise, the global mode Ray cluster will continue to run and be available for task submission by authorized users. For more information on global mode clusters, see Ray API Documentation.
Global mode clusters have the following properties:
In a Databricks cluster, you can only create one active global mode Ray cluster at a time.
In a Databricks cluster, the active global mode Ray cluster can be used by all users in any attached Databricks notebook. You can run ray.init() to connect to the active global mode Ray cluster, as shown in the sketch after this list. Because multiple users can access this Ray cluster, resource contention might be an issue.
The global mode Ray cluster stays up until the setup_global_ray_cluster call is interrupted. It does not have an automatic shutdown timeout as single-user Ray clusters do.
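The following is a minimal sketch of connecting from another notebook attached to the same Databricks cluster:
import ray

# With a global mode Ray cluster active on this Databricks cluster,
# ray.init() with no arguments connects to it.
ray.init()

@ray.remote
def square(x):
  return x * x

# Submit a few tasks to the shared Ray cluster.
print(ray.get([square.remote(i) for i in range(4)]))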
Create a Ray GPU cluster
For GPU clusters, you can add GPU resources to the Ray cluster in the following way:
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
import ray

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=8,
  num_gpus_per_node=1,
  num_cpus_head_node=8,
  num_gpus_head_node=1,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)
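Once the GPU cluster is running, Ray tasks and actors can request GPUs through Ray's standard resource arguments. The following is a minimal sketch; it assumes PyTorch is available on the workers, as it is in the ML runtimes:
import ray

# Request one GPU for this task; Ray schedules it on a node with a free GPU.
@ray.remote(num_gpus=1)
def gpu_task():
  import torch
  return torch.cuda.is_available()

print(ray.get(gpu_task.remote()))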
Connect to a remote Ray cluster using the Ray client
In Ray version 2.3.0 and above, you can create a Ray cluster using the setup_ray_cluster API, and in the same notebook, you can call the ray.init() API to connect to this Ray cluster. To get the remote connection string, use the following:
from ray.util.spark import setup_ray_cluster
_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)
Then, you can connect to the remote cluster using the remote connection string:
import ray
ray.init(remote_conn_str)
The Ray client does not support the Ray dataset API defined in the ray.data module. As a workaround, you can wrap your code that calls the Ray dataset API inside a remote Ray task, as shown in the following code:
import ray
import pandas as pd
# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")
@ray.remote
def ray_data_task():
  p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
  ds = ray.data.from_pandas(p1)
  return ds.repartition(4).to_pandas()
ray.get(ray_data_task.remote())
Connect the Ray cluster to the Ray Job CLI
For many developers moving from self-managed Ray solutions to Databricks, there is often existing infrastructure tooling built on the Ray CLI tools. While Databricks currently does not support the Ray Cluster CLI, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on Databricks. For example:
```shell
ray job submit --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py
```
The values that need to be configured are the Databricks workspace URL, starting with https://, and the values after /driver-proxy/o/, which are found in the Ray Dashboard proxy URL displayed after the Ray cluster is started.
The Ray Job CLI is used for submitting jobs to a Ray cluster from external systems, but it is not required for submitting jobs on Ray clusters on Databricks. Databricks recommends deploying the job using Databricks Jobs, creating one Ray cluster per application, and using existing Databricks tooling, such as Databricks Asset Bundles or workflow triggers, to trigger the job.
Set a log output location
You can set the collect_log_to_path argument to specify the destination path where you want to collect the Ray cluster logs. Log collection runs after the Ray cluster is shut down.
Databricks recommends setting a path starting with /dbfs/ or a Unity Catalog volume path to preserve the logs even if you terminate the Apache Spark cluster. Otherwise, your logs are not recoverable because the local storage on the cluster is deleted when the cluster is shut down.
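For example, to collect logs to a Unity Catalog volume (the catalog, schema, and volume names below are placeholders):
from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  # Logs are copied to this volume path after the Ray cluster shuts down.
  collect_log_to_path="/Volumes/<catalog>/<schema>/<volume>/ray_collected_logs"
)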
After creating a Ray cluster, you can run any Ray application code directly in your notebook. Click Open Ray Cluster Dashboard in a new tab to view the Ray dashboard for the cluster.
Enable stack traces and flame graphs on the Ray Dashboard Actors page
On the Ray Dashboard Actors page, you can view stack traces and flame graphs for active Ray actors. To view this information, use the following command to install py-spy before you start the Ray cluster:
%pip install py-spy
Best practices for creating and configuring Ray clusters
The following sections describe best practices for creating and configuring Ray clusters on Databricks.
Non-GPU workloads
The Ray cluster runs on top of a Databricks Spark cluster. A typical scenario is to use a Spark job and Spark UDF to do simple data preprocessing tasks that do not need GPU resources. Then, use Ray to run complicated machine learning tasks that benefit from GPUs. In this case, Databricks recommends setting the Apache Spark cluster level configuration parameter spark.task.resource.gpu.amount to 0 so that all Apache Spark DataFrame transformations and Apache Spark UDF executions do not use GPU resources.
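For example, add the following line to the cluster's Spark configuration (under the cluster's advanced options) when you create the Databricks cluster:
spark.task.resource.gpu.amount 0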
The benefits of this configuration are the following:
It increases Apache Spark job parallelism because the GPU instance type usually has many more CPU cores than GPU devices.
If the Apache Spark cluster is shared with multiple users, this configuration prevents Apache Spark jobs from competing for GPU resources with concurrently running Ray workloads.
Disable transformers trainer MLflow integration if using it in Ray tasks
The transformers trainer MLflow integration is enabled by default from within the transformers library. If you use Ray Train to fine-tune a transformers model, Ray tasks will fail due to a credential issue. However, this issue does not apply if you directly use MLflow for training.
To avoid this issue, set the DISABLE_MLFLOW_INTEGRATION environment variable to "TRUE" in the Databricks cluster configuration when starting your Apache Spark cluster.
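For example, add the following entry to the cluster's environment variables (under the cluster's advanced options):
DISABLE_MLFLOW_INTEGRATION=TRUE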
Address Ray remote function pickling error
To run Ray tasks, Ray pickles the task function. If pickling fails, you must diagnose which part of your code causes the failure. Common causes of pickling errors are the handling of external references, closures, and references to stateful objects. One of the easiest errors to verify and quickly correct is remedied by moving import statements inside the task function declaration.
For example, datasets.load_dataset is a widely used function that is patched on the Databricks Runtime driver side, rendering the reference unpickle-able. To address it, you can write the task function as follows:
def ray_task_func():
  from datasets import load_dataset  # import the function inside the task function
  ...
Disable Ray memory monitor if the Ray task is unexpectedly killed with an out-of-memory (OOM) error
In Ray 2.9.3, the Ray memory monitor has several known issues that can cause Ray tasks to be inadvertently stopped without cause.
To address the issue, disable the Ray memory monitor by setting the environment variable RAY_memory_monitor_refresh_ms to 0 in the Databricks cluster configuration when starting your Apache Spark cluster.
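As with the environment variable above, you can add the following entry to the cluster's environment variables:
RAY_memory_monitor_refresh_ms=0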
Apply transformation functions to batches of data
When processing data in batches, it is recommended to use the Ray Data API with the map_batches function. This approach can be more efficient and scalable, especially for large datasets or complex computations that benefit from batch processing. Any Spark DataFrame can be converted to a Ray Dataset using the ray.data.from_spark API. The processed output from calling this transformation API can be written out to Databricks Unity Catalog tables using the ray.data.write_databricks_table API.
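The following is a minimal sketch of this pattern; the column name and transformation are placeholders, and the Spark DataFrame is built with the notebook's built-in SparkSession:
import ray
import ray.data

# Example Spark DataFrame built with the notebook's SparkSession (`spark`).
spark_df = spark.range(1000).withColumnRenamed("id", "value")

# Convert the Spark DataFrame to a Ray Dataset.
ds = ray.data.from_spark(spark_df)

# Transform each batch; with batch_format="pandas", each batch arrives as a pandas DataFrame.
def add_double_column(batch):
  batch["doubled"] = batch["value"] * 2
  return batch

transformed = ds.map_batches(add_double_column, batch_format="pandas")

# Inspect the result here, or write it to a Unity Catalog table with the
# write_databricks_table API mentioned above.
print(transformed.take(5))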
Using MLflow in Ray tasks
To use MLflow in Ray tasks, you need to:
Define Databricks MLflow credentials within Ray tasks.
Create MLflow runs within the Apache Spark driver and pass the created run_id to the Ray tasks.
The following code example demonstrates how to do this:
import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars

mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name>@databricks.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os

  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)

  # We need to use the run created on the Apache Spark driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)

  return x

with mlflow.start_run() as run:  # create the MLflow run on the Apache Spark driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Use notebook-scoped Python libraries or cluster Python libraries in Ray tasks
Currently, Ray has a known issue where Ray tasks can't use notebook-scoped Python libraries or cluster Python libraries. To use additional dependencies within your Ray jobs, you must manually install libraries using the %pip magic command before launching a Ray-on-Spark cluster that will use these dependencies within tasks. For example, to update the version of Ray that will be used to start the Ray cluster, you can run the following command in your notebook:
%pip install ray==<The Ray version you want to use> --force-reinstall
Then, run the following command in your notebook to restart the Python kernel:
dbutils.library.restartPython()