Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Active Messages support #48

Merged
merged 40 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
e52c099
Fix and unify `waitRequests` in a template
pentschev May 15, 2023
2a3c0cb
Add new `RequestAM` class to handle Active Messages
pentschev May 15, 2023
b34d383
Register AM handler in `ucxx::Worker`
pentschev May 15, 2023
c9be9bd
Expose Active Messages via `ucxx::Endpoint`
pentschev May 15, 2023
8cfce99
Add C++ tests for Active Messages
pentschev May 15, 2023
7725db9
Document `ucxx::RequestAM`
pentschev May 15, 2023
d771d05
Use `shared_ptr` for `AmData::_worker``
pentschev May 16, 2023
6bcb5d4
Add Python interface for Active Messages
pentschev May 17, 2023
d6d03c5
Add `ucxx::Worker::amProbe`
pentschev May 17, 2023
89ed4f2
Add Python `amProbe` support
pentschev May 17, 2023
09040e7
Add different memory types support for Active Messages
pentschev May 17, 2023
e6d96b9
Add RMM AM allocator to Python
pentschev May 18, 2023
457c454
Make class name consistent
pentschev May 19, 2023
dece623
Fix `ucxx::RequestAm` trace logging
pentschev May 19, 2023
3146907
Merge remote-tracking branch 'upstream/branch-0.32' into active-messages
pentschev May 19, 2023
bbf6e27
Docstrings fixes
pentschev Jun 1, 2023
ffdac10
Delete `ucxx::internal::RecvAmMessage` default/move/copy constructors
pentschev Jun 1, 2023
1484242
Merge remote-tracking branch 'upstream/branch-0.32' into active-messages
pentschev Jun 1, 2023
bb4ff77
Use more appropriate typecasting
pentschev Jun 2, 2023
cbd9994
Move declaration to initialization
pentschev Jun 2, 2023
5f6a207
Improve handling of `recvPool` and `recvWait`
pentschev Jun 2, 2023
29a5516
Improve `waitRequests` C++ test util template
pentschev Jun 2, 2023
bd1787f
Improve Python logger string formatting
pentschev Jun 2, 2023
d7dcea0
Inherit `ucxx::Component` by `ucxx::DelayedSubmission`
pentschev Jun 5, 2023
6eb5f95
Clarify the status of `ucxx::Request::getRecvBuffer()`
pentschev Jun 5, 2023
89dceeb
Merge remote-tracking branch 'upstream/branch-0.32' into active-messages
pentschev Jun 6, 2023
8770045
Fix for API changes
pentschev Jun 6, 2023
f52f145
Pass missing `enableFuture` argument to `ucxx::Python::createWorker()`
pentschev Jun 6, 2023
94d7175
Merge remote-tracking branch 'upstream/branch-0.33' into active-messages
pentschev Jun 6, 2023
e7e1290
Replace `posix` transport with `self` in `test_check_transport`
pentschev Jun 7, 2023
373802e
Fix null pointer dereferencing in log
pentschev Jun 7, 2023
5c55f82
Revert "Inherit `ucxx::Component` by `ucxx::DelayedSubmission`"
pentschev Jun 7, 2023
88e8b34
Ensure `ucxx::Request` lifetime for delayed submission
pentschev Jun 5, 2023
11769ea
Prevent enabling delayed submission in non-thread progress mode
pentschev May 31, 2023
9865724
Merge remote-tracking branch 'upstream/branch-0.33' into active-messages
pentschev Jun 9, 2023
09ebeda
Remove now irrelevant TODO comment
pentschev Jun 9, 2023
bd5ac66
Remove `const` qualifier from `ucxx::Worker::tagProbe()`
pentschev Jun 9, 2023
a24ca65
Replace `std::make_pair` by initializer list
pentschev Jun 9, 2023
3a3aa03
Limit progress mode for `test_send_recv_am`
pentschev Jun 9, 2023
9ebf7ec
Merge remote-tracking branch 'origin/active-messages' into active-mes…
pentschev Jun 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions cpp/include/ucxx/delayed_submission.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,18 @@
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

#include <ucp/api/ucp.h>

#include <ucxx/component.h>
#include <ucxx/log.h>

namespace ucxx {

typedef std::function<void()> DelayedSubmissionCallbackType;

typedef std::shared_ptr<DelayedSubmissionCallbackType> DelayedSubmissionCallbackPtrType;

class DelayedSubmission : public Component {
class DelayedSubmission {
public:
bool _send{false}; ///< Whether this is a send (`true`) operation or recv (`false`)
void* _buffer{nullptr}; ///< Raw pointer to data buffer
Expand Down Expand Up @@ -60,7 +58,7 @@ class DelayedSubmission : public Component {

class DelayedSubmissionCollection {
private:
std::vector<DelayedSubmissionCallbackPtrType>
std::vector<std::pair<std::shared_ptr<Request>, DelayedSubmissionCallbackType>>
_collection{}; ///< The collection of all known delayed submission operations.
std::mutex _mutex{}; ///< Mutex to provide access to the collection.

Expand Down Expand Up @@ -95,10 +93,12 @@ class DelayedSubmissionCollection {
* Register a request for delayed submission with a callback that will be executed when
* the request is in fact submitted when `process()` is called.
*
* @param[in] request the request to which the callback belongs, ensuring it remains
* alive until the callback is invoked.
* @param[in] callback the callback that will be executed by `process()` when the
* operation is submitted.
*/
void registerRequest(DelayedSubmissionCallbackType callback);
void registerRequest(std::shared_ptr<Request> request, DelayedSubmissionCallbackType callback);
};

} // namespace ucxx
5 changes: 4 additions & 1 deletion cpp/include/ucxx/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,13 @@ class Worker : public Component {
* thread, thus decreasing computation on the caller thread, but potentially increasing
* transfer latency.
*
* @param[in] request the request to which the callback belongs, ensuring it remains
* alive until the callback is invoked.
* @param[in] callback the callback set to execute the UCP transfer routine during the
* worker thread loop.
*/
void registerDelayedSubmission(DelayedSubmissionCallbackType callback);
void registerDelayedSubmission(std::shared_ptr<Request> request,
DelayedSubmissionCallbackType callback);

/**
* @brief Inquire if worker has been created with delayed submission enabled.
Expand Down
17 changes: 8 additions & 9 deletions cpp/src/delayed_submission.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,25 @@ void DelayedSubmissionCollection::process()
toProcess = std::move(_collection);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does std::move do to _collection? Does it just drain the vector, leaving it "as-if" it were empty again? I guess so.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, that is the standard behavior for the move constructor and operator.

}

for (auto& callbackPtr : toProcess) {
auto& callback = *callbackPtr;
for (auto& pair : toProcess) {
auto& req = pair.first;
auto& callback = pair.second;

ucxx_trace_req("Submitting request: %p", callback.target<void (*)(std::shared_ptr<void>)>());
ucxx_trace_req("Submitting request: %p", req.get());

if (callback) callback();
}
}
}

void DelayedSubmissionCollection::registerRequest(DelayedSubmissionCallbackType callback)
void DelayedSubmissionCollection::registerRequest(std::shared_ptr<Request> request,
DelayedSubmissionCallbackType callback)
{
auto r = std::make_shared<DelayedSubmissionCallbackType>(callback);

{
std::lock_guard<std::mutex> lock(_mutex);
_collection.push_back(r);
_collection.push_back(std::make_pair(request, callback));
pentschev marked this conversation as resolved.
Show resolved Hide resolved
}
ucxx_trace_req("Registered submit request: %p",
callback.target<void (*)(std::shared_ptr<void>)>());
ucxx_trace_req("Registered submit request: %p", request.get());
}

} // namespace ucxx
1 change: 0 additions & 1 deletion cpp/src/internal/request_am.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ RecvAmMessage::RecvAmMessage(internal::AmData* amData,
{
_request->_delayedSubmission =
std::make_shared<DelayedSubmission>(false, _buffer->data(), _buffer->getSize());
_request->_delayedSubmission->setParent(_request);
}

void RecvAmMessage::setUcpRequest(void* request) { _request->_request = request; }
Expand Down
69 changes: 42 additions & 27 deletions cpp/src/request_am.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ std::shared_ptr<RequestAm> createRequestAmSend(
auto req = std::shared_ptr<RequestAm>(new RequestAm(
endpoint, buffer, length, memoryType, enablePythonFuture, callbackFunction, callbackData));

req->_delayedSubmission->setParent(req);
// A delayed notification request is not populated immediately, instead it is
// delayed to allow the worker progress thread to set its status, and more
// importantly the Python future later on, so that we don't need the GIL here.
req->_worker->registerDelayedSubmission(
req, std::bind(std::mem_fn(&Request::populateDelayedSubmission), req.get()));

return req;
}

Expand Down Expand Up @@ -61,12 +66,6 @@ RequestAm::RequestAm(std::shared_ptr<Endpoint> endpoint,
{
_callback = callbackFunction;
_callbackData = callbackData;

// A delayed notification request is not populated immediately, instead it is
// delayed to allow the worker progress thread to set its status, and more
// importantly the Python future later on, so that we don't need the GIL here.
_worker->registerDelayedSubmission(
std::bind(std::mem_fn(&Request::populateDelayedSubmission), this));
}

RequestAm::RequestAm(std::shared_ptr<Component> endpointOrWorker,
Expand Down Expand Up @@ -165,16 +164,24 @@ ucs_status_t RequestAm::recvCallback(void* arg,
ucs_status_ptr_t status =
ucp_am_recv_data_nbx(worker->getHandle(), data, buf->data(), length, &request_param);

ucxx_trace_req_f(
ownerString.c_str(),
status,
"amRecv rndv",
"ep %p, buffer %p, size %lu, future %p, future handle %p, populateDelayedSubmission",
ep,
buf->data(),
length,
req->_future.get(),
req->_future->getHandle());
if (req->_enablePythonFuture)
ucxx_trace_req_f(ownerString.c_str(),
status,
"amRecv rndv",
"ep %p, buffer %p, size %lu, future %p, future handle %p, recvCallback",
ep,
buf->data(),
length,
req->_future.get(),
req->_future->getHandle());
else
ucxx_trace_req_f(ownerString.c_str(),
status,
"amRecv rndv",
"ep %p, buffer %p, size %lu, recvCallback",
ep,
buf->data(),
length);

if (req->isCompleted()) {
// The request completed/errored immediately
Expand All @@ -198,16 +205,24 @@ ucs_status_t RequestAm::recvCallback(void* arg,
std::shared_ptr<Buffer> buf = amData->_allocators.at(UCS_MEMORY_TYPE_HOST)(length);
if (length > 0) memcpy(buf->data(), data, length);

ucxx_trace_req_f(
ownerString.c_str(),
nullptr,
"amRecv eager",
"ep: %p, buffer %p, size %lu, future %p, future handle %p, populateDelayedSubmission",
ep,
buf->data(),
length,
req->_future.get(),
req->_future->getHandle());
if (req->_enablePythonFuture)
ucxx_trace_req_f(ownerString.c_str(),
nullptr,
"amRecv eager",
"ep: %p, buffer %p, size %lu, future %p, future handle %p, recvCallback",
ep,
buf->data(),
length,
req->_future.get(),
req->_future->getHandle());
else
ucxx_trace_req_f(ownerString.c_str(),
nullptr,
"amRecv eager",
"ep: %p, buffer %p, size %lu, recvCallback",
ep,
buf->data(),
length);

internal::RecvAmMessage recvAmMessage(amData, ep, req, buf);
recvAmMessage.callback(nullptr, UCS_OK);
Expand Down
14 changes: 6 additions & 8 deletions cpp/src/request_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,6 @@ RequestStream::RequestStream(std::shared_ptr<Endpoint> endpoint,
enablePythonFuture),
_length(length)
{
auto worker = endpoint->getWorker();

// A delayed notification request is not populated immediately, instead it is
// delayed to allow the worker progress thread to set its status, and more
// importantly the Python future later on, so that we don't need the GIL here.
worker->registerDelayedSubmission(
std::bind(std::mem_fn(&Request::populateDelayedSubmission), this));
}

std::shared_ptr<RequestStream> createRequestStream(std::shared_ptr<Endpoint> endpoint,
Expand All @@ -41,7 +34,12 @@ std::shared_ptr<RequestStream> createRequestStream(std::shared_ptr<Endpoint> end
auto req = std::shared_ptr<RequestStream>(
new RequestStream(endpoint, send, buffer, length, enablePythonFuture));

req->_delayedSubmission->setParent(req);
// A delayed notification request is not populated immediately, instead it is
// delayed to allow the worker progress thread to set its status, and more
// importantly the Python future later on, so that we don't need the GIL here.
req->_worker->registerDelayedSubmission(
req, std::bind(std::mem_fn(&Request::populateDelayedSubmission), req.get()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this needs to be moved from the constructor because we need the shared_ptr to this which isn't available.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's not the friendliest design, but it is the best we can do here (I think).


return req;
}

Expand Down
13 changes: 6 additions & 7 deletions cpp/src/request_tag.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ std::shared_ptr<RequestTag> createRequestTag(std::shared_ptr<Component> endpoint
callbackFunction,
callbackData));

req->_delayedSubmission->setParent(req);
// A delayed notification request is not populated immediately, instead it is
// delayed to allow the worker progress thread to set its status, and more
// importantly the Python future later on, so that we don't need the GIL here.
req->_worker->registerDelayedSubmission(
req, std::bind(std::mem_fn(&Request::populateDelayedSubmission), req.get()));

return req;
}

Expand All @@ -53,12 +58,6 @@ RequestTag::RequestTag(std::shared_ptr<Component> endpointOrWorker,
throw ucxx::Error("An endpoint is required to send tag messages");
_callback = callbackFunction;
_callbackData = callbackData;

// A delayed notification request is not populated immediately, instead it is
// delayed to allow the worker progress thread to set its status, and more
// importantly the Python future later on, so that we don't need the GIL here.
_worker->registerDelayedSubmission(
std::bind(std::mem_fn(&Request::populateDelayedSubmission), this));
}

void RequestTag::callback(void* request, ucs_status_t status, const ucp_tag_recv_info_t* info)
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,13 @@ bool Worker::progress()
return ret;
}

void Worker::registerDelayedSubmission(DelayedSubmissionCallbackType callback)
void Worker::registerDelayedSubmission(std::shared_ptr<Request> request,
DelayedSubmissionCallbackType callback)
{
if (_delayedSubmissionCollection == nullptr) {
callback();
} else {
_delayedSubmissionCollection->registerRequest(callback);
_delayedSubmissionCollection->registerRequest(request, callback);

/* Waking the progress event is needed here because the UCX request is
* not dispatched immediately. Thus we must signal the progress task so
Expand Down