Skip to content

Commit

Permalink
Prevent enabling delayed submission in non-thread progress mode
Browse files Browse the repository at this point in the history
  • Loading branch information
pentschev committed Jun 1, 2023
1 parent a1a338d commit 3133ed3
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 14 deletions.
16 changes: 13 additions & 3 deletions python/ucxx/_lib_async/application_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def __init__(
self.progress_mode = ApplicationContext._check_progress_mode(progress_mode)

enable_delayed_submission = ApplicationContext._check_enable_delayed_submission(
enable_delayed_submission
enable_delayed_submission,
self.progress_mode,
)
enable_python_future = ApplicationContext._check_enable_python_future(
enable_python_future, self.progress_mode
Expand Down Expand Up @@ -84,7 +85,7 @@ def _check_progress_mode(progress_mode):
return progress_mode

@staticmethod
def _check_enable_delayed_submission(enable_delayed_submission):
def _check_enable_delayed_submission(enable_delayed_submission, progress_mode):
if enable_delayed_submission is None:
if "UCXPY_ENABLE_DELAYED_SUBMISSION" in os.environ:
explicit_enable_delayed_submission = (
Expand All @@ -93,10 +94,19 @@ def _check_enable_delayed_submission(enable_delayed_submission):
else True
)
else:
explicit_enable_delayed_submission = True
explicit_enable_delayed_submission = progress_mode.startswith("thread")
else:
explicit_enable_delayed_submission = enable_delayed_submission

if (
not progress_mode.startswith("thread")
and explicit_enable_delayed_submission
):
raise ValueError(
f"Delayed submission requested, but {progress_mode} does not "
"support it, 'thread' or 'thread-polling' progress mode required."
)

return explicit_enable_delayed_submission

@staticmethod
Expand Down
43 changes: 32 additions & 11 deletions python/ucxx/_lib_async/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,27 @@
async def test_worker_capabilities_args(
enable_delayed_submission, enable_python_future
):
ucxx.init(
enable_delayed_submission=enable_delayed_submission,
enable_python_future=enable_python_future,
)
progress_mode = os.getenv("UCXPY_PROGRESS_MODE", "thread")

worker = ucxx.core._get_ctx().worker
if enable_delayed_submission and not progress_mode.startswith("thread"):
with pytest.raises(ValueError, match="Delayed submission requested, but"):
ucxx.init(
enable_delayed_submission=enable_delayed_submission,
enable_python_future=enable_python_future,
)
else:
ucxx.init(
enable_delayed_submission=enable_delayed_submission,
enable_python_future=enable_python_future,
)

assert worker.is_delayed_submission_enabled() is enable_delayed_submission
assert worker.is_python_future_enabled() is enable_python_future
worker = ucxx.core._get_ctx().worker

assert worker.is_delayed_submission_enabled() is enable_delayed_submission
if progress_mode.startswith("thread"):
assert worker.is_python_future_enabled() is enable_python_future
else:
assert worker.is_python_future_enabled() is False


@pytest.mark.asyncio
Expand All @@ -39,9 +51,18 @@ async def test_worker_capabilities_env(enable_delayed_submission, enable_python_
"UCXPY_ENABLE_PYTHON_FUTURE": "1" if enable_python_future else "0",
},
):
ucxx.init()
progress_mode = os.getenv("UCXPY_PROGRESS_MODE", "thread")

worker = ucxx.core._get_ctx().worker
if enable_delayed_submission and not progress_mode.startswith("thread"):
with pytest.raises(ValueError, match="Delayed submission requested, but"):
ucxx.init()
else:
ucxx.init()

assert worker.is_delayed_submission_enabled() is enable_delayed_submission
assert worker.is_python_future_enabled() is enable_python_future
worker = ucxx.core._get_ctx().worker

assert worker.is_delayed_submission_enabled() is enable_delayed_submission
if progress_mode.startswith("thread"):
assert worker.is_python_future_enabled() is enable_python_future
else:
assert worker.is_python_future_enabled() is False

0 comments on commit 3133ed3

Please sign in to comment.