Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More fixes #55

Merged
merged 5 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions build_and_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ if [[ $RUN_PY_TESTS != 0 ]]; then
fi
if [[ $RUN_PY_ASYNC_TESTS != 0 ]]; then
# run_tests_async PROGRESS_MODE ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE SKIP
run_tests_async polling 0 0 1 # Unstable
run_tests_async polling 0 1 1 # Unstable
run_tests_async polling 1 0 1 # Unstable
run_tests_async polling 1 1 1 # Unstable
run_tests_async thread-polling 0 0 1 # Unstable
run_tests_async thread-polling 0 1 1 # Unstable
run_tests_async thread-polling 1 0 1 # Unstable
run_tests_async thread-polling 1 1 1 # Unstable
run_tests_async polling 0 0 0
run_tests_async polling 0 1 0
run_tests_async polling 1 0 1 # Delayed submission can't be used with polling
run_tests_async polling 1 1 1 # Delayed submission can't be used with polling
run_tests_async thread-polling 0 0 0
run_tests_async thread-polling 0 1 0
run_tests_async thread-polling 1 0 0
run_tests_async thread-polling 1 1 0
run_tests_async thread 0 0 0
run_tests_async thread 0 1 0
run_tests_async thread 1 0 0
Expand Down
20 changes: 19 additions & 1 deletion ci/test_cpp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,25 @@ run_benchmark() {
rapids-logger "Running: \n - ${CMD_LINE_SERVER}\n - ${CMD_LINE_CLIENT}"
UCX_TCP_CM_REUSEADDR=y timeout 1m ${BINARY_PATH}/benchmarks/libucxx/ucxx_perftest -s 8388608 -r -n 20 -m ${PROGRESS_MODE} &
sleep 1
timeout 1m ${BINARY_PATH}/benchmarks/libucxx/ucxx_perftest -s 8388608 -r -n 20 -m ${PROGRESS_MODE} 127.0.0.1

MAX_ATTEMPTS=10

set +e
for attempt in $(seq 1 ${MAX_ATTEMPTS}); do
echo "Attempt ${attempt}/${MAX_ATTEMPTS} to run client"
timeout 1m ${BINARY_PATH}/benchmarks/libucxx/ucxx_perftest -s 8388608 -r -n 20 -m ${PROGRESS_MODE} 127.0.0.1
LAST_STATUS=$?
if [ ${LAST_STATUS} -eq 0 ]; then
break;
fi
sleep 1
done
set -e

if [ ${LAST_STATUS} -ne 0 ]; then
echo "Failure running benchmark client after ${MAX_ATTEMPTS} attempts"
exit $LAST_STATUS
fi
}

run_example() {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void Request::cancel()
ucs_status_string(status));
} else {
ucxx_trace_req_f(_ownerString.c_str(), _request, _operationName.c_str(), "canceling");
ucp_request_cancel(_worker->getHandle(), _request);
if (_request != nullptr) ucp_request_cancel(_worker->getHandle(), _request);
}
} else {
auto status = _status.load();
Expand Down Expand Up @@ -105,7 +105,7 @@ void Request::callback(void* request, ucs_status_t status)
_callback.target<void (*)(void)>());
if (_callback) _callback(status, _callbackData);

ucp_request_free(request);
if (_request != nullptr) ucp_request_free(request);

ucxx_trace("Request completed: %p, handle: %p", this, request);
setStatus(status);
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ void Worker::removeInflightRequest(const Request* const request)

