Conectar o Ray e o Spark no Databricks

Com o Databricks, o senhor pode executar operações do Ray e do Spark no mesmo ambiente de execução para aproveitar os pontos fortes dos dois mecanismos de computação distribuída.

A integração do Ray e do Spark é suportada pelos sites Delta Lake e Unity Catalog, que oferecem gerenciamento robusto de dados, acesso seguro e acompanhamento de linhagem.

Este artigo mostra aos senhores como conectar o Ray e Spark operações de acordo com os seguintes casos de uso:

  • Gravar dados do Spark em dados do Ray: Transferência eficiente de dados na memória para o Ray.

  • Gravar dados do Ray no Spark: Envie os dados do Ray de volta ao Delta Lake ou a outras soluções de armazenamento para garantir a compatibilidade e o acesso.

  • Conectar aplicativos Ray externos ao Unity Catalog: Conecte aplicativos Ray fora do Databricks para carregar dados de uma tabela do Databricks Unity Catalog.

Para obter mais informações sobre quando usar Ray, Spark, ou ambos, consulte When to use Spark vs. Ray.

Criar um conjunto de dados Ray distribuído a partir de um Spark DataFrame

Para criar um Ray dataset distribuído a partir de um Spark DataFrame, o senhor pode usar a função ray.data.from_spark() para ler diretamente um Spark DataFrame do Ray sem precisar gravar os dados em nenhum local.

As transferências na memória de Spark para Ray estão disponíveis em Databricks Runtime ML 15.0 e acima.

Para ativar esse recurso, o senhor deve fazer o seguinte:

  • Defina a configuração de cluster do Spark spark.databricks.pyspark.dataFrameChunk.enabled para true antes de iniciar o cluster.

import ray.data

source_table = "my_db.my_table"

# Read a Spark DataFrame from a Delta table in Unity Catalog
df = spark.read.table(source_table)
ray_ds = ray.data.from_spark(df)

Aviso

autoscale Spark clusters (inclusive os que usam instâncias pontuais) devem definir o parâmetro use_spark_chunk_api como False para usar a função from_spark(). Caso contrário, a chamada à API resultará em falhas de cache porque o cache em um executor do Spark é perdido quando o executor é encerrado.

ray_ds = ray.data.from_spark(df, use_spark_chunk_api=False)

Gravar dados de raio no Spark

Para gravar dados Ray em Spark, o senhor deve gravar o dataset em um local que o Spark possa acessar.

No Databricks Runtime ML abaixo da versão 15.0, o senhor pode gravar diretamente em um local de armazenamento de objetos usando o Ray Parquet writer, ray_dataset.write_parquet() do módulo ray.data. O Spark pode ler esses dados do Parquet com leitores nativos.

No espaço de trabalho habilitado para Unity Catalog, use a função ray.data.Dataset.write_databricks_table para gravar em uma tabela Unity Catalog.

Essa função armazena temporariamente o Ray dataset em Unity Catalog Volumes, lê de Unity Catalog volumes com Spark e, finalmente, grava em uma tabela Unity Catalog. Antes de chamar a função ray.data.Dataset.write_databricks_table, certifique-se de que a variável de ambiente "_RAY_UC_VOLUMES_FUSE_TEMP_DIR" esteja definida como um caminho de volume válido e acessível do Unity Catalog, como "/Volumes/MyCatalog/MySchema/MyVolume/MyRayData".

ds = ray.data
ds.write_databricks_table()

Para espaços de trabalho que não têm o Unity Catalog habilitado, o senhor pode armazenar manualmente um Ray Data dataset como um arquivo temporário, como um arquivo Parquet em DBFS, e depois ler o arquivo de dados com Spark.

ds.write_parquet(tmp_path)
df = spark.read.parquet(tmp_path)
df.write.format("delta").saveAsTable(table_name)

Gravar dados dos aplicativos centrais do Ray no Spark

Databricks também pode integrar os aplicativos Ray Core com o Spark, permitindo que o senhor execute cargas de trabalho do Ray Core (o APIs de nível inferior do Ray) e do Spark no mesmo ambiente e possibilitando a troca de dados entre eles. Essa integração oferece vários padrões para atender a diferentes cargas de trabalho e necessidades de gerenciamento de dados, garantindo uma experiência simplificada no uso de ambas as estruturas.

Há três padrões principais para gravar dados do Ray para o Spark.

  • Persistir a saída em um local temporário: Armazene temporariamente as saídas da tarefa Ray em volumes DBFS ou Unity Catalog antes de consolidá-las em um Spark DataFrame.

  • Conecte-se com Spark Connect: Conecta diretamente a tarefa Ray a um Spark cluster, permitindo que Ray interaja com Spark DataFrames e tabelas.

  • Usar biblioteca de terceiros: use uma biblioteca externa, como deltalake ou deltaray, para gravar dados da tarefa do Ray Core nas tabelas Delta Lake ou Spark.

Padrão 1: Persista a saída em um local temporário

O padrão mais comum para gravar dados do Ray no Spark é armazenar os dados de saída em um local temporário, como volumes do Unity Catalog ou DBFS. Depois de armazenar os dados, o thread do driver Ray lê cada parte dos arquivos nos nós do worker e os consolida em um DataFrame final para processamento posterior. Normalmente, os arquivos temporários estão em um formato padrão, como CSV. Essa abordagem funciona melhor quando os dados de saída estão em formato tabular, como um Pandas DataFrame gerado por uma tarefa do Ray Core.

Use esse método quando a saída do Ray tarefa for muito grande para caber na memória do nó do driver ou do armazenamento de objetos compartilhados. Se o senhor precisar lidar com grandes conjuntos de dados sem persistir os dados no armazenamento, considere aumentar a memória alocada para o nó do driver no site Databricks cluster para melhorar o desempenho.

import os
import uuid
import numpy as np
import pandas as pd

@ray.remote
def write_example(task_id, path_prefix):

  num_rows = 100

  df = pd.DataFrame({
      'foo': np.random.rand(num_rows),
      'bar': np.random.rand(num_rows)
  })

  # Write the DataFrame to a CSV file
  df.to_csv(os.path.join(path_prefix, f"result_part_{task_id}.csv"))

n_tasks = 10

# Put a unique DBFS prefix for the temporary file path
dbfs_prefix = f"/dbfs/<USERNAME>"

# Create a unique path for the temporary files
path_prefix = os.path.join(dbfs_prefix, f"/ray_tmp/write_task_{uuid.uuid4()}")

tasks = ray.get([write_example.remote(i, path_prefix) for i in range(n_tasks)])

# Read all CSV files in the directory into a single DataFrame
df = spark.read.csv(path_prefix.replace("/dbfs", "dbfs:"), header=True, inferSchema=True)

Padrão 2: conectar usando o Spark Connect

Outra maneira de o Ray Core tarefa interagir com o Spark dentro da tarefa remota é usar o Spark Connect. Isso permite que o senhor configure o contexto do Spark no Ray worker para apontar para o cluster do Spark em execução no nó do driver.

Para configurar isso, o senhor deve configurar o recurso Ray cluster para alocar espaço para Spark. Por exemplo, se um nó worker tiver 8 CPUs, defina num_cpus_worker_node como 7, deixando 1 CPU para Spark. Para Spark tarefas maiores, recomenda-se alocar uma parcela maior de recurso.

from databricks.connect import DatabricksSession
import ray

@ray.remote
class SparkHandler(object):

   def __init__(self, access_token=None, cluster_id=None, host_url=None):
       self.spark = (DatabricksSession
                     .builder
                     .remote(host=host_url,
                             token=access_token,
                             cluster_id=cluster_id)
                     .getOrCreate()
                     )
   def test(self):
       df = self.spark.sql("select * from samples.nyctaxi.trips")

       df.write.format("delta").mode(
"overwrite").saveAsTable("catalog.schema.taxi_trips")
       return df.count()

access_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
cluster_id = dbutils.notebook.entry_point.getDbutils().notebook().getContext().clusterId().get()
host_url = f"https://{dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().get('browserHostName').get()}"

sh = SparkHandler.remote(access_token=access_token,
                        cluster_id=cluster_id,
                        host_url=host_url)
print(ray.get(sh.test.remote()))

Este exemplo usa os tokens gerados pelo Notebook. No entanto, o site Databricks recomenda que os casos de uso de produção utilizem um access token armazenado em Databricks secrets.

Como esse processo chama um único driver Spark, ele cria um bloqueio de threading que faz com que todas as tarefas aguardem a conclusão da tarefa Spark anterior. Portanto, é recomendável usá-lo quando não houver muitas tarefas concorrentes, pois todas elas terão um comportamento sequencial à medida que a Spark tarefa for concluída. Nessas situações, é melhor manter a saída e, em seguida, combiná-la em um único dataframe do Spark no final e, depois, gravar em uma tabela de saída.

