From 3133ed3020c24e0662af4685ebf6bf1d2fa601e9 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 31 May 2023 12:05:07 -0700 Subject: [PATCH] Prevent enabling delayed submission in non-thread progress mode --- python/ucxx/_lib_async/application_context.py | 16 +++++-- python/ucxx/_lib_async/tests/test_worker.py | 43 ++++++++++++++----- 2 files changed, 45 insertions(+), 14 deletions(-) diff --git a/python/ucxx/_lib_async/application_context.py b/python/ucxx/_lib_async/application_context.py index d0c2ab5a..653ff6cd 100644 --- a/python/ucxx/_lib_async/application_context.py +++ b/python/ucxx/_lib_async/application_context.py @@ -41,7 +41,8 @@ def __init__( self.progress_mode = ApplicationContext._check_progress_mode(progress_mode) enable_delayed_submission = ApplicationContext._check_enable_delayed_submission( - enable_delayed_submission + enable_delayed_submission, + self.progress_mode, ) enable_python_future = ApplicationContext._check_enable_python_future( enable_python_future, self.progress_mode @@ -84,7 +85,7 @@ def _check_progress_mode(progress_mode): return progress_mode @staticmethod - def _check_enable_delayed_submission(enable_delayed_submission): + def _check_enable_delayed_submission(enable_delayed_submission, progress_mode): if enable_delayed_submission is None: if "UCXPY_ENABLE_DELAYED_SUBMISSION" in os.environ: explicit_enable_delayed_submission = ( @@ -93,10 +94,19 @@ def _check_enable_delayed_submission(enable_delayed_submission): else True ) else: - explicit_enable_delayed_submission = True + explicit_enable_delayed_submission = progress_mode.startswith("thread") else: explicit_enable_delayed_submission = enable_delayed_submission + if ( + not progress_mode.startswith("thread") + and explicit_enable_delayed_submission + ): + raise ValueError( + f"Delayed submission requested, but {progress_mode} does not " + "support it, 'thread' or 'thread-polling' progress mode required." + ) + return explicit_enable_delayed_submission @staticmethod diff --git a/python/ucxx/_lib_async/tests/test_worker.py b/python/ucxx/_lib_async/tests/test_worker.py index 71c33074..2dd73422 100644 --- a/python/ucxx/_lib_async/tests/test_worker.py +++ b/python/ucxx/_lib_async/tests/test_worker.py @@ -15,15 +15,27 @@ async def test_worker_capabilities_args( enable_delayed_submission, enable_python_future ): - ucxx.init( - enable_delayed_submission=enable_delayed_submission, - enable_python_future=enable_python_future, - ) + progress_mode = os.getenv("UCXPY_PROGRESS_MODE", "thread") - worker = ucxx.core._get_ctx().worker + if enable_delayed_submission and not progress_mode.startswith("thread"): + with pytest.raises(ValueError, match="Delayed submission requested, but"): + ucxx.init( + enable_delayed_submission=enable_delayed_submission, + enable_python_future=enable_python_future, + ) + else: + ucxx.init( + enable_delayed_submission=enable_delayed_submission, + enable_python_future=enable_python_future, + ) - assert worker.is_delayed_submission_enabled() is enable_delayed_submission - assert worker.is_python_future_enabled() is enable_python_future + worker = ucxx.core._get_ctx().worker + + assert worker.is_delayed_submission_enabled() is enable_delayed_submission + if progress_mode.startswith("thread"): + assert worker.is_python_future_enabled() is enable_python_future + else: + assert worker.is_python_future_enabled() is False @pytest.mark.asyncio @@ -39,9 +51,18 @@ async def test_worker_capabilities_env(enable_delayed_submission, enable_python_ "UCXPY_ENABLE_PYTHON_FUTURE": "1" if enable_python_future else "0", }, ): - ucxx.init() + progress_mode = os.getenv("UCXPY_PROGRESS_MODE", "thread") - worker = ucxx.core._get_ctx().worker + if enable_delayed_submission and not progress_mode.startswith("thread"): + with pytest.raises(ValueError, match="Delayed submission requested, but"): + ucxx.init() + else: + ucxx.init() - assert worker.is_delayed_submission_enabled() is enable_delayed_submission - assert worker.is_python_future_enabled() is enable_python_future + worker = ucxx.core._get_ctx().worker + + assert worker.is_delayed_submission_enabled() is enable_delayed_submission + if progress_mode.startswith("thread"): + assert worker.is_python_future_enabled() is enable_python_future + else: + assert worker.is_python_future_enabled() is False