Extract dask and spark test into distributed test. (#8395)
- Move test files.
- Run spark and dask separately to prevent conflicts.
- Gather common code into the testing module.
trivialfis authored Oct 28, 2022
1 parent f73520b commit cfd2a9f
Showing 34 changed files with 406 additions and 338 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/python_tests.yml
@@ -125,7 +125,7 @@ jobs:
       - name: Test Python package
         shell: bash -l {0}
         run: |
-          pytest -s -v ./tests/python
+          pytest -s -v -rxXs --durations=0 ./tests/python
 
   python-tests-on-macos:
     name: Test XGBoost Python package on ${{ matrix.config.os }}
@@ -177,4 +177,9 @@ jobs:
       - name: Test Python package
         shell: bash -l {0}
         run: |
-          pytest -s -v ./tests/python
+          pytest -s -v -rxXs --durations=0 ./tests/python
+      - name: Test Dask Interface
+        shell: bash -l {0}
+        run: |
+          pytest -s -v -rxXs --durations=0 ./tests/test_distributed/test_with_dask
2 changes: 1 addition & 1 deletion python-package/xgboost/spark/data.py
@@ -87,7 +87,7 @@ def _fetch(self, data: Optional[Sequence[pd.DataFrame]]) -> Optional[pd.DataFrame]:
 
             # We must set the device after import cudf, which will change the device id to 0
             # See https://github.com/rapidsai/cudf/issues/11386
-            cp.cuda.runtime.setDevice(self._device_id)
+            cp.cuda.runtime.setDevice(self._device_id)  # pylint: disable=I1101
             return cudf.DataFrame(data[self._iter])
 
         return data[self._iter]
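For context, a minimal sketch of the ordering constraint this hunk documents, assuming a multi-GPU worker; the `device_id` value here is a hypothetical worker-assigned ordinal:

```python
# Importing cudf can reset the active CUDA device to 0 (see the cudf
# issue linked in the diff), so the device must be re-selected after
# the import, which is the ordering the code above relies on.
import cudf  # noqa: F401  (import first; may change the active device)
import cupy as cp

device_id = 1  # hypothetical GPU ordinal assigned to this worker
cp.cuda.runtime.setDevice(device_id)
assert cp.cuda.runtime.getDevice() == device_id
```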
32 changes: 32 additions & 0 deletions python-package/xgboost/testing/__init__.py
@@ -102,10 +102,14 @@ def no_sklearn() -> PytestSkip:
 
 
 def no_dask() -> PytestSkip:
+    if sys.platform.startswith("win"):
+        return {"reason": "Unsupported platform.", "condition": True}
     return no_mod("dask")
 
 
 def no_spark() -> PytestSkip:
+    if sys.platform.startswith("win") or sys.platform.startswith("darwin"):
+        return {"reason": "Unsupported platform.", "condition": True}
     return no_mod("pyspark")
@@ -159,6 +163,10 @@ def no_graphviz() -> PytestSkip:
     return no_mod("graphviz")
 
 
+def no_rmm() -> PytestSkip:
+    return no_mod("rmm")
+
+
 def no_multiple(*args: Any) -> PytestSkip:
     condition = False
     reason = ""
@@ -865,6 +873,30 @@ def timeout(sec: int, *args: Any, enable: bool = True, **kwargs: Any) -> Any:
     return pytest.mark.timeout(None, *args, **kwargs)
 
 
+def setup_rmm_pool(_: Any, pytestconfig: pytest.Config) -> None:
+    if pytestconfig.getoption("--use-rmm-pool"):
+        if no_rmm()["condition"]:
+            raise ImportError("The --use-rmm-pool option requires the RMM package")
+        if no_dask_cuda()["condition"]:
+            raise ImportError(
+                "The --use-rmm-pool option requires the dask_cuda package"
+            )
+        import rmm
+        from dask_cuda.utils import get_n_gpus
+
+        rmm.reinitialize(
+            pool_allocator=True,
+            initial_pool_size=1024 * 1024 * 1024,
+            devices=list(range(get_n_gpus())),
+        )
+
+
+def get_client_workers(client: Any) -> List[str]:
+    """Get workers from a dask client."""
+    workers = client.scheduler_info()["workers"]
+    return list(workers.keys())
+
+
 def demo_dir(path: str) -> str:
     """Look for the demo directory based on the test file name."""
     path = normpath(os.path.dirname(path))
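The new `get_client_workers` helper can be exercised against a local Dask cluster; a small self-contained sketch (the cluster sizing is arbitrary):

```python
from distributed import Client, LocalCluster

from xgboost.testing import get_client_workers

if __name__ == "__main__":
    with LocalCluster(n_workers=2, threads_per_worker=1) as cluster:
        with Client(cluster) as client:
            # Keys of scheduler_info()["workers"] are worker addresses,
            # e.g. "tcp://127.0.0.1:<port>".
            workers = get_client_workers(client)
            assert len(workers) == 2
```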
49 changes: 49 additions & 0 deletions python-package/xgboost/testing/params.py
@@ -0,0 +1,49 @@
"""Strategies for updater tests."""

from typing import cast

import pytest

hypothesis = pytest.importorskip("hypothesis")
from hypothesis import strategies # pylint:disable=wrong-import-position

exact_parameter_strategy = strategies.fixed_dictionaries(
    {
        "nthread": strategies.integers(1, 4),
        "max_depth": strategies.integers(1, 11),
        "min_child_weight": strategies.floats(0.5, 2.0),
        "alpha": strategies.floats(1e-5, 2.0),
        "lambda": strategies.floats(1e-5, 2.0),
        "eta": strategies.floats(0.01, 0.5),
        "gamma": strategies.floats(1e-5, 2.0),
        "seed": strategies.integers(0, 10),
        # We cannot enable subsampling as the training loss can increase
        # 'subsample': strategies.floats(0.5, 1.0),
        "colsample_bytree": strategies.floats(0.5, 1.0),
        "colsample_bylevel": strategies.floats(0.5, 1.0),
    }
)

hist_parameter_strategy = strategies.fixed_dictionaries(
    {
        "max_depth": strategies.integers(1, 11),
        "max_leaves": strategies.integers(0, 1024),
        "max_bin": strategies.integers(2, 512),
        "grow_policy": strategies.sampled_from(["lossguide", "depthwise"]),
        "min_child_weight": strategies.floats(0.5, 2.0),
        # We cannot enable subsampling as the training loss can increase
        # 'subsample': strategies.floats(0.5, 1.0),
        "colsample_bytree": strategies.floats(0.5, 1.0),
        "colsample_bylevel": strategies.floats(0.5, 1.0),
    }
).filter(
    lambda x: (cast(int, x["max_depth"]) > 0 or cast(int, x["max_leaves"]) > 0)
    and (cast(int, x["max_depth"]) > 0 or x["grow_policy"] == "lossguide")
)

cat_parameter_strategy = strategies.fixed_dictionaries(
    {
        "max_cat_to_onehot": strategies.integers(1, 128),
        "max_cat_threshold": strategies.integers(1, 128),
    }
)
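A sketch of how these strategies are consumed in an updater test; the test body below is illustrative, not from the repository:

```python
import numpy as np
from hypothesis import given, settings

import xgboost as xgb
from xgboost.testing.params import hist_parameter_strategy


@given(params=hist_parameter_strategy)
@settings(deadline=None, max_examples=10)
def test_hist_updater(params: dict) -> None:
    rng = np.random.default_rng(0)
    X = rng.normal(size=(128, 4))
    y = X.sum(axis=1)
    dtrain = xgb.DMatrix(X, label=y)
    # Every drawn dictionary satisfies the filter above, e.g. depthwise
    # growth is only generated together with a positive max_depth.
    booster = xgb.train({**params, "tree_method": "hist"}, dtrain, num_boost_round=4)
    assert booster.num_boosted_rounds() == 4
```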
95 changes: 95 additions & 0 deletions python-package/xgboost/testing/shared.py
@@ -0,0 +1,95 @@
"""Testing code shared by other tests."""
# pylint: disable=invalid-name
import collections
import importlib.util
import json
import os
import tempfile
from typing import Any, Callable, Dict, Type

import numpy as np
from xgboost._typing import ArrayLike

import xgboost as xgb


def validate_leaf_output(leaf: np.ndarray, num_parallel_tree: int) -> None:
    """Validate output for predict leaf tests."""
    for i in range(leaf.shape[0]):  # n_samples
        for j in range(leaf.shape[1]):  # n_rounds
            for k in range(leaf.shape[2]):  # n_classes
                tree_group = leaf[i, j, k, :]
                assert tree_group.shape[0] == num_parallel_tree
                # No sampling, all trees within forest are the same
                assert np.all(tree_group == tree_group[0])

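A hypothetical caller; `strict_shape` makes `predict` return the four-dimensional layout the validator expects:

```python
import numpy as np

import xgboost as xgb
from xgboost.testing.shared import validate_leaf_output

rng = np.random.default_rng(1)
X = rng.normal(size=(64, 3))
y = X[:, 0]
# No subsampling, so all trees within one round's forest agree.
booster = xgb.train(
    {"num_parallel_tree": 4}, xgb.DMatrix(X, label=y), num_boost_round=2
)
# Shape: (n_samples, n_rounds, n_classes, num_parallel_tree).
leaf = booster.predict(xgb.DMatrix(X), pred_leaf=True, strict_shape=True)
validate_leaf_output(leaf, num_parallel_tree=4)
```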

def validate_data_initialization(
    dmatrix: Type, model: Type[xgb.XGBModel], X: ArrayLike, y: ArrayLike
) -> None:
    """Assert that we don't create duplicated DMatrix."""

    old_init = dmatrix.__init__
    count = [0]

    def new_init(self: Any, **kwargs: Any) -> Callable:
        count[0] += 1
        return old_init(self, **kwargs)

    dmatrix.__init__ = new_init
    model(n_estimators=1).fit(X, y, eval_set=[(X, y)])

    assert count[0] == 1  # only 1 DMatrix is created.
    count[0] = 0

    y_copy = y.copy()
    model(n_estimators=1).fit(X, y, eval_set=[(X, y_copy)])
    assert count[0] == 2  # a different Python object is considered different

    dmatrix.__init__ = old_init

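A hypothetical invocation; the dataset and estimator choice are illustrative:

```python
from sklearn.datasets import make_regression

import xgboost as xgb
from xgboost.testing.shared import validate_data_initialization

X, y = make_regression(n_samples=256, n_features=8, random_state=0)
# With eval_set referencing the very same (X, y) objects, training and
# evaluation should share a single DMatrix.
validate_data_initialization(xgb.DMatrix, xgb.XGBRegressor, X, y)
```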

# pylint: disable=too-many-arguments,too-many-locals
def get_feature_weights(
    X: ArrayLike,
    y: ArrayLike,
    fw: np.ndarray,
    parser_path: str,
    tree_method: str,
    model: Type[xgb.XGBModel] = xgb.XGBRegressor,
) -> np.ndarray:
    """Get feature weights using the demo parser."""
    with tempfile.TemporaryDirectory() as tmpdir:
        colsample_bynode = 0.5
        reg = model(tree_method=tree_method, colsample_bynode=colsample_bynode)

        reg.fit(X, y, feature_weights=fw)
        model_path = os.path.join(tmpdir, "model.json")
        reg.save_model(model_path)
        with open(model_path, "r", encoding="utf-8") as fd:
            model = json.load(fd)

        spec = importlib.util.spec_from_file_location("JsonParser", parser_path)
        assert spec is not None
        jsonm = importlib.util.module_from_spec(spec)
        assert spec.loader is not None
        spec.loader.exec_module(jsonm)
        model = jsonm.Model(model)
        splits: Dict[int, int] = {}
        total_nodes = 0
        for tree in model.trees:
            n_nodes = len(tree.nodes)
            total_nodes += n_nodes
            for n in range(n_nodes):
                if tree.is_leaf(n):
                    continue
                if splits.get(tree.split_index(n), None) is None:
                    splits[tree.split_index(n)] = 1
                else:
                    splits[tree.split_index(n)] += 1

        od = collections.OrderedDict(sorted(splits.items()))
        tuples = list(od.items())
        k, v = list(zip(*tuples))
        w = np.polyfit(k, v, deg=1)
        return w
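A hedged usage sketch: the parser path assumes the demo JSON-model parser shipped in the repository, and the slope assertion mirrors how such feature-weight tests are typically written:

```python
import numpy as np

from xgboost.testing.shared import get_feature_weights

rng = np.random.default_rng(2)
X = rng.normal(size=(512, 8))
y = X.sum(axis=1)
# Monotonically increasing weights: later columns should be sampled,
# and therefore split on, more often under colsample_bynode.
fw = np.arange(1, X.shape[1] + 1).astype(np.float64)

poly = get_feature_weights(
    X, y, fw, parser_path="demo/json-model/json_parser.py", tree_method="hist"
)
assert poly[0] > 0  # slope of the line fitted over per-feature split counts
```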
3 changes: 1 addition & 2 deletions tests/README.md
@@ -10,8 +10,7 @@ facilities.
   dependencies for tests, see conda files in `ci_build`.
 * python-gpu: Similar to python tests, but for GPU.
 * travis: CI facilities for Travis.
-* distributed: Legacy tests for distributed system. Most of the distributed tests are
-  in Python tests using `dask` and jvm package using `spark`.
+* distributed: Tests for distributed systems.
 * benchmark: Legacy benchmark code. There are a number of benchmark projects for
   XGBoost with much better configurations.
2 changes: 2 additions & 0 deletions tests/ci_build/conda_env/python_lint.yml
@@ -17,3 +17,5 @@ dependencies:
 - isort
 - pyspark
 - cloudpickle
+- pytest
+- hypothesis
17 changes: 11 additions & 6 deletions tests/ci_build/lint_python.py
@@ -103,7 +103,12 @@ def print_summary_map(result_map: Dict[str, Dict[str, int]]) -> int:
 
 
 if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
+    parser = argparse.ArgumentParser(
+        description=(
+            "Run static checkers for XGBoost, see `python_lint.yml' "
+            "conda env file for a list of dependencies."
+        )
+    )
     parser.add_argument("--format", type=int, choices=[0, 1], default=1)
     parser.add_argument("--type-check", type=int, choices=[0, 1], default=1)
     parser.add_argument("--pylint", type=int, choices=[0, 1], default=1)
@@ -125,11 +130,11 @@ def print_summary_map(result_map: Dict[str, Dict[str, int]]) -> int:
         # tests
         "tests/python/test_config.py",
         "tests/python/test_data_iterator.py",
-        "tests/python/test_spark/",
         "tests/python/test_quantile_dmatrix.py",
-        "tests/python-gpu/test_gpu_spark/",
         "tests/python-gpu/test_gpu_data_iterator.py",
         "tests/ci_build/lint_python.py",
+        "tests/test_distributed/test_with_spark/",
+        "tests/test_distributed/test_gpu_with_spark/",
         # demo
         "demo/guide-python/cat_in_the_dat.py",
         "demo/guide-python/categorical.py",
@@ -146,11 +151,11 @@ def print_summary_map(result_map: Dict[str, Dict[str, int]]) -> int:
         "demo/guide-python/external_memory.py",
         "demo/guide-python/cat_in_the_dat.py",
         "tests/python/test_data_iterator.py",
-        "tests/python/test_spark/test_data.py",
-        "tests/python-gpu/test_gpu_with_dask/test_gpu_with_dask.py",
         "tests/python-gpu/test_gpu_data_iterator.py",
-        "tests/python-gpu/test_gpu_spark/test_data.py",
         "tests/ci_build/lint_python.py",
+        "tests/test_distributed/test_with_spark/test_data.py",
+        "tests/test_distributed/test_gpu_with_spark/test_data.py",
+        "tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py",
     ]
 ):
     sys.exit(-1)
4 changes: 4 additions & 0 deletions tests/ci_build/test_python.sh
@@ -68,6 +68,8 @@ case "$suite" in
        install_xgboost
        setup_pyspark_envs
        pytest -v -s -rxXs --fulltrace --durations=0 -m "mgpu" ${args} tests/python-gpu
+       pytest -v -s -rxXs --fulltrace --durations=0 -m "mgpu" ${args} tests/test_distributed/test_gpu_with_dask
+       pytest -v -s -rxXs --fulltrace --durations=0 -m "mgpu" ${args} tests/test_distributed/test_gpu_with_spark
        unset_pyspark_envs
        uninstall_xgboost
        set +x
@@ -80,6 +82,8 @@ case "$suite" in
        export RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE=1
        setup_pyspark_envs
        pytest -v -s -rxXs --fulltrace --durations=0 ${args} tests/python
+       pytest -v -s -rxXs --fulltrace --durations=0 ${args} tests/test_distributed/test_with_dask
+       pytest -v -s -rxXs --fulltrace --durations=0 ${args} tests/test_distributed/test_with_spark
        unset_pyspark_envs
        uninstall_xgboost
        set +x
50 changes: 9 additions & 41 deletions tests/python-gpu/conftest.py
@@ -1,43 +1,21 @@
 import pytest
 
-from xgboost import testing as tm  # noqa
+from xgboost import testing as tm
 
 
 def has_rmm():
-    try:
-        import rmm
-        return True
-    except ImportError:
-        return False
+    return tm.no_rmm()["condition"]
 
-@pytest.fixture(scope='session', autouse=True)
+
+@pytest.fixture(scope="session", autouse=True)
 def setup_rmm_pool(request, pytestconfig):
-    if pytestconfig.getoption('--use-rmm-pool'):
-        if not has_rmm():
-            raise ImportError('The --use-rmm-pool option requires the RMM package')
-        import rmm
-        from dask_cuda.utils import get_n_gpus
-        rmm.reinitialize(pool_allocator=True, initial_pool_size=1024*1024*1024,
-                         devices=list(range(get_n_gpus())))
+    tm.setup_rmm_pool(request, pytestconfig)
 
-@pytest.fixture(scope='class')
-def local_cuda_client(request, pytestconfig):
-    kwargs = {}
-    if hasattr(request, 'param'):
-        kwargs.update(request.param)
-    if pytestconfig.getoption('--use-rmm-pool'):
-        if not has_rmm():
-            raise ImportError('The --use-rmm-pool option requires the RMM package')
-        import rmm
-        kwargs['rmm_pool_size'] = '2GB'
-    if tm.no_dask_cuda()['condition']:
-        raise ImportError('The local_cuda_cluster fixture requires dask_cuda package')
-    from dask.distributed import Client
-    from dask_cuda import LocalCUDACluster
-    yield Client(LocalCUDACluster(**kwargs))
 
-def pytest_addoption(parser):
-    parser.addoption('--use-rmm-pool', action='store_true', default=False, help='Use RMM pool')
+def pytest_addoption(parser: pytest.Parser) -> None:
+    parser.addoption(
+        "--use-rmm-pool", action="store_true", default=False, help="Use RMM pool"
+    )

@@ -53,13 +31,3 @@ def pytest_collection_modifyitems(config, items):
     for item in items:
         if any(item.nodeid.startswith(x) for x in blocklist):
             item.add_marker(skip_mark)
-
-    # mark dask tests as `mgpu`.
-    mgpu_mark = pytest.mark.mgpu
-    for item in items:
-        if item.nodeid.startswith(
-            "python-gpu/test_gpu_with_dask/test_gpu_with_dask.py"
-        ) or item.nodeid.startswith(
-            "python-gpu/test_gpu_spark/test_gpu_spark.py"
-        ):
-            item.add_marker(mgpu_mark)
23 changes: 0 additions & 23 deletions tests/python-gpu/test_gpu_spark/test_data.py

This file was deleted.
