Skip to content

Commit

Permalink
More fixes (#55)
Browse files Browse the repository at this point in the history
Fixes various recently discovered issues

- Reenable several Python async tests in build_and_run.sh
- Progress the worker when probing for tag
- Prevent enabling delayed submission in non-thread progress mode
- Prevent canceling/freeing request that has completed

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - AJ Schmidt (https://github.com/ajschmidt8)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #55
  • Loading branch information
pentschev authored Jun 9, 2023
1 parent c7e4f09 commit d6243f2
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 27 deletions.
16 changes: 8 additions & 8 deletions build_and_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,14 @@ if [[ $RUN_PY_TESTS != 0 ]]; then
fi
if [[ $RUN_PY_ASYNC_TESTS != 0 ]]; then
# run_tests_async PROGRESS_MODE ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE SKIP
run_tests_async polling 0 0 1 # Unstable
run_tests_async polling 0 1 1 # Unstable
run_tests_async polling 1 0 1 # Unstable
run_tests_async polling 1 1 1 # Unstable
run_tests_async thread-polling 0 0 1 # Unstable
run_tests_async thread-polling 0 1 1 # Unstable
run_tests_async thread-polling 1 0 1 # Unstable
run_tests_async thread-polling 1 1 1 # Unstable
run_tests_async polling 0 0 0
run_tests_async polling 0 1 0
run_tests_async polling 1 0 1 # Delayed submission can't be used with polling
run_tests_async polling 1 1 1 # Delayed submission can't be used with polling
run_tests_async thread-polling 0 0 0
run_tests_async thread-polling 0 1 0
run_tests_async thread-polling 1 0 0
run_tests_async thread-polling 1 1 0
run_tests_async thread 0 0 0
run_tests_async thread 0 1 0
run_tests_async thread 1 0 0
Expand Down
20 changes: 19 additions & 1 deletion ci/test_cpp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,25 @@ run_benchmark() {
rapids-logger "Running: \n - ${CMD_LINE_SERVER}\n - ${CMD_LINE_CLIENT}"
UCX_TCP_CM_REUSEADDR=y timeout 1m ${BINARY_PATH}/benchmarks/libucxx/ucxx_perftest -s 8388608 -r -n 20 -m ${PROGRESS_MODE} &
sleep 1
timeout 1m ${BINARY_PATH}/benchmarks/libucxx/ucxx_perftest -s 8388608 -r -n 20 -m ${PROGRESS_MODE} 127.0.0.1

MAX_ATTEMPTS=10

set +e
for attempt in $(seq 1 ${MAX_ATTEMPTS}); do
echo "Attempt ${attempt}/${MAX_ATTEMPTS} to run client"
timeout 1m ${BINARY_PATH}/benchmarks/libucxx/ucxx_perftest -s 8388608 -r -n 20 -m ${PROGRESS_MODE} 127.0.0.1
LAST_STATUS=$?
if [ ${LAST_STATUS} -eq 0 ]; then
break;
fi
sleep 1
done
set -e

if [ ${LAST_STATUS} -ne 0 ]; then
echo "Failure running benchmark client after ${MAX_ATTEMPTS} attempts"
exit $LAST_STATUS
fi
}

run_example() {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void Request::cancel()
ucs_status_string(status));
} else {
ucxx_trace_req_f(_ownerString.c_str(), _request, _operationName.c_str(), "canceling");
ucp_request_cancel(_worker->getHandle(), _request);
if (_request != nullptr) ucp_request_cancel(_worker->getHandle(), _request);
}
} else {
auto status = _status.load();
Expand Down Expand Up @@ -105,7 +105,7 @@ void Request::callback(void* request, ucs_status_t status)
_callback.target<void (*)(void)>());
if (_callback) _callback(status, _callbackData);

ucp_request_free(request);
if (_request != nullptr) ucp_request_free(request);

ucxx_trace("Request completed: %p, handle: %p", this, request);
setStatus(status);
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ void Worker::removeInflightRequest(const Request* const request)

