Connect Ray and Spark on Databricks

With Databricks, you can run Ray and Spark operations in the same execution environment to leverage the strengths of both distributed computing engines.

Ray and Spark integration is supported by Delta Lake and Unity Catalog, which provide robust data management, secure access, and lineage tracking.

This article shows you how to connect Ray and Spark operations according to the following use cases:

  • Write Spark data to Ray data: Efficiently transfer data in-memory to Ray.

  • Write Ray data to Spark: Output data from Ray back to Delta Lake or other storage solutions to ensure compatibility and access.

  • Connect external Ray applications to Unity Catalog: Connect Ray applications outside of Databricks to load data from a Databricks Unity Catalog table.

For more information on when to use Ray, Spark, or both, see When to use Spark vs. Ray.

Write Spark data to Ray

To write Spark data to Ray, you can use the ray.data.from_spark() function to directly read a Spark DataFrame from Ray without needing to write the data to any location.

In-memory Spark to Ray transfers are available on Databricks Runtime ML 15.0 and above.

To enable this feature, you must do the following:

  • Set the Spark cluster config spark.databricks.pyspark.dataFrameChunk.enabled to true before starting your cluster.

  • Ensure that the environment variable "_RAY_UC_VOLUMES_FUSE_TEMP_DIR" is set to a valid and accessible Unity Catalog volume path, like "/Volumes/MyCatalog/MySchema/MyVolume/MyRayData"

import ray.data

source_table = "my_db.my_table"

# Read a Spark DataFrame from a Delta table in Unity Catalog
df = spark.read.table(source_table)
ray_ds = ray.data.from_spark(df)

Warning

Autoscaling clusters must set the use_spark_chunk_api parameter to False to use the from_spark() function. Otherwise, the API call will cause cache misses when reading from Spark executors that have started but lack task references for the dataset.

ray_ds = ray.data.from_spark(df, use_spark_chunk_api=False)

In Databricks Runtime ML below 15.0, you can write directly to an object store location using the Ray Parquet writer, ray_dataset.write_parquet() from the ray.data module. Spark can read this Parquet data with native readers.

Write Ray Data to Spark

To write Ray data to Spark, you must write the dataset to a location that Spark can access.

For Unity Catalog enabled workspaces, use the ray.data.Dataset.write_databricks_table function to write to a Unity Catalog table.

This function temporarily stores the Ray dataset in Unity Catalog Volumes, reads from Unity Catalog volumes with Spark, and finally writes to a Unity Catalog table.

ds = ray.data
ds.write_databricks_table()

For workspaces that do not have Unity Catalog enabled, you can manually store a Ray Data dataset as a temporary file, such as a Parquet file in DBFS, and then read the data file with Spark.

ds.write_parquet(tmp_path)
df = spark.read.parquet(tmp_path)
df.write.format("delta").saveAsTable(table_name)

Write data from Ray core applications to Spark

Databricks can also integrate Ray Core applications with Spark, allowing you to run Ray Core (the lower-level APIs of Ray) and Spark workloads within the same environment and enabling data exchange between them. This integration offers several patterns to suit different workloads and data management needs, ensuring a simplified experience using both frameworks.

There are three main patterns for writing data from Ray to Spark.

  • Persist output in a temporary location: Temporarily store Ray task outputs in DBFS or Unity Catalog volumes before consolidating them into a Spark DataFrame.

  • Connect with Spark Connect: Directly connect Ray tasks to a Spark cluster, enabling Ray to interact with Spark DataFrames and tables.

  • Use third-party libraries: Use external libraries, such as deltalake or deltaray, to write data from Ray Core tasks to Delta Lake or Spark tables.

Pattern 1: Persist output in a temporary location

The most common pattern for writing data from Ray to Spark is to store the output data in a temporary location, such as Unity Catalog volumes or DBFS. After storing the data, the Ray driver thread reads each part of the files on the worker nodes and consolidates them into a final DataFrame for further processing. Typically, the temporary files are in a standard format like CSV. This approach works best when the output data is in tabular form, such as a Pandas DataFrame generated by a Ray Core task.

Use this method when the output from Ray tasks is too large to fit in the memory of the driver node or the shared object-store. If you need to handle large datasets without persisting data to storage, consider increasing the memory allocated to the driver node in your Databricks cluster to improve performance.

import os
import uuid
import numpy as np
import pandas as pd

@ray.remote
def write_example(task_id, path_prefix):

  num_rows = 100

  df = pd.DataFrame({
      'foo': np.random.rand(num_rows),
      'bar': np.random.rand(num_rows)
  })

  # Write the DataFrame to a CSV file
  df.to_csv(os.path.join(path_prefix, f"result_part_{task_id}.csv"))

n_tasks = 10

# Put a unique DBFS prefix for the temporary file path
dbfs_prefix = f"/dbfs/<USERNAME>"

# Create a unique path for the temporary files
path_prefix = os.path.join(dbfs_prefix, f"/ray_tmp/write_task_{uuid.uuid4()}")

tasks = ray.get([write_example.remote(i, path_prefix) for i in range(n_tasks)])

# Read all CSV files in the directory into a single DataFrame
df = spark.read.csv(path_prefix.replace("/dbfs", "dbfs:"), header=True, inferSchema=True)

Pattern 2: Connect using Spark Connect

Another way for Ray Core tasks to interact with Spark within the remote task is to use Spark Connect. This allows you to set up the Spark context on the Ray worker to point to the Spark cluster running from the driver node.

To set this up, you must configure Ray cluster resources to allocate space for Spark. For example, if a worker node has 8 CPUs, set num_cpus_worker_node to 7, leaving 1 CPU for Spark. For larger Spark tasks, it’s recommended to allocate a larger share of resources.

from databricks.connect import DatabricksSession
import ray

@ray.remote
class SparkHandler(object):

   def __init__(self, access_token=None, cluster_id=None, host_url=None):
       self.spark = (DatabricksSession
                     .builder
                     .remote(host=host_url,
                             token=access_token,
                             cluster_id=cluster_id)
                     .getOrCreate()
                     )
   def test(self):
       df = self.spark.sql("select * from samples.nyctaxi.trips")

       df.write.format("delta").mode(
"overwrite").saveAsTable("catalog.schema.taxi_trips")
       return df.count()

access_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
cluster_id = dbutils.notebook.entry_point.getDbutils().notebook().getContext().clusterId().get()
host_url = f"https://{dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().get('browserHostName').get()}"

sh = SparkHandler.remote(access_token=access_token,
                        cluster_id=cluster_id,
                        host_url=host_url)
print(ray.get(sh.test.remote()))

This example uses the notebook-generated token. However, Databricks recommends that production use cases use an access token stored in Databricks secrets.

Since this process calls a single Spark driver, it creates a threading lock which causes all tasks to wait for the preceding Spark tasks to complete. Therefore, it is recommended to use this when there are not many concurrent tasks since they all will have sequential behavior as the Spark tasks complete. For these situations, it is better to persist the output and then combine into a single Spark dataframe at the end and then write out to an output table.

Pattern 3: Third-party libraries

Another option is using third-party libraries that interact with Delta Lake and Spark. Databricks does not officially support these third-party libraries. An example of this is the deltalake library from the delta-rs project. This approach currently only works with Hive metastore tables, not Unity Catalog tables.

from deltalake import DeltaTable, write_deltalake
import pandas as pd
import numpy as np
import ray

@ray.remote
def write_test(table_name):
   random_df_id_vals = [int(np.random.randint(1000)), int(np.random.randint(1000))]
   pdf = pd.DataFrame({"id": random_df_id_vals, "value": ["foo", "bar"]})
   write_deltalake(table_name, pdf, mode="append")


def main():
   table_name = "database.mytable"
   ray.get([write_test.remote(table_name) for _ in range(100)])

Another third party library available is the deltaray library available through the Delta Incubator project https://github.com/delta-incubator/deltaray)

# Standard Libraries
import pathlib

# External Libraries
import deltaray
import deltalake as dl
import pandas as pd

# Creating a Delta Table
cwd = pathlib.Path().resolve()
table_uri = f'{cwd}/tmp/delta-table'
df = pd.DataFrame({'id': [0, 1, 2, 3, 4, ], })
dl.write_deltalake(table_uri, df)

# Reading our Delta Table
ds = deltaray.read_delta(table_uri)
ds.show()

Connect external Ray applications to Databricks

For Ray 2.8.0 and above, to connect Ray applications outside of Databricks to tables inside Databricks, you can call the ray.data.read_databricks_tables API to load data from a Unity Catalog table.

First, set the DATABRICKS_TOKEN environment variable to your SQL warehouse access token. If you’re not running your program on Databricks Runtime, also set the DATABRICKS_HOST environment variable to the Databricks workspace URL, as shown in the following:

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

Then, call ray.data.read_databricks_tables() to read from the SQL warehouse.

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity Catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

Best Practices

  • Always use the techniques described in the Ray cluster best practice guide to ensure the cluster is fully utilized.

  • Consider using Unity Catalog volumes to store output data in a non-tabular format and provide governance.

  • Ensure that the num_cpus_worker_node configuration is set so that the number of CPU cores matches that of the Spark worker node. Similarly, set num_gpus_worker_node to the number of GPUs per Spark worker node. In this configuration, each Spark worker node launches one Ray worker node that fully utilizes the resources of the Spark worker node.

Limitations

Unity Catalog currently does not share credentials for writing to tables from non-Spark writers. Therefore, all data being written to a Unity Catalog table from a Ray Core task will require that the data be persisted and then read with Spark, or Databricks Connect must be set up within the Ray task.