diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py index 1babaa2c5..0b0f9d5b7 100644 --- a/dask_cuda/tests/test_proxify_host_file.py +++ b/dask_cuda/tests/test_proxify_host_file.py @@ -239,7 +239,8 @@ def test_spill_on_demand(root_dir): @pytest.mark.parametrize("jit_unspill", [True, False]) -def test_local_cuda_cluster(jit_unspill): +@gen_test(timeout=20) +async def test_local_cuda_cluster(jit_unspill): """Testing spilling of a proxied cudf dataframe in a local cuda cluster""" cudf = pytest.importorskip("cudf") dask_cudf = pytest.importorskip("dask_cudf") @@ -256,14 +257,17 @@ def task(x): return x # Notice, setting `device_memory_limit=1B` to trigger spilling - with dask_cuda.LocalCUDACluster( - n_workers=1, device_memory_limit="1B", jit_unspill=jit_unspill + async with dask_cuda.LocalCUDACluster( + n_workers=1, + device_memory_limit="1B", + jit_unspill=jit_unspill, + asynchronous=True, ) as cluster: - with Client(cluster): + async with Client(cluster, asynchronous=True) as client: df = cudf.DataFrame({"a": range(10)}) ddf = dask_cudf.from_cudf(df, npartitions=1) ddf = ddf.map_partitions(task, meta=df.head()) - got = ddf.compute() + got = await client.compute(ddf) assert_frame_equal(got.to_pandas(), df.to_pandas()) @@ -381,15 +385,18 @@ def test_incompatible_types(root_dir): @pytest.mark.parametrize("npartitions", [1, 2, 3]) @pytest.mark.parametrize("compatibility_mode", [True, False]) -def test_compatibility_mode_dataframe_shuffle(compatibility_mode, npartitions): +@gen_test(timeout=20) +async def test_compatibility_mode_dataframe_shuffle(compatibility_mode, npartitions): cudf = pytest.importorskip("cudf") def is_proxy_object(x): return "ProxyObject" in str(type(x)) with dask.config.set(jit_unspill_compatibility_mode=compatibility_mode): - with dask_cuda.LocalCUDACluster(n_workers=1, jit_unspill=True) as cluster: - with Client(cluster): + async with dask_cuda.LocalCUDACluster( + n_workers=1, jit_unspill=True, asynchronous=True + ) as cluster: + async with Client(cluster, asynchronous=True) as client: ddf = dask.dataframe.from_pandas( cudf.DataFrame({"key": np.arange(10)}), npartitions=npartitions ) @@ -397,8 +404,8 @@ def is_proxy_object(x): # With compatibility mode on, we shouldn't encounter any proxy objects if compatibility_mode: - assert "ProxyObject" not in str(type(res.compute())) - res = res.map_partitions(is_proxy_object).compute() + assert "ProxyObject" not in str(type(await client.compute(res))) + res = await client.compute(res.map_partitions(is_proxy_object)) res = res.to_list() if compatibility_mode: diff --git a/pyproject.toml b/pyproject.toml index 7a88741ea..f8d98957a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -121,6 +121,8 @@ filterwarnings = [ # tornado 6.2, remove when dask/distributed#6669 is fixed "ignore:clear_current is deprecated:DeprecationWarning:", "ignore:make_current is deprecated:DeprecationWarning:", + # remove after https://github.com/rapidsai/dask-cuda/issues/1087 is closed + "ignore:There is no current event loop:DeprecationWarning:tornado", ] [tool.setuptools]