bool Worker::tagProbe(ucp_tag_t tag)
{
// TODO: Remove this temporary workaround. If the progress thread is active, we must
// wait for it to progress the worker instead of progressing it here.
progress();
wence- marked this conversation as resolved.
Show resolved Hide resolved

ucp_tag_recv_info_t info;
ucp_tag_message_h tag_message = ucp_tag_probe_nb(_handle, tag, -1, 0, &info);

Expand Down
16 changes: 13 additions & 3 deletions python/ucxx/_lib_async/application_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def __init__(
self.progress_mode = ApplicationContext._check_progress_mode(progress_mode)

enable_delayed_submission = ApplicationContext._check_enable_delayed_submission(
enable_delayed_submission
enable_delayed_submission,
self.progress_mode,
)
enable_python_future = ApplicationContext._check_enable_python_future(
enable_python_future, self.progress_mode
Expand Down Expand Up @@ -84,7 +85,7 @@ def _check_progress_mode(progress_mode):
return progress_mode

@staticmethod
def _check_enable_delayed_submission(enable_delayed_submission):
def _check_enable_delayed_submission(enable_delayed_submission, progress_mode):
if enable_delayed_submission is None:
if "UCXPY_ENABLE_DELAYED_SUBMISSION" in os.environ:
explicit_enable_delayed_submission = (
Expand All @@ -93,10 +94,19 @@ def _check_enable_delayed_submission(enable_delayed_submission):
else True
)
else:
explicit_enable_delayed_submission = True
explicit_enable_delayed_submission = progress_mode.startswith("thread")
else:
explicit_enable_delayed_submission = enable_delayed_submission

if (
not progress_mode.startswith("thread")
and explicit_enable_delayed_submission
):
raise ValueError(
f"Delayed submission requested, but {progress_mode} does not "
"support it, 'thread' or 'thread-polling' progress mode required."
)

return explicit_enable_delayed_submission

@staticmethod
Expand Down
14 changes: 12 additions & 2 deletions python/ucxx/_lib_async/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,15 @@ async def recv(self, buffer, tag=None, force_tag=False):
elif not force_tag:
tag = hash64bits(self._tags["msg_recv"], hash(tag))

if not self._ctx.worker.tag_probe(tag):
try:
self._ep.raise_on_error()
if self.closed():
raise UCXCloseError("Endpoint closed")
except Exception as e:
# Only probe the worker as a last resort. To be reliable, probing for the tag
# requires progressing the worker, so avoid doing that too often.
if not self._ctx.worker.tag_probe(tag):
raise e

if not isinstance(buffer, Array):
buffer = Array(buffer)
Expand Down Expand Up @@ -293,10 +298,15 @@ async def recv_multi(self, tag=None, force_tag=False):
elif not force_tag:
tag = hash64bits(self._tags["msg_recv"], hash(tag))

if not self._ctx.worker.tag_probe(tag):
try:
self._ep.raise_on_error()
if self.closed():
raise UCXCloseError("Endpoint closed")
except Exception as e:
# Only probe the worker as a last resort. To be reliable, probing for the tag
# requires progressing the worker, so avoid doing that too often.
if not self._ctx.worker.tag_probe(tag):
raise e

# Optimization to eliminate producing logger string overhead
if logger.isEnabledFor(logging.DEBUG):
Expand Down
43 changes: 32 additions & 11 deletions python/ucxx/_lib_async/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,27 @@
async def test_worker_capabilities_args(
enable_delayed_submission, enable_python_future
):
ucxx.init(
enable_delayed_submission=enable_delayed_submission,
enable_python_future=enable_python_future,
)
progress_mode = os.getenv("UCXPY_PROGRESS_MODE", "thread")

worker = ucxx.core._get_ctx().worker
if enable_delayed_submission and not progress_mode.startswith("thread"):
with pytest.raises(ValueError, match="Delayed submission requested, but"):
ucxx.init(
enable_delayed_submission=enable_delayed_submission,
enable_python_future=enable_python_future,
)
else:
ucxx.init(
enable_delayed_submission=enable_delayed_submission,
enable_python_future=enable_python_future,
)

assert worker.is_delayed_submission_enabled() is enable_delayed_submission
assert worker.is_python_future_enabled() is enable_python_future
worker = ucxx.core._get_ctx().worker

assert worker.is_delayed_submission_enabled() is enable_delayed_submission
if progress_mode.startswith("thread"):
assert worker.is_python_future_enabled() is enable_python_future
else:
assert worker.is_python_future_enabled() is False


@pytest.mark.asyncio
Expand All @@ -39,9 +51,18 @@ async def test_worker_capabilities_env(enable_delayed_submission, enable_python_
"UCXPY_ENABLE_PYTHON_FUTURE": "1" if enable_python_future else "0",
},
):
ucxx.init()
progress_mode = os.getenv("UCXPY_PROGRESS_MODE", "thread")

worker = ucxx.core._get_ctx().worker
if enable_delayed_submission and not progress_mode.startswith("thread"):
with pytest.raises(ValueError, match="Delayed submission requested, but"):
ucxx.init()
else:
ucxx.init()

assert worker.is_delayed_submission_enabled() is enable_delayed_submission
assert worker.is_python_future_enabled() is enable_python_future
worker = ucxx.core._get_ctx().worker

assert worker.is_delayed_submission_enabled() is enable_delayed_submission
if progress_mode.startswith("thread"):
assert worker.is_python_future_enabled() is enable_python_future
else:
assert worker.is_python_future_enabled() is False