Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable UCX-Py in Dask tests #5843

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ci/run_cuml_dask_pytests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,8 @@
# Support invoking run_cuml_dask_pytests.sh outside the script directory
cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../python/cuml/tests/dask

rapids-logger "pytest cuml-dask (No UCX-Py/UCXX)"
python -m pytest --cache-clear "$@" .

rapids-logger "pytest cuml-dask (UCX-Py only)"
python -m pytest --cache-clear --run_ucx "$@" .
1 change: 0 additions & 1 deletion ci/test_python_dask.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ EXITCODE=0
trap "EXITCODE=1" ERR
set +e

rapids-logger "pytest cuml-dask"
./ci/run_cuml_dask_pytests.sh \
--junitxml="${RAPIDS_TESTS_DIR}/junit-cuml-dask.xml" \
--cov-config=../../../.coveragerc \
Expand Down
1 change: 0 additions & 1 deletion ci/test_wheel.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ else
-k 'test_sparse_pca_inputs' \
--junitxml="${RAPIDS_TESTS_DIR}/junit-cuml-sparse-pca.xml"

rapids-logger "pytest cuml-dask"
./ci/run_cuml_dask_pytests.sh \
--junitxml="${RAPIDS_TESTS_DIR}/junit-cuml-dask.xml"
fi
Expand Down
35 changes: 24 additions & 11 deletions python/cuml/tests/dask/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

import pytest

Expand Down Expand Up @@ -34,18 +34,8 @@ def client(cluster):

@pytest.fixture(scope="module")
def ucx_cluster():
initialize.initialize(
create_cuda_context=True,
enable_tcp_over_ucx=enable_tcp_over_ucx,
enable_nvlink=enable_nvlink,
enable_infiniband=enable_infiniband,
)
cluster = LocalCUDACluster(
protocol="ucx",
enable_tcp_over_ucx=enable_tcp_over_ucx,
enable_nvlink=enable_nvlink,
enable_infiniband=enable_infiniband,
worker_class=IncreasedCloseTimeoutNanny,
)
yield cluster
cluster.close()
Expand All @@ -57,3 +47,26 @@ def ucx_client(ucx_cluster):
client = Client(ucx_cluster)
yield client
client.close()


def pytest_addoption(parser):
group = parser.getgroup("Dask cuML Custom Options")

group.addoption(
"--run_ucx", action="store_true", help="run _only_ UCX-Py tests"
)


def pytest_collection_modifyitems(config, items):
if config.getoption("--run_ucx"):
skip_others = pytest.mark.skip(
reason="only runs when --run_ucx is not specified"
)
for item in items:
if "ucx" not in item.keywords:
item.add_marker(skip_others)
else:
skip_ucx = pytest.mark.skip(reason="requires --run_ucx to run")
for item in items:
if "ucx" in item.keywords:
item.add_marker(skip_ucx)
167 changes: 139 additions & 28 deletions python/cuml/tests/dask/test_dask_nearest_neighbors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -81,33 +81,18 @@ def _scale_rows(client, nrows):
return n_workers * nrows


