Treinamento distribuído de modelos XGBoost usando xgboost.spark
Visualização
Este recurso está em visualização pública.
O pacote Python xgboost>=1.7 contém um novo módulo xgboost.spark
. Este módulo inclui os estimadores xgboost PySpark xgboost.spark.SparkXGBRegressor
, xgboost.spark.SparkXGBClassifier
e xgboost.spark.SparkXGBRanker
. Essas novas classes suportam a inclusão de estimadores XGBoost no pipeline SparkML. Para obter detalhes da API, consulte o documento da API Spark XGBoost Python .
xgboost.spark
parâmetros
Os estimadores definidos no módulo xgboost.spark
suportam a maioria dos mesmos parâmetros e argumentos usados no XGBoost padrão.
Os parâmetros para o construtor de classe, método
fit
e métodopredict
são amplamente idênticos aos do móduloxgboost.sklearn
.A nomenclatura, os valores e default são praticamente idênticos aos descritos nos parâmetros do XGBoost.
As exceções são alguns parâmetros sem suporte (como
gpu_id
,nthread
,sample_weight
,eval_set
) e ospyspark
parâmetros específicos do estimador que foram adicionados (comofeaturesCol
,labelCol
,use_gpu
,validationIndicatorCol
). Para obter detalhes, consulte a documentação Spark API .
treinamento distribuído
Os estimadores do PySpark definidos no módulo xgboost.spark
suportam o treinamento distribuído do XGBoost usando o parâmetro num_workers
. Para usar o treinamento distribuído, crie um classificador ou regressor e defina num_workers
como o número de concorrentes executando Spark tarefa durante o treinamento distribuído. Para usar todos os Spark slots de tarefa, defina num_workers=sc.defaultParallelism
.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Observação
Você não pode usar
mlflow.xgboost.autolog
com XGBoost distribuído. Para logs um modelo xgboost Spark usando MLflow, usemlflow.spark.log_model(spark_xgb_model, artifact_path)
.Você não pode usar o XGBoost distribuído em clusters com autoscale habilitado. Novos nós worker que começam neste paradigma de escala elástica não podem receber novos conjuntos de tarefas e permanecem parados. Para obter instruções sobre como desabilitar autoscale, consulte Habilitar autoscale.
Ativar otimização para treinamento em conjunto de dados de recursos esparsos
Estimadores PySpark definidos no módulo xgboost.spark
oferecem suporte à otimização para treinamento em dataset com recursos esparsos. Para ativar a otimização de conjuntos de recursos esparsos, você precisa fornecer um dataset para o método fit
que contém uma coluna de recursos que consiste em valores do tipo pyspark.ml.linalg.SparseVector
e definir o parâmetro do estimador enable_sparse_data_optim
como True
. Além disso, você precisa definir o parâmetro missing
como 0.0
.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
GPU treinamento
Os estimadores PySpark definidos no módulo xgboost.spark
dão suporte ao treinamento em GPUs. Defina o parâmetro use_gpu
como True
para ativar o treinamento de GPU.
Observação
Para cada tarefa Spark usada no treinamento distribuído XGBoost, apenas uma GPU é usada no treinamento quando o argumento use_gpu
é definido como True
. Databricks recomenda usar o valor default de 1
para a configuração clusters Spark spark.task.resource.gpu.amount
. Caso contrário, as GPUs adicionais alocadas para esta tarefa do Spark serão interrompidas.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Solução de problemas
Durante o treinamento com vários nós, se o senhor encontrar uma mensagem NCCL failure: remote process exited or there was a network error
, isso normalmente indica um problema com a comunicação de rede entre as GPUs. Esse problema surge quando a NCCL (NVIDIA Collective Communications biblioteca) não pode usar determinadas interfaces de rede para comunicação com a GPU.
Para resolver, defina o sparkConf do cluster para spark.executorEnv.NCCL_SOCKET_IFNAME
como eth
. Isso basicamente define a variável de ambiente NCCL_SOCKET_IFNAME
para eth
para todos os trabalhadores em um nó.
Guia de migração para o módulo obsoleto sparkdl.xgboost
Substitua
from sparkdl.xgboost import XgboostRegressor
porfrom xgboost.spark import SparkXGBRegressor
e substituafrom sparkdl.xgboost import XgboostClassifier
porfrom xgboost.spark import SparkXGBClassifier
.Altere todos os nomes de parâmetro no construtor estimador do estilo camelCase para o estilo snake_case. Por exemplo, altere
XgboostRegressor(featuresCol=XXX)
paraSparkXGBRegressor(features_col=XXX)
.Os parâmetros
use_external_storage
eexternal_storage_precision
foram removidos. Os estimadoresxgboost.spark
usam a API de iteração de dados DMatrix para usar a memória com mais eficiência. Não há mais necessidade de usar o ineficiente modo de armazenamento externo. Para conjuntos de dados extremamente grandes, o site Databricks recomenda que o senhor aumente o parâmetronum_workers
, o que faz com que cada treinamento tarefa particione os dados em partições de dados menores e mais gerenciáveis. Considere a configuraçãonum_workers = sc.defaultParallelism
, que definenum_workers
como o número total de Spark slots de tarefa em cluster.Para os estimadores definidos em
xgboost.spark
, a configuraçãonum_workers=1
executa o treinamento do modelo usando uma única tarefa do Spark. Isso utiliza o número de núcleos de CPU especificado pela configuração Spark clusterspark.task.cpus
, que é 1 por default. Para usar mais núcleos de CPU para ensinar o modelo, aumentenum_workers
ouspark.task.cpus
. O senhor não pode definir o parâmetronthread
oun_jobs
para os estimadores definidos emxgboost.spark
. Esse comportamento é diferente do comportamento anterior dos estimadores definidos no pacotesparkdl.xgboost
obsoleto.
Converter modelo sparkdl.xgboost
em modelo xgboost.spark
sparkdl.xgboost
os modelos são salvos em um formato diferente dos modelos xgboost.spark
e têm configurações de parâmetros diferentes. Use a seguinte função de utilidades para converter o modelo:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Se você tiver um modelo pyspark.ml.PipelineModel
contendo um modelo sparkdl.xgboost
como o último estágio, poderá substituir o estágio do modelo sparkdl.xgboost
pelo modelo xgboost.spark
convertido.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)