bool Worker::tagProbe(ucp_tag_t tag)
{
// TODO: Fix temporary workaround, if progress thread is active we must wait for it
// to progress the worker instead.
progress();

ucp_tag_recv_info_t info;
ucp_tag_message_h tag_message = ucp_tag_probe_nb(_handle, tag, -1, 0, &info);

Expand Down
16 changes: 13 additions & 3 deletions python/ucxx/_lib_async/application_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def __init__(
self.progress_mode = ApplicationContext._check_progress_mode(progress_mode)

enable_delayed_submission = ApplicationContext._check_enable_delayed_submission(
enable_delayed_submission
enable_delayed_submission,
self.progress_mode,
)
enable_python_future = ApplicationContext._check_enable_python_future(
enable_python_future, self.progress_mode
Expand Down Expand Up @@ -84,7 +85,7 @@ def _check_progress_mode(progress_mode):
return progress_mode

@staticmethod
def _check_enable_delayed_submission(enable_delayed_submission):
def _check_enable_delayed_submission(enable_delayed_submission, progress_mode):
if enable_delayed_submission is None:
if "UCXPY_ENABLE_DELAYED_SUBMISSION" in os.environ:
explicit_enable_delayed_submission = (
Expand All @@ -93,10 +94,19 @@ def _check_enable_delayed_submission(enable_delayed_submission):
else True
)
else:
explicit_enable_delayed_submission = True
explicit_enable_delayed_submission = progress_mode.startswith("thread")
else:
explicit_enable_delayed_submission = enable_delayed_submission

if (
not progress_mode.startswith("thread")
and explicit_enable_delayed_submission
):
raise ValueError(
f"Delayed submission requested, but {progress_mode} does not "
"support it, 'thread' or 'thread-polling' progress mode required."
)

return explicit_enable_delayed_submission

@staticmethod
Expand Down
14 changes: 12 additions & 2 deletions python/ucxx/_lib_async/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,15 @@ async def recv(self, buffer, tag=None, force_tag=False):
elif not force_tag:
tag = hash64bits(self._tags["msg_recv"], hash(tag))

if not self._ctx.worker.tag_probe(tag):
try:
self._ep.raise_on_error()
if self.closed():
raise UCXCloseError("Endpoint closed")
except Exception as e:
# Only probe the worker as last resort. To be reliable, probing for the tag
# requires progressing the worker, thus prevent that happening too often.
if not self._ctx.worker.tag_probe(tag):
raise e

if not isinstance(buffer, Array):
buffer = Array(buffer)
Expand Down Expand Up @@ -293,10 +298,15 @@ async def recv_multi(self, tag=None, force_tag=False):
elif not force_tag:
tag = hash64bits(self._tags["msg_recv"], hash(tag))

if not self._ctx.worker.tag_probe(tag):
try:
self._ep.raise_on_error()
if self.closed():
raise UCXCloseError("Endpoint closed")
except Exception as e:
# Only probe the worker as last resort. To be reliable, probing for the tag
# requires progressing the worker, thus prevent that happening too often.
if not self._ctx.worker.tag_probe(tag):
raise e

# Optimization to eliminate producing logger string overhead
if logger.isEnabledFor(logging.DEBUG):
Expand Down
43 changes: 32 additions & 11 deletions python/ucxx/_lib_async/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,27 @@
async def test_worker_capabilities_args(
enable_delayed_submission, enable_python_future
):
ucxx.init(
enable_delayed_submission=enable_delayed_submission,
enable_python_future=enable_python_future,
)
progress_mode = os.getenv("UCXPY_PROGRESS_MODE", "thread")

worker = ucxx.core._get_ctx().worker
if enable_delayed_submission and not progress_mode.startswith("thread"):
with pytest.raises(ValueError, match="Delayed submission requested, but"):
ucxx.init(
enable_delayed_submission=enable_delayed_submission,
enable_python_future=enable_python_future,
)
else:
ucxx.init(
enable_delayed_submission=enable_delayed_submission,
enable_python_future=enable_python_future,
)

assert worker.is_delayed_submission_enabled() is enable_delayed_submission
assert worker.is_python_future_enabled() is enable_python_future
worker = ucxx.core._get_ctx().worker

assert worker.is_delayed_submission_enabled() is enable_delayed_submission
if progress_mode.startswith("thread"):
assert worker.is_python_future_enabled() is enable_python_future
else:
assert worker.is_python_future_enabled() is False


@pytest.mark.asyncio
Expand All @@ -39,9 +51,18 @@ async def test_worker_capabilities_env(enable_delayed_submission, enable_python_
"UCXPY_ENABLE_PYTHON_FUTURE": "1" if enable_python_future else "0",
},
):
ucxx.init()
progress_mode = os.getenv("UCXPY_PROGRESS_MODE", "thread")

worker = ucxx.core._get_ctx().worker
if enable_delayed_submission and not progress_mode.startswith("thread"):
with pytest.raises(ValueError, match="Delayed submission requested, but"):
ucxx.init()
else:
ucxx.init()

assert worker.is_delayed_submission_enabled() is enable_delayed_submission
assert worker.is_python_future_enabled() is enable_python_future
worker = ucxx.core._get_ctx().worker

assert worker.is_delayed_submission_enabled() is enable_delayed_submission
if progress_mode.startswith("thread"):
assert worker.is_python_future_enabled() is enable_python_future
else:
assert worker.is_python_future_enabled() is False

0 comments on commit d6243f2

Please sign in to comment.