@pytest.mark.parametrize(
"nrows", [unit_param(300), quality_param(1e6), stress_param(5e8)]
)
@pytest.mark.parametrize("ncols", [10, 30])
@pytest.mark.parametrize(
"nclusters", [unit_param(5), quality_param(10), stress_param(15)]
)
@pytest.mark.parametrize(
"n_neighbors", [unit_param(10), quality_param(4), stress_param(100)]
)
@pytest.mark.parametrize(
"n_parts",
[unit_param(1), unit_param(5), quality_param(7), stress_param(50)],
)
@pytest.mark.parametrize(
"streams_per_handle,reverse_worker_order", [(5, True), (10, False)]
)
def test_compare_skl(
def _test_compare_skl(
nrows,
ncols,
nclusters,
n_parts,
n_neighbors,
streams_per_handle,
reverse_worker_order,
client,
dask_client,
request,
):
client = request.getfixturevalue(dask_client)

from cuml.dask.neighbors import NearestNeighbors as daskNN

Expand Down Expand Up @@ -162,11 +147,89 @@ def test_compare_skl(
assert array_equal(y_hat, skl_y_hat)


@pytest.mark.parametrize("nrows", [unit_param(1000), stress_param(1e5)])
@pytest.mark.parametrize("ncols", [unit_param(10), stress_param(500)])
@pytest.mark.parametrize("n_parts", [unit_param(10), stress_param(100)])
@pytest.mark.parametrize("batch_size", [unit_param(100), stress_param(1e3)])
def test_batch_size(nrows, ncols, n_parts, batch_size, client):
@pytest.mark.parametrize(
"nrows", [unit_param(300), quality_param(1e6), stress_param(5e8)]
)
@pytest.mark.parametrize("ncols", [10, 30])
@pytest.mark.parametrize(
"nclusters", [unit_param(5), quality_param(10), stress_param(15)]
)
@pytest.mark.parametrize(
"n_neighbors", [unit_param(10), quality_param(4), stress_param(100)]
)
@pytest.mark.parametrize(
"n_parts",
[unit_param(1), unit_param(5), quality_param(7), stress_param(50)],
)
@pytest.mark.parametrize(
"streams_per_handle,reverse_worker_order", [(5, True), (10, False)]
)
def test_compare_skl(
nrows,
ncols,
nclusters,
n_parts,
n_neighbors,
streams_per_handle,
reverse_worker_order,
request,
):
_test_compare_skl(
nrows,
ncols,
nclusters,
n_parts,
n_neighbors,
streams_per_handle,
reverse_worker_order,
"client",
request,
)


@pytest.mark.parametrize(
"nrows", [unit_param(300), quality_param(1e6), stress_param(5e8)]
)
@pytest.mark.parametrize("ncols", [10, 30])
@pytest.mark.parametrize(
"nclusters", [unit_param(5), quality_param(10), stress_param(15)]
)
@pytest.mark.parametrize(
"n_neighbors", [unit_param(10), quality_param(4), stress_param(100)]
)
@pytest.mark.parametrize(
"n_parts",
[unit_param(1), unit_param(5), quality_param(7), stress_param(50)],
)
@pytest.mark.parametrize(
"streams_per_handle,reverse_worker_order", [(5, True), (10, False)]
)
@pytest.mark.ucx
def test_compare_skl_ucx(
nrows,
ncols,
nclusters,
n_parts,
n_neighbors,
streams_per_handle,
reverse_worker_order,
request,
):
_test_compare_skl(
nrows,
ncols,
nclusters,
n_parts,
n_neighbors,
streams_per_handle,
reverse_worker_order,
"ucx_client",
request,
)


def _test_batch_size(nrows, ncols, n_parts, batch_size, dask_client, request):
client = request.getfixturevalue(dask_client)

n_neighbors = 10
n_clusters = 5
Expand Down Expand Up @@ -202,7 +265,25 @@ def test_batch_size(nrows, ncols, n_parts, batch_size, client):
assert array_equal(y_hat, y)


def test_return_distance(client):
@pytest.mark.parametrize("nrows", [unit_param(1000), stress_param(1e5)])
@pytest.mark.parametrize("ncols", [unit_param(10), stress_param(500)])
@pytest.mark.parametrize("n_parts", [unit_param(10), stress_param(100)])
@pytest.mark.parametrize("batch_size", [unit_param(100), stress_param(1e3)])
def test_batch_size(nrows, ncols, n_parts, batch_size, request):
_test_batch_size(nrows, ncols, n_parts, batch_size, "client", request)


@pytest.mark.parametrize("nrows", [unit_param(1000), stress_param(1e5)])
@pytest.mark.parametrize("ncols", [unit_param(10), stress_param(500)])
@pytest.mark.parametrize("n_parts", [unit_param(10), stress_param(100)])
@pytest.mark.parametrize("batch_size", [unit_param(100), stress_param(1e3)])
@pytest.mark.ucx
def test_batch_size_ucx(nrows, ncols, n_parts, batch_size, request):
_test_batch_size(nrows, ncols, n_parts, batch_size, "ucx_client", request)


def _test_return_distance(dask_client, request):
client = request.getfixturevalue(dask_client)

n_samples = 50
n_feats = 50
Expand Down Expand Up @@ -233,7 +314,17 @@ def test_return_distance(client):
assert len(ret) == 2


def test_default_n_neighbors(client):
def test_return_distance(request):
_test_return_distance("client", request)


@pytest.mark.ucx
def test_return_distance_ucx(request):
_test_return_distance("ucx_client", request)


def _test_default_n_neighbors(dask_client, request):
client = request.getfixturevalue(dask_client)

n_samples = 50
n_feats = 50
Expand Down Expand Up @@ -269,7 +360,18 @@ def test_default_n_neighbors(client):
assert ret.shape[1] == k


def test_one_query_partition(client):
def test_default_n_neighbors(request):
_test_default_n_neighbors("client", request)


@pytest.mark.ucx
def test_default_n_neighbors_ucx(request):
_test_default_n_neighbors("ucx_client", request)


def _test_one_query_partition(dask_client, request):
client = request.getfixturevalue(dask_client) # noqa

from cuml.dask.neighbors import NearestNeighbors as daskNN
from cuml.dask.datasets import make_blobs

Expand All @@ -280,3 +382,12 @@ def test_one_query_partition(client):
cumlModel = daskNN(n_neighbors=4)
cumlModel.fit(X_train)
cumlModel.kneighbors(X_test)


def test_one_query_partition(request):
_test_one_query_partition("client", request)


@pytest.mark.ucx
def test_one_query_partition_ucx(request):
_test_one_query_partition("ucx_client", request)
1 change: 1 addition & 0 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ markers = [
"mg: Multi-GPU tests",
"memleak: Test that checks for memory leaks",
"no_bad_cuml_array_check: Test that should not check for bad CumlArray uses",
"ucx: Run _only_ Dask UCX-Py tests",
]

testpaths = "cuml/tests"
Expand Down
1 change: 1 addition & 0 deletions python/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ markers =
mg: Multi-GPU tests
memleak: Test that checks for memory leaks
no_bad_cuml_array_check: Test that should not check for bad CumlArray uses
ucx: Run _only_ Dask UCX-Py tests

testpaths =
cuml/tests
Expand Down
Loading