Create and connect to Ray clusters on Databricks

Learn how to create, configure, and run Ray compute clusters on Databricks.

Requirements

To create a Ray cluster, you must have access to a Databricks all-purpose compute resource with the following settings:

  • Databricks Runtime 12.2 LTS ML and above.

  • Access mode must be either Single user or No isolation shared.

Note

Ray clusters are currently not supported on serverless compute.

Install Ray

In Databricks Runtime 15.0 ML and above, Ray is preinstalled on Databricks clusters.

For runtimes released prior to 15.0, use pip to install Ray on your cluster:

%pip install "ray[default]>=2.3.0"
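After installing, it can help to confirm from a notebook cell that the Ray version meets the minimum in the pip command. The following is a minimal sketch rather than an official check: `parse_version` and `ray_is_new_enough` are hypothetical helper names, and the parser only looks at the first three numeric components of the version string.

```python
from importlib import metadata

MIN_RAY = (2, 3, 0)  # minimum version from the pip command


def parse_version(text):
    """Turn '2.9.3' into (2, 9, 3); suffixes such as 'rc1' are ignored."""
    parts = []
    for piece in text.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits or 0))
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def ray_is_new_enough(installed=None):
    """Return True if the installed (or supplied) Ray version is at least MIN_RAY."""
    if installed is None:
        try:
            installed = metadata.version("ray")
        except metadata.PackageNotFoundError:
            return False  # Ray is not installed in this environment
    return parse_version(installed) >= MIN_RAY
```

Calling `ray_is_new_enough()` with no argument looks up the installed package; passing a version string makes the comparison easy to try without Ray present.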

Create a user-specific Ray cluster in a Databricks cluster

To create a Ray cluster, use the ray.util.spark.setup_ray_cluster API.

Note

When you create a Ray cluster in a notebook, it is only available to the current notebook user. The Ray cluster is automatically shut down after the notebook is detached from the cluster or after 30 minutes of inactivity (no tasks have been submitted to Ray). If you want to create a Ray cluster that is shared with all users and is not subject to an actively running notebook, use the ray.util.spark.setup_global_ray_cluster API instead.

Fixed-size Ray cluster

In any Databricks notebook that is attached to a Databricks cluster, you can run the following command to start a fixed-size Ray cluster:

import ray
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)
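The example above imports shutdown_ray_cluster but never calls it. One way to make sure the Ray cluster is stopped when the work finishes or fails is a try/finally wrapper. This is a sketch under assumptions: `run_on_ray_cluster` is a hypothetical helper, and its `setup` and `shutdown` parameters exist only so the control flow can be exercised without a real cluster.

```python
def run_on_ray_cluster(work, setup=None, shutdown=None, **cluster_kwargs):
    """Start a Ray cluster, run work(), and always shut the cluster down.

    setup and shutdown default to the ray.util.spark functions.
    cluster_kwargs are passed through to the setup function unchanged.
    """
    if setup is None or shutdown is None:
        # Imported lazily: only needed when running on a Databricks cluster.
        from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

        setup = setup or setup_ray_cluster
        shutdown = shutdown or shutdown_ray_cluster

    setup(**cluster_kwargs)
    try:
        return work()
    finally:
        shutdown()  # stops the Ray cluster even if work() raised


# On Databricks (not executed here), work would typically call ray.init()
# and then submit tasks:
# result = run_on_ray_cluster(my_work, num_worker_nodes=2, num_cpus_per_node=4)
```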

Auto-scaling Ray cluster

To learn how to start an auto-scaling Ray cluster, see Scale Ray clusters on Databricks.

Starting a global mode Ray cluster

With Ray 2.9.0 and above, you can create a global mode Ray cluster on a Databricks cluster. A global mode Ray cluster lets all users attached to the Databricks cluster use the same Ray cluster. Unlike a single-user Ray cluster, it has no inactivity timeout.

To start a global mode Ray cluster that multiple users can attach to and run Ray tasks on, create a Databricks notebook job, attach it to a shared mode Databricks cluster, and then run the following command:

from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
  max_worker_nodes=2,
  ...
  # other arguments are the same as with the `setup_ray_cluster` API.
)

This is a blocking call that will remain active until you interrupt the call by clicking the “Interrupt” button on the notebook command cell, detaching the notebook from the Databricks cluster, or terminating the Databricks cluster. Otherwise, the global mode Ray cluster will continue to run and be available for task submission by authorized users. For more information on global mode clusters, see Ray API Documentation.

Global mode clusters have the following properties:

  • In a Databricks cluster, you can only create one active global mode Ray cluster at a time.

  • In a Databricks cluster, the active global mode Ray cluster can be used by all users in any attached Databricks notebook. You can run ray.init() to connect to the active global mode Ray cluster. Because multiple users can access this Ray cluster, resource contention might be an issue.

  • A global mode Ray cluster stays up until the setup_global_ray_cluster call is interrupted. It does not have an automatic shutdown timeout as single-user Ray clusters do.

Create a Ray GPU cluster

For GPU clusters, add GPU resources to the Ray cluster by setting the num_gpus_per_node and num_gpus_head_node arguments, as follows:

import ray
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=8,
  num_gpus_per_node=1,
  num_cpus_head_node=8,
  num_gpus_head_node=1,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Connect to remote Ray cluster using Ray client

In Ray version 2.3.0 and above, you can create a Ray cluster using the setup_ray_cluster API and, in the same notebook, call the ray.init() API to connect to it. To get the remote connection string, use the following:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Then, you can connect to the remote cluster using this remote connection string:

import ray
ray.init(remote_conn_str)

The Ray client does not support the Ray dataset API defined in the ray.data module. As a workaround, you can wrap your code that calls the Ray dataset API inside a remote Ray task, as shown in the following code:

import ray
import pandas as pd

# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())

Connecting the Ray cluster to the Ray Job CLI

Many developers moving from self-managed Ray solutions to Databricks have existing infrastructure tooling built on the Ray CLI tools. While Databricks currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on Databricks. For example:

ray job submit --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py

The values you need to supply are the Databricks workspace URL, starting with https://, and the path that follows /driver-proxy/o/. You can find that path in the Ray Dashboard proxy URL displayed after the Ray cluster is started.

The Ray Job CLI is used for submitting jobs to a Ray cluster from external systems, but it is not required for submitting jobs to Ray clusters on Databricks. The recommended approach is to deploy the job using Databricks Workflows, create one Ray cluster per application, and trigger the job with existing Databricks tooling, such as Databricks Asset Bundles or Workflow Triggers.
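If external tooling assembles the Ray Job CLI call programmatically, the pieces described above can be put together as follows. This is an illustrative sketch: `build_ray_job_submit` is a hypothetical helper, it uses only the flags shown in the example command, and the workspace URL, proxy path, and session cookie are values you must supply yourself.

```python
import json
import shlex


def build_ray_job_submit(workspace_url, proxy_path, session_cookie, entrypoint,
                         working_dir="."):
    """Assemble the argument list for `ray job submit` through the driver proxy.

    proxy_path is whatever follows /driver-proxy/o/ in the Ray Dashboard proxy URL.
    """
    if not workspace_url.startswith("https://"):
        raise ValueError("workspace_url must start with https://")
    address = f"{workspace_url.rstrip('/')}/driver-proxy/o/{proxy_path.lstrip('/')}"
    headers = json.dumps({"cookie": f"DATAPLANE_DOMAIN_SESSIONID={session_cookie}"})
    return [
        "ray", "job", "submit",
        "--headers", headers,
        "--address", address,
        f"--working-dir={working_dir}",
        "--",
        *shlex.split(entrypoint),  # e.g. "python run_task.py"
    ]
```

The returned list can be passed to `subprocess.run` directly, which avoids shell quoting problems with the JSON header value.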

Set a log output location

You can set the argument collect_log_to_path to specify the destination path where you want to collect the Ray cluster logs. Log collection runs after the Ray cluster is shut down.

Databricks recommends setting a path starting with /dbfs/ or Unity Catalog Volume path to preserve the logs even if you terminate the Apache Spark cluster. Otherwise, your logs are not recoverable since the local storage on the cluster is deleted when the cluster is shut down.
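A small guard can catch a non-persistent log path before the cluster is started. The sketch below assumes that Unity Catalog Volume paths begin with /Volumes/, as in the example volume path used elsewhere on this page; `is_persistent_log_path` is a hypothetical helper, not a Databricks or Ray API.

```python
def is_persistent_log_path(path):
    """True if path is under /dbfs/ or /Volumes/, so logs outlive the cluster."""
    return path.startswith(("/dbfs/", "/Volumes/"))


log_path = "/dbfs/path/to/ray_collected_logs"
if not is_persistent_log_path(log_path):
    # Local driver storage is deleted when the cluster shuts down.
    raise ValueError(f"{log_path} is on local storage; logs would be lost")
```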

After creating a Ray cluster, you can run any Ray application code directly in your notebook. Click Open Ray Cluster Dashboard in a new tab to view the Ray dashboard for the cluster.

Enable stack traces and flame graphs on the Ray Dashboard Actors page

On the Ray Dashboard Actors page, you can view stack traces and flame graphs for active Ray actors. To view this information, use the following command to install py-spy before you start the Ray cluster:

%pip install py-spy

Create and configure best practices

This section covers best practices for creating and configuring Ray clusters.

Non-GPU workloads

The Ray cluster runs on top of a Databricks Spark cluster. A typical scenario is to use a Spark job and Spark UDF to do simple data preprocessing tasks that do not need GPU resources. Then, use Ray to run complicated machine learning tasks that benefit from GPUs. In this case, Databricks recommends setting the Apache Spark cluster level configuration parameter spark.task.resource.gpu.amount to 0 so that all Apache Spark DataFrame transformations and Apache Spark UDF executions do not use GPU resources.

The benefits of this configuration are the following:

  • It increases Apache Spark job parallelism because the GPU instance type usually has many more CPU cores than GPU devices.

  • If the Apache Spark cluster is shared with multiple users, this configuration prevents Apache Spark jobs from competing for GPU resources with concurrently running Ray workloads.

Disable transformers trainer MLflow integration if using it in Ray tasks

The transformers trainer MLflow integration is enabled by default within the transformers library. If you use Ray Train to fine-tune a transformers model, Ray tasks fail due to a credential issue. This issue does not apply if you use MLflow directly for training. To avoid the issue, set the DISABLE_MLFLOW_INTEGRATION environment variable to TRUE in the Databricks cluster configuration when starting your Apache Spark cluster.

Address Ray remote function pickling error

To run Ray tasks, Ray pickles the task function. If pickling fails, you must diagnose which part of your code causes the failure. Common causes of pickling errors are external references, closures, and references to stateful objects. One of the easiest causes to check and correct is a module-level import: move the import statement inside the task function declaration.

For example, datasets.load_dataset is a widely used function that is patched on the driver side in Databricks Runtime, which makes the reference unpicklable. To address this, write the task function as follows:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...
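When the cause is a closure rather than an import, one way to narrow it down is to try pickling each captured variable separately. The sketch below is a rough diagnostic, not Ray's own mechanism: it uses the standard pickle module as a stand-in for the cloudpickle serializer that Ray uses, and `unpicklable_closure_vars` and `make_task` are hypothetical names.

```python
import pickle
import threading


def unpicklable_closure_vars(func):
    """Return the names of closure variables of func that fail to pickle."""
    names = func.__code__.co_freevars
    cells = func.__closure__ or ()
    bad = []
    for name, cell in zip(names, cells):
        try:
            pickle.dumps(cell.cell_contents)
        except Exception:
            bad.append(name)
    return bad


def make_task():
    lock = threading.Lock()  # stateful object: cannot be pickled
    factor = 3               # plain value: pickles fine

    def task(x):
        with lock:
            return x * factor

    return task


print(unpicklable_closure_vars(make_task()))  # prints ['lock']
```

Once the offending variable is known, create it inside the task function instead of capturing it from the enclosing scope.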

Disable Ray memory monitor if the Ray task is unexpectedly killed with an out-of-memory (OOM) error

In Ray 2.9.3, Ray memory monitor has several known issues that can cause Ray tasks to be inadvertently stopped without cause. To address the issue, you can disable the Ray memory monitor by setting the environment variable RAY_memory_monitor_refresh_ms to 0 within the Databricks cluster configuration when starting your Apache Spark cluster.

Read Spark data from Ray

A common use case is to read data from a Spark DataFrame into Ray for further processing. In Databricks Runtime 15.0 for ML and above, a function is available to simplify loading the data contained within a Spark DataFrame directly into Ray.

To use this feature effectively, ensure that the Spark cluster configuration spark.databricks.pyspark.dataFrameChunk.enabled is set to true before running ray.init() to start your Ray cluster.

import ray.data

source_table = "my_database.my_table"

spark_dataframe = spark.read.table(source_table)
ray_dataset = ray.data.from_spark(spark_dataframe)

Ray will fetch the Spark DataFrame contents directly without the need for a temporary write of the data, making it directly available for processing.

Read Ray Data from Spark

Similarly to reading Spark data from Ray, the ability to read the results from a Ray task back as a Spark DataFrame is supported using Unity Catalog. To use this feature, you must be running Databricks Runtime 15.0 for ML and above from within a Unity Catalog enabled workspace.

To use this feature, ensure that the environment variable "_RAY_UC_VOLUMES_FUSE_TEMP_DIR" is set to a valid and accessible Unity Catalog Volume path, such as "/Volumes/MyCatalog/MySchema/MyVolume/MyRayData".

import ray.data

source_table = "my_database.my_table"

spark_dataframe = spark.read.table(source_table)
ray_dataset = ray.data.from_spark(spark_dataframe)

# Write to the specified (via environment variable) UC Volume from Ray
ray_dataset.write_databricks_table()

In Databricks Runtime versions earlier than 15.0 for ML, you can write directly to an object store location by using the Ray parquet writer, ray_dataset.write_parquet() from the ray.data module. Spark can read this parquet data with native readers.
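The Parquet handoff can be sketched as follows. This assumes the output location is a DBFS path: Ray writes through the /dbfs/ FUSE mount while Spark reads the same files through the dbfs:/ URI. The helper names `dbfs_fuse_to_uri` and `ray_to_spark_via_parquet` are hypothetical, and `spark` is the notebook's SparkSession.

```python
def dbfs_fuse_to_uri(path):
    """Convert a FUSE path (/dbfs/...) to the dbfs:/ URI form used by Spark."""
    if not path.startswith("/dbfs/"):
        raise ValueError("expected a path under /dbfs/")
    return "dbfs:/" + path[len("/dbfs/"):]


def ray_to_spark_via_parquet(ray_dataset, spark, fuse_path):
    """Write a Ray Dataset as Parquet, then load the files as a Spark DataFrame."""
    ray_dataset.write_parquet(fuse_path)  # Ray writes through the /dbfs mount
    return spark.read.parquet(dbfs_fuse_to_uri(fuse_path))  # Spark reads the same files
```

For other object store locations, pass the location to write_parquet and spark.read.parquet in whatever form each library expects; the path conversion here applies only to DBFS.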

Applying transformation functions to batches of data

When processing data in batches, it is recommended to use the Ray Data API with the map_batches function. This approach can be more efficient and scalable, especially for large datasets or complex computations that benefit from batch processing. Any Spark DataFrame can be converted to a Ray Dataset using the ray.data.from_spark API. The processed output from calling this transformation API can be written out to Databricks UC tables using the API ray.data.write_databricks_table.
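As a minimal illustration of the batch approach, the sketch below applies a vectorized function to each batch. The column names a and b and the function names `add_total` and `transform` are hypothetical; the example assumes the "numpy" batch format, in which each batch arrives as a dict of column name to NumPy array.

```python
import numpy as np


def add_total(batch):
    """Batch UDF: receives a dict mapping column names to NumPy arrays."""
    a = np.asarray(batch["a"])
    b = np.asarray(batch["b"])
    # Vectorized over the whole batch instead of row by row.
    return {"a": a, "b": b, "total": a + b}


def transform(spark_dataframe):
    """Convert a Spark DataFrame to a Ray Dataset and apply add_total per batch."""
    import ray.data  # lazy import so add_total can be tried without Ray

    ray_dataset = ray.data.from_spark(spark_dataframe)
    return ray_dataset.map_batches(add_total, batch_format="numpy")
```

Keeping the batch function free of Ray-specific code makes it easy to test on a small in-memory batch before running it on the cluster.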

Using MLflow in Ray tasks

To use MLflow in Ray tasks, you need to:

  • Define Databricks MLflow credentials within Ray tasks.

  • Create MLflow runs within the Apache Spark driver and pass the created run_id to the Ray tasks.

The following code example demonstrates how to do this:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name>@databricks.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # Use the run created on the Apache Spark driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # Create the MLflow run on the Apache Spark driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Use notebook-scoped Python libraries or cluster Python libraries in Ray tasks

Currently, Ray has a known issue where Ray tasks can’t use notebook-scoped Python libraries or cluster Python libraries. To use additional dependencies within your Ray jobs, you must manually install the libraries using the %pip magic command before launching the Ray-on-Spark cluster that will use them within tasks. For example, to update the version of Ray that will be used to start the Ray cluster, you can run the following command in your notebook:

%pip install ray==<The Ray version you want to use> --force-reinstall

Then, run the following command in your notebook to restart the Python kernel:

dbutils.library.restartPython()