[pyspark] rework the log (#10077)
wbo4958 authored Feb 29, 2024
1 parent 5ac2332 commit d24df52
Showing 2 changed files with 31 additions and 12 deletions.
python-package/xgboost/spark/core.py (17 additions, 9 deletions)
@@ -95,6 +95,7 @@
     deserialize_xgb_model,
     get_class_name,
     get_logger,
+    get_logger_level,
     serialize_booster,
     use_cuda,
 )
@@ -181,6 +182,8 @@
 
 _INIT_BOOSTER_SAVE_PATH = "init_booster.json"
 
+_LOG_TAG = "XGBoost-PySpark"
+
 
 class _SparkXGBParams(
     HasFeaturesCol,
@@ -1034,6 +1037,8 @@ def _fit(self, dataset: DataFrame) -> "_SparkXGBModel":
 
         num_workers = self.getOrDefault(self.num_workers)
 
+        log_level = get_logger_level(_LOG_TAG)
+
         def _train_booster(
             pandas_df_iter: Iterator[pd.DataFrame],
         ) -> Iterator[pd.DataFrame]:
@@ -1047,7 +1052,7 @@ def _train_booster(
 
             dev_ordinal = None
             use_qdm = _can_use_qdm(booster_params.get("tree_method", None))
-
+            msg = "Training on CPUs"
             if run_on_gpu:
                 dev_ordinal = (
                     context.partitionId() if is_local else _get_gpu_id(context)
@@ -1058,10 +1063,9 @@
                 # Note: Checking `is_cudf_available` in spark worker side because
                 # spark worker might has different python environment with driver side.
                 use_qdm = use_qdm and is_cudf_available()
-                get_logger("XGBoost-PySpark").info(
-                    "Leveraging %s to train with QDM: %s",
-                    booster_params["device"],
-                    "on" if use_qdm else "off",
+                msg = (
+                    f"Leveraging {booster_params['device']} to train with "
+                    f"QDM: {'on' if use_qdm else 'off'}"
                 )
 
             if use_qdm and (booster_params.get("max_bin", None) is not None):
@@ -1070,6 +1074,7 @@
             _rabit_args = {}
             if context.partitionId() == 0:
                 _rabit_args = _get_rabit_args(context, num_workers)
+                get_logger(_LOG_TAG, log_level).info(msg)
 
             worker_message = {
                 "rabit_msg": _rabit_args,
@@ -1127,7 +1132,7 @@ def _run_job() -> Tuple[str, str]:
             ret = rdd_with_resource.collect()[0]
             return ret[0], ret[1]
 
-        get_logger("XGBoost-PySpark").info(
+        get_logger(_LOG_TAG).info(
             "Running xgboost-%s on %s workers with"
             "\n\tbooster params: %s"
             "\n\ttrain_call_kwargs_params: %s"
@@ -1139,7 +1144,7 @@ def _run_job() -> Tuple[str, str]:
             dmatrix_kwargs,
         )
         (config, booster) = _run_job()
-        get_logger("XGBoost-PySpark").info("Finished xgboost training!")
+        get_logger(_LOG_TAG).info("Finished xgboost training!")
 
         result_xgb_model = self._convert_to_sklearn_model(
             bytearray(booster, "utf-8"), config
@@ -1342,7 +1347,7 @@ def _run_on_gpu(self) -> bool:
         # User don't set gpu configurations, just use cpu
         if gpu_per_task is None:
             if use_gpu_by_params:
-                get_logger("XGBoost-PySpark").warning(
+                get_logger(_LOG_TAG).warning(
                     "Do the prediction on the CPUs since "
                     "no gpu configurations are set"
                 )
@@ -1377,6 +1382,8 @@ def _transform(self, dataset: DataFrame) -> DataFrame:
         is_local = _is_local(_get_spark_session().sparkContext)
         run_on_gpu = self._run_on_gpu()
 
+        log_level = get_logger_level(_LOG_TAG)
+
         @pandas_udf(schema) # type: ignore
         def predict_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
             assert xgb_sklearn_model is not None
@@ -1413,7 +1420,8 @@ def predict_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
                 else:
                     msg = "CUDF or Cupy is unavailable, fallback the inference on the CPUs"
 
-            get_logger("XGBoost-PySpark").info(msg)
+            if context.partitionId() == 0:
+                get_logger(_LOG_TAG, log_level).info(msg)
 
             def to_gpu_if_possible(data: ArrayLike) -> ArrayLike:
                 """Move the data to gpu if possible"""
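The core.py changes above follow a single pattern: the driver captures the current level of the "XGBoost-PySpark" logger with get_logger_level, the value is closed over by the training/prediction function, and each executor rebuilds the logger with that level and logs only from partition 0, so a message appears once per job instead of once per task. Below is a minimal, self-contained sketch of that pattern; the train_like_task job and the simplified helper stand-ins are hypothetical illustrations, not the library's actual code.

# Sketch of the driver-captures-level / worker-re-applies pattern introduced by this
# commit (hypothetical mapInPandas job, simplified stand-ins for the xgboost helpers).
import logging
import sys
from typing import Iterator, Optional, Union

import pandas as pd
from pyspark import TaskContext
from pyspark.sql import SparkSession

_LOG_TAG = "XGBoost-PySpark"


def get_logger(name: str, level: Optional[Union[str, int]] = None) -> logging.Logger:
    # Simplified stand-in for xgboost.spark.utils.get_logger (formatter omitted):
    # honor an explicit level, otherwise default to INFO only when still unset.
    logger = logging.getLogger(name)
    if level is not None:
        logger.setLevel(level)
    elif logger.level == logging.NOTSET:
        logger.setLevel(logging.INFO)
    if not logger.handlers and not logging.getLogger().handlers:
        logger.addHandler(logging.StreamHandler(sys.stderr))
    return logger


def get_logger_level(name: str) -> Optional[int]:
    # Simplified stand-in for xgboost.spark.utils.get_logger_level.
    logger = logging.getLogger(name)
    return None if logger.level == logging.NOTSET else logger.level


spark = SparkSession.builder.master("local[2]").getOrCreate()

# Driver side: capture the level once, before the UDF closure is defined,
# mirroring what _fit and _transform now do.
log_level = get_logger_level(_LOG_TAG)


def train_like_task(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # Worker side: build the message, then emit it from partition 0 only.
    context = TaskContext.get()
    msg = "Training on CPUs"  # the real code swaps in a GPU/QDM message when applicable
    if context is not None and context.partitionId() == 0:
        get_logger(_LOG_TAG, log_level).info(msg)
    for batch in batches:
        yield batch


df = spark.range(0, 100).repartition(2)
# Triggers the job; the message is logged once, not once per partition.
df.mapInPandas(train_like_task, schema=df.schema).count()
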
python-package/xgboost/spark/utils.py (14 additions, 3 deletions)
@@ -8,7 +8,7 @@
 import sys
 import uuid
 from threading import Thread
-from typing import Any, Callable, Dict, Optional, Set, Type
+from typing import Any, Callable, Dict, Optional, Set, Type, Union
 
 import pyspark
 from pyspark import BarrierTaskContext, SparkConf, SparkContext, SparkFiles, TaskContext
@@ -98,10 +98,15 @@ def _get_spark_session() -> SparkSession:
     return SparkSession.builder.getOrCreate()
 
 
-def get_logger(name: str, level: str = "INFO") -> logging.Logger:
+def get_logger(name: str, level: Optional[Union[str, int]] = None) -> logging.Logger:
     """Gets a logger by name, or creates and configures it for the first time."""
     logger = logging.getLogger(name)
-    logger.setLevel(level)
+    if level is not None:
+        logger.setLevel(level)
+    else:
+        # Default to info if not set.
+        if logger.level == logging.NOTSET:
+            logger.setLevel(logging.INFO)
     # If the logger is configured, skip the configure
     if not logger.handlers and not logging.getLogger().handlers:
         handler = logging.StreamHandler(sys.stderr)
@@ -113,6 +118,12 @@ def get_logger(name: str, level: str = "INFO") -> logging.Logger:
     return logger
 
 
+def get_logger_level(name: str) -> Optional[int]:
+    """Get the logger level for the given log name"""
+    logger = logging.getLogger(name)
+    return None if logger.level == logging.NOTSET else logger.level
+
+
 def _get_max_num_concurrent_tasks(spark_context: SparkContext) -> int:
     """Gets the current max number of concurrent tasks."""
     # pylint: disable=protected-access
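For context on the utils.py change: the old get_logger unconditionally applied its level argument (default "INFO"), so any level a user had set on the "XGBoost-PySpark" logger was reset on every call. The reworked helper only sets the level when one is passed, falling back to INFO solely when the logger is still unset, and the new get_logger_level reports the configured level so the driver can forward it to executors. The snippet below is an illustration of that behaviour, re-declaring the two helpers as in this diff (handler wiring omitted) rather than importing them from xgboost.

# Illustration of the reworked helpers' behaviour (not part of the commit).
import logging
from typing import Optional, Union


def get_logger(name: str, level: Optional[Union[str, int]] = None) -> logging.Logger:
    """Only touch the level when asked; default to INFO once, when still unset."""
    logger = logging.getLogger(name)
    if level is not None:
        logger.setLevel(level)
    elif logger.level == logging.NOTSET:
        logger.setLevel(logging.INFO)
    return logger  # the real helper also attaches a stderr StreamHandler


def get_logger_level(name: str) -> Optional[int]:
    """Return the configured level, or None when nothing has been set."""
    logger = logging.getLogger(name)
    return None if logger.level == logging.NOTSET else logger.level


# A user quiets the driver-side logger before training:
logging.getLogger("XGBoost-PySpark").setLevel(logging.WARNING)

# The old helper (level="INFO" by default) would have reset this back to INFO;
# the reworked helper leaves the user's choice alone:
assert get_logger("XGBoost-PySpark").level == logging.WARNING

# The captured value can then be shipped to executors, where the logger starts unset:
assert get_logger_level("XGBoost-PySpark") == logging.WARNING
assert get_logger_level("some-fresh-logger") is None
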