Padrão 3: biblioteca de terceiros

Outra opção é usar uma biblioteca de terceiros que interaja com Delta Lake e Spark. Databricks não oferece suporte oficial a essas bibliotecas de terceiros. Um exemplo disso é a biblioteca deltalake do projeto delta-rs. Atualmente, essa abordagem só funciona com as tabelas Hive metastore e não com as tabelas Unity Catalog.

from deltalake import DeltaTable, write_deltalake
import pandas as pd
import numpy as np
import ray

@ray.remote
def write_test(table_name):
   random_df_id_vals = [int(np.random.randint(1000)), int(np.random.randint(1000))]
   pdf = pd.DataFrame({"id": random_df_id_vals, "value": ["foo", "bar"]})
   write_deltalake(table_name, pdf, mode="append")


def main():
   table_name = "database.mytable"
   ray.get([write_test.remote(table_name) for _ in range(100)])

Outra biblioteca de terceiros disponível é a biblioteca deltaray, disponível por meio do projeto Delta Incubator https://github.com/delta-incubator/deltaray)

# Standard Libraries
import pathlib

# External Libraries
import deltaray
import deltalake as dl
import pandas as pd

# Creating a Delta Table
cwd = pathlib.Path().resolve()
table_uri = f'{cwd}/tmp/delta-table'
df = pd.DataFrame({'id': [0, 1, 2, 3, 4, ], })
dl.write_deltalake(table_uri, df)

# Reading our Delta Table
ds = deltaray.read_delta(table_uri)
ds.show()

Conectar aplicativos Ray externos ao Databricks

Criar um conjunto de dados do Ray a partir da consulta do Databricks warehouse

No Ray 2.8.0 e acima, para conectar os aplicativos Ray fora do Databricks a tabelas dentro do Databricks, o senhor pode chamar o ray.data.read_databricks_tables API para carregar dados de uma tabela Unity Catalog.

Primeiro, defina a variável de ambiente DATABRICKS_TOKEN como SQL warehouse access token. Se o senhor não estiver executando o programa em Databricks Runtime, defina também a variável de ambiente DATABRICKS_HOST para o URL Databricks workspace , conforme mostrado a seguir:

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

Em seguida, chame ray.data.read_databricks_tables() para ler no site SQL warehouse.

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity Catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

Aviso

Os armazéns da Databricks só podem armazenar em cache os resultados das consultas por aproximadamente 2 horas. Para cargas de trabalho de longa duração, chame o método ray.data.Dataset.materialize para materializar o Ray dataset no armazenamento de objetos distribuídos Ray.

Criar um conjunto de dados Ray a partir da tabela de compartilhamento delta do Databricks

O senhor também pode ler dados das tabelas de compartilhamento delta do Databricks. A leitura de tabelas de compartilhamento delta é mais confiável do que a leitura de um cache de warehouse do Databricks.

O ray.data.read_delta_sharing_tables API está disponível no Ray 2.33 e no acima.

import ray

ds = ray.data.read_delta_sharing_tables(
    url=f"<profile-file-path>#<share-name>.<schema-name>.<table-name>",
    limit=100000,
    version=1,
)

Melhores práticas

  • Sempre use as técnicas descritas no guia de práticas recomendadas do Ray cluster para garantir que o cluster seja totalmente utilizado.

  • Considere o uso de volumes do Unity Catalog para armazenar dados de saída em um formato não tabular e fornecer governança.

  • Certifique-se de que a configuração num_cpus_worker_node esteja definida de modo que o número de núcleos de CPU corresponda ao do nó Spark worker . Da mesma forma, defina num_gpus_worker_node como o número de GPUs por nó Spark worker . Nessa configuração, cada nó Spark worker lança um nó Ray worker que utiliza totalmente o recurso do nó Spark worker .

Limitações

Atualmente, o Unity Catalog não compartilha credenciais para gravação em tabelas de gravadores que não sejam do Spark. Portanto, todos os dados que estão sendo gravados em uma tabela do Unity Catalog a partir de uma tarefa do Ray Core exigirão que os dados sejam mantidos e depois lidos com o Spark, ou que o Databricks Connect seja configurado na tarefa do Ray.