escala Ray clusters on Databricks

Saiba como ajustar o tamanho do seu Ray cluster para obter o desempenho ideal, incluindo autoescala, configuração do nó principal, clusters heterogêneo e alocação de recursos.

Criar um Ray cluster no modo de autoescala

Nas versões 2.8.0 e superiores do Ray, o Ray clusters começará em Databricks e oferecerá suporte à integração com Databricks autoscale. Essa integração de autoescala aciona o Databricks cluster autoscale internamente no ambiente Databricks.

Para ativar o autoscale, execute o seguinte comando:

Para a versão do Ray abaixo de 2.10:

from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=8,
  autoscale=True,
)

Para o Ray versão 2.10 e posteriores:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

A API ray.util.spark.setup_ray_cluster cria um cluster Ray no Apache Spark. Internamente, ele cria um plano de fundo Apache Spark Job. Cada tarefa do Apache Spark no Job cria um nó Ray worker e o nó Ray head é criado no driver. Os argumentos min_worker_nodes e max_worker_nodes representam o intervalo de nós do Ray worker a serem criados e utilizados para as cargas de trabalho do Ray. Se o argumento min_worker_nodes for deixado indefinido, um Ray cluster de tamanho fixo será iniciado com max_worker_nodes número de trabalhadores disponíveis. Para especificar o número de núcleos de CPU ou GPU atribuídos a cada nó do Ray worker, defina o argumento num_cpus_worker_node (default value: 1) ou num_gpus_worker_node (default valor: 0).

Para o Ray versão abaixo de 2.10, se a autoescala estiver ativada, num_worker_nodes indica o número máximo de nós do Ray worker. O default número mínimo de nós do Ray worker é zero. Essa configuração default significa que, quando o Ray cluster é parado, ele escala até zero nós do Ray worker. Isso pode não ser ideal para a capacidade de resposta rápida em todos os cenários, mas pode reduzir significativamente os custos quando ativado.

No modo de autoescala, worker não pode ser definido como ray.util.spark.MAX_NUM_WORKER_NODES.

Os argumentos a seguir configuram a velocidade de upscaling e downscaling:

  • autoscale_upscaling_speed representa o número de nós que podem estar pendentes como um múltiplo do número atual de nós. Quanto maior o valor, mais agressivo será o aumento de escala. Por exemplo, se isso for definido como 1.0, o cluster poderá aumentar de tamanho em no máximo 100% a qualquer momento.

  • autoscale_idle_timeout_minutes representa o número de minutos que precisam passar antes que o autoscaler remova um nó parado worker. Quanto menor o valor, mais agressivo é o downscaling.

Com o Ray 2.9.0 e o acima, o senhor também pode definir autoscale_min_worker_nodes para evitar que o Ray cluster reduza a escala para zero trabalhador quando o Ray cluster estiver parado, o que faria com que o cluster fosse encerrado.

Configurar o recurso usado pelo nó principal do Ray

Em default, para a configuração Ray on Spark, Databricks restringe o recurso alocado para o nó principal Ray a:

  • 0 núcleos de CPU

  • 0 GPUs

  • 128 MB de memória heap

  • 128 MB de memória de armazenamento de objetos

Isso ocorre porque o nó principal do Ray é normalmente usado apenas para coordenação global, não para executar a tarefa Ray. O recurso do nó do driver Apache Spark é compartilhado com vários usuários, portanto, a configuração default salva o recurso no lado do driver Apache Spark. Com o Ray 2.8.0 e o acima, o senhor pode configurar o recurso usado pelo nó principal do Ray. Use os seguintes argumentos na API setup_ray_cluster:

  • num_cpus_head_nodeNúcleos de CPU usados pelo nó principal do Ray

  • num_gpus_head_nodeConfiguração da GPU usada pelo nó de cabeça de raio

  • object_store_memory_head_nodeDefinição do tamanho da memória do armazenamento de objetos pelo Ray head node

Suporte para clusters heterogêneos

O senhor pode criar um Ray no site Spark cluster para obter uma execução de treinamento mais eficiente e econômica e definir configurações diferentes entre o nó principal do Ray e os nós do Ray worker. No entanto, todos os nós do Ray worker devem ter a mesma configuração. Databricks clusters não oferecem suporte total a clusters heterogêneo, mas o senhor pode criar um Databricks cluster com diferentes tipos de driver e de instância worker definindo uma política de cluster. Por exemplo:

{
  "node_type_id": {
    "type": "fixed",
    "value": "i3.xlarge"
  },
  "driver_node_type_id": {
    "type": "fixed",
    "value": "g4dn.xlarge"
  },
  "spark_version": {
    "type": "fixed",
    "value": "13.x-snapshot-gpu-ml-scala2.12"
  }
}

Ajustar a configuração do Ray cluster

A configuração recomendada para cada nó do Ray worker é a seguinte: Mínimo de 4 núcleos de CPU por nó do Ray worker. Memória heap mínima de 10 GB para cada nó do Ray worker.

Portanto, ao chamar ray.util.spark.setup_ray_cluster, a Databricks recomenda definir num_cpus_per_node como um valor maior ou igual a 4.

Consulte a próxima seção para obter detalhes sobre o ajuste da memória heap para cada nó do Ray worker.

Alocação de memória para nós de trabalho Ray

Cada nó do Ray worker usa dois tipos de memória: memória heap e memória de armazenamento de objetos.

O tamanho da memória alocada para cada tipo é determinado conforme descrito abaixo.

A memória total alocada para cada nó do Ray worker é: RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES é o número máximo de nós Ray worker que podem ser iniciados no nó Apache Spark worker . Isso é determinado pelo argumento num_cpus_per_node ou num_gpus_per_node.

Se o senhor não definir o argumento object_store_memory_per_node, o tamanho da memória heap e o tamanho da memória do armazenamento de objetos alocados para cada nó do Ray worker serão: RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7 OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

Se o senhor definir o argumento object_store_memory_per_node: RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

Além disso, o tamanho da memória do armazenamento de objetos por nó do Ray worker é limitado pela memória compartilhada do sistema operacional. O valor máximo é: OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY é o tamanho do disco /dev/shm configurado para o nó Apache Spark worker .

Ampliação das melhores práticas

Definir o número de CPU e GPU para cada nó de trabalho do Ray

É recomendável definir o argumento num_cpus_worker_node como o número de núcleos de CPU por nó Apache Spark worker . Da mesma forma, definir num_gpus_worker_node como o número de GPUs por nó Apache Spark worker é o ideal. Com essa configuração, cada nó Apache Spark worker lança um nó Ray worker que utilizará totalmente o recurso de cada nó Apache Spark worker .

Defina a variável de ambiente RAY_memory_monitor_refresh_ms como 0 na configuração do cluster da Databricks ao iniciar o cluster do Apache Spark.

Configuração de recurso de memória para cargas de trabalho híbridas Apache Spark e Ray

Se o senhor executar cargas de trabalho híbridas Spark e Ray em um Databricks cluster, Databricks recomenda reduzir a memória Spark executor para um valor pequeno. Por exemplo, definir spark.executor.memory 4g na configuração do cluster do Databricks.

O Apache Spark executor é um processo Java que aciona o GC de forma preguiçosa, e o cache Apache Spark dataset usa muita memória Apache Spark executor . Isso reduz a memória disponível que o Ray pode usar. Para evitar possíveis erros de falta de memória, reduza a configuração spark.executor.memory.

Configuração de recurso de computação para cargas de trabalho híbridas Apache Spark e Ray

Se o senhor executar cargas de trabalho híbridas Spark e Ray em um Databricks cluster, recomendamos que torne os nós cluster ou os nós Ray worker autoescaláveis. Por exemplo:

Se o senhor tiver um número fixo de worker nós disponíveis para começar a Databricks cluster, recomendamos que ative o Ray-on-Spark autoscale. Quando nenhuma carga de trabalho do Ray estiver em execução, o Ray cluster será desativado, permitindo que o recurso seja liberado para uso pela tarefa Apache Spark. Quando a Apache Spark tarefa for concluída e o Ray voltar a ser usado, o Ray-on-Spark cluster voltará a escalar para atender à demanda.

Além disso, o senhor pode tornar os clusters do Databricks e do Ray-on-spark autoescaláveis. Por exemplo, se o senhor configurar os nós autoescaláveis do Databricks clusterpara um máximo de 10 nós, configurar os nós Ray-on-Spark worker para um máximo de quatro nós e configurar cada nó Ray worker para utilizar totalmente o recurso de cada Apache Spark worker, as cargas de trabalho Ray poderão usar no máximo quatro nós recurso nessa configuração cluster. Em comparação, o Apache Spark Job pode alocar no máximo seis nós de recurso.