Skip to content

Commit

Permalink
Reorganize delayed submission with operation-specific data and expose…
Browse files Browse the repository at this point in the history
… tag mask (#121)

Add new class to organize operation-specific data, making members optional depending on what a specific transfer operation requires and checking their validity at construction time.

Expose tag mask to tag and multi-buffer tag APIs.

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #121
  • Loading branch information
pentschev authored Dec 12, 2023
1 parent 13a663b commit c84c0f9
Show file tree
Hide file tree
Showing 44 changed files with 1,244 additions and 745 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ add_library(
src/log.cpp
src/request.cpp
src/request_am.cpp
src/request_data.cpp
src/request_helper.cpp
src/request_stream.cpp
src/request_tag.cpp
Expand Down
12 changes: 7 additions & 5 deletions cpp/benchmarks/perftest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ enum class ProgressMode {
enum transfer_type_t { SEND, RECV };

typedef std::unordered_map<transfer_type_t, std::vector<char>> BufferMap;
typedef std::unordered_map<transfer_type_t, ucp_tag_t> TagMap;
typedef std::unordered_map<transfer_type_t, ucxx::Tag> TagMap;

typedef std::shared_ptr<BufferMap> BufferMapPtr;
typedef std::shared_ptr<TagMap> TagMapPtr;
Expand Down Expand Up @@ -267,7 +267,8 @@ auto doTransfer(const app_context_t& app_context,
auto start = std::chrono::high_resolution_clock::now();
std::vector<std::shared_ptr<ucxx::Request>> requests = {
endpoint->tagSend((*bufferMap)[SEND].data(), app_context.message_size, (*tagMap)[SEND]),
endpoint->tagRecv((*bufferMap)[RECV].data(), app_context.message_size, (*tagMap)[RECV])};
endpoint->tagRecv(
(*bufferMap)[RECV].data(), app_context.message_size, (*tagMap)[RECV], ucxx::TagMaskFull)};

// Wait for requests and clear requests
waitRequests(app_context.progress_mode, worker, requests);
Expand All @@ -292,8 +293,8 @@ int main(int argc, char** argv)

bool is_server = app_context.server_addr == NULL;
auto tagMap = std::make_shared<TagMap>(TagMap{
{SEND, is_server ? 0 : 1},
{RECV, is_server ? 1 : 0},
{SEND, is_server ? ucxx::Tag{0} : ucxx::Tag{1}},
{RECV, is_server ? ucxx::Tag{1} : ucxx::Tag{0}},
});

std::shared_ptr<ListenerContext> listener_ctx;
Expand Down Expand Up @@ -337,7 +338,8 @@ int main(int argc, char** argv)
(*tagMap)[SEND]));
requests.push_back(endpoint->tagRecv((*wireupBufferMap)[RECV].data(),
(*wireupBufferMap)[RECV].size() * sizeof(int),
(*tagMap)[RECV]));
(*tagMap)[RECV],
ucxx::TagMaskFull));

// Wait for wireup requests and clear requests
waitRequests(app_context.progress_mode, worker, requests);
Expand Down
24 changes: 13 additions & 11 deletions cpp/examples/basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,25 +202,27 @@ int main(int argc, char** argv)

// Schedule small wireup messages to let UCX identify capabilities between endpoints
requests.push_back(listener_ctx->getEndpoint()->tagSend(
sendWireupBuffer.data(), sendWireupBuffer.size() * sizeof(int), 0));
requests.push_back(
endpoint->tagRecv(recvWireupBuffer.data(), sendWireupBuffer.size() * sizeof(int), 0));
sendWireupBuffer.data(), sendWireupBuffer.size() * sizeof(int), ucxx::Tag{0}));
requests.push_back(endpoint->tagRecv(recvWireupBuffer.data(),
sendWireupBuffer.size() * sizeof(int),
ucxx::Tag{0},
ucxx::TagMaskFull));
::waitRequests(progress_mode, worker, requests);
requests.clear();

// Schedule send and recv messages on different tags and different ordering
requests.push_back(listener_ctx->getEndpoint()->tagSend(
sendBuffers[0].data(), sendBuffers[0].size() * sizeof(int), 0));
sendBuffers[0].data(), sendBuffers[0].size() * sizeof(int), ucxx::Tag{0}));
requests.push_back(listener_ctx->getEndpoint()->tagRecv(
recvBuffers[1].data(), recvBuffers[1].size() * sizeof(int), 1));
recvBuffers[1].data(), recvBuffers[1].size() * sizeof(int), ucxx::Tag{1}, ucxx::TagMaskFull));
requests.push_back(listener_ctx->getEndpoint()->tagSend(
sendBuffers[2].data(), sendBuffers[2].size() * sizeof(int), 2));
requests.push_back(
endpoint->tagRecv(recvBuffers[2].data(), recvBuffers[2].size() * sizeof(int), 2));
requests.push_back(
endpoint->tagSend(sendBuffers[1].data(), sendBuffers[1].size() * sizeof(int), 1));
sendBuffers[2].data(), sendBuffers[2].size() * sizeof(int), ucxx::Tag{2}, ucxx::TagMaskFull));
requests.push_back(endpoint->tagRecv(
recvBuffers[2].data(), recvBuffers[2].size() * sizeof(int), ucxx::Tag{2}, ucxx::TagMaskFull));
requests.push_back(
endpoint->tagRecv(recvBuffers[0].data(), recvBuffers[0].size() * sizeof(int), 0));
endpoint->tagSend(sendBuffers[1].data(), sendBuffers[1].size() * sizeof(int), ucxx::Tag{1}));
requests.push_back(endpoint->tagRecv(
recvBuffers[0].data(), recvBuffers[0].size() * sizeof(int), ucxx::Tag{0}, ucxx::TagMaskFull));

// Wait for requests to be set, i.e., transfers complete
::waitRequests(progress_mode, worker, requests);
Expand Down
1 change: 1 addition & 0 deletions cpp/include/ucxx/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@
#include <ucxx/request.h>
#include <ucxx/request_tag_multi.h>
#include <ucxx/typedefs.h>
#include <ucxx/utils/callback_notifier.h>
#include <ucxx/worker.h>
62 changes: 24 additions & 38 deletions cpp/include/ucxx/constructors.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <string>
#include <vector>

#include <ucxx/request_data.h>
#include <ucxx/typedefs.h>

namespace ucxx {
Expand Down Expand Up @@ -55,43 +56,28 @@ std::shared_ptr<Worker> createWorker(std::shared_ptr<Context> context,
const bool enableFuture);

// Transfers
std::shared_ptr<RequestAm> createRequestAmSend(std::shared_ptr<Endpoint> endpoint,
void* buffer,
size_t length,
ucs_memory_type_t memoryType,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestAm> createRequestAmRecv(std::shared_ptr<Endpoint> endpoint,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestStream> createRequestStream(std::shared_ptr<Endpoint> endpoint,
bool send,
void* buffer,
size_t length,
const bool enablePythonFuture);

std::shared_ptr<RequestTag> createRequestTag(std::shared_ptr<Component> endpointOrWorker,
bool send,
void* buffer,
size_t length,
ucp_tag_t tag,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestTagMulti> createRequestTagMultiSend(std::shared_ptr<Endpoint> endpoint,
const std::vector<void*>& buffer,
const std::vector<size_t>& size,
const std::vector<int>& isCUDA,
const ucp_tag_t tag,
const bool enablePythonFuture);

std::shared_ptr<RequestTagMulti> createRequestTagMultiRecv(std::shared_ptr<Endpoint> endpoint,
const ucp_tag_t tag,
const bool enablePythonFuture);
std::shared_ptr<RequestAm> createRequestAm(
std::shared_ptr<Endpoint> endpoint,
const std::variant<data::AmSend, data::AmReceive> requestData,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestStream> createRequestStream(
std::shared_ptr<Endpoint> endpoint,
const std::variant<data::StreamSend, data::StreamReceive> requestData,
const bool enablePythonFuture);

std::shared_ptr<RequestTag> createRequestTag(
std::shared_ptr<Component> endpointOrWorker,
const std::variant<data::TagSend, data::TagReceive> requestData,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestTagMulti> createRequestTagMulti(
std::shared_ptr<Endpoint> endpoint,
const std::variant<data::TagMultiSend, data::TagMultiReceive> requestData,
const bool enablePythonFuture);

} // namespace ucxx
43 changes: 5 additions & 38 deletions cpp/include/ucxx/delayed_submission.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,56 +7,23 @@
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <variant>
#include <vector>

#include <ucp/api/ucp.h>
#include <ucs/memory/memory_type.h>

#include <ucxx/log.h>
#include <ucxx/request_data.h>

namespace ucxx {

typedef std::function<void()> DelayedSubmissionCallbackType;

class DelayedSubmission {
public:
bool _send{false}; ///< Whether this is a send (`true`) operation or recv (`false`)
void* _buffer{nullptr}; ///< Raw pointer to data buffer
size_t _length{0}; ///< Length of the message in bytes
ucp_tag_t _tag{0}; ///< Tag to match
ucs_memory_type_t _memoryType{UCS_MEMORY_TYPE_UNKNOWN}; ///< Buffer memory type

DelayedSubmission() = delete;

/**
* @brief Constructor for a delayed submission operation.
*
* Construct a delayed submission operation. Delayed submission means that a transfer
* operation will not be submitted immediately, but will rather be delayed for the next
* progress iteration.
*
* This may be useful to avoid any transfer operations to be executed directly in the
* application thread, delaying all of them for the worker progress thread when enabled.
* With this approach any perceived overhead will be removed from the application thread,
* and thus provide some speedup in certain situations. It may be also useful to prevent
* a multi-threaded application for blocking while waiting for the UCX spinlock, since
* all transfer operations may be pushed to the worker progress thread.
*
* @param[in] send whether this is a send (`true`) or receive (`false`) operation.
* @param[in] buffer a raw pointer to the data being transferred.
* @param[in] length the size in bytes of the message being transfer.
* @param[in] tag tag to match for this operation (only applies for tag
* operations).
* @param[in] memoryType the memory type of the buffer.
*/
DelayedSubmission(const bool send,
void* buffer,
const size_t length,
const ucp_tag_t tag = 0,
const ucs_memory_type_t memoryType = UCS_MEMORY_TYPE_UNKNOWN);
};

template <typename T>
class BaseDelayedSubmissionCollection {
protected:
Expand Down
13 changes: 9 additions & 4 deletions cpp/include/ucxx/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ class Endpoint : public Component {
*/
std::shared_ptr<Request> tagSend(void* buffer,
size_t length,
ucp_tag_t tag,
Tag tag,
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);
Expand All @@ -408,6 +408,7 @@ class Endpoint : public Component {
* data will be stored.
* @param[in] length the size in bytes of the tag message to be received.
* @param[in] tag the tag to match.
* @param[in] tagMask the tag mask to use.
* @param[in] enablePythonFuture whether a python future should be created and
* subsequently notified.
* @param[in] callbackFunction user-defined callback function to call upon completion.
Expand All @@ -417,7 +418,8 @@ class Endpoint : public Component {
*/
std::shared_ptr<Request> tagRecv(void* buffer,
size_t length,
ucp_tag_t tag,
Tag tag,
TagMask tagMask,
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);
Expand Down Expand Up @@ -460,7 +462,7 @@ class Endpoint : public Component {
std::shared_ptr<Request> tagMultiSend(const std::vector<void*>& buffer,
const std::vector<size_t>& size,
const std::vector<int>& isCUDA,
const ucp_tag_t tag,
const Tag tag,
const bool enablePythonFuture);

/**
Expand All @@ -479,12 +481,15 @@ class Endpoint : public Component {
* ensure the transfer has completed. Requires UCXX Python support.
*
* @param[in] tag the tag to match.
* @param[in] tagMask the tag mask to use.
* @param[in] enablePythonFuture whether a python future should be created and
* subsequently notified.
*
* @returns Request to be subsequently checked for the completion and its state.
*/
std::shared_ptr<Request> tagMultiRecv(const ucp_tag_t tag, const bool enablePythonFuture);
std::shared_ptr<Request> tagMultiRecv(const Tag tag,
const TagMask tagMask,
const bool enablePythonFuture);

/**
* @brief Get `ucxx::Worker` component from a worker or listener object.
Expand Down
8 changes: 4 additions & 4 deletions cpp/include/ucxx/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <ucxx/component.h>
#include <ucxx/endpoint.h>
#include <ucxx/future.h>
#include <ucxx/request_data.h>
#include <ucxx/typedefs.h>

#define ucxx_trace_req_f(_owner, _req, _name, _message, ...) \
Expand All @@ -34,9 +35,8 @@ class Request : public Component {
std::shared_ptr<Endpoint> _endpoint{
nullptr}; ///< Endpoint that generated request (if not from worker)
std::string _ownerString{
"undetermined owner"}; ///< String to print owner (endpoint or worker) when logging
std::shared_ptr<DelayedSubmission> _delayedSubmission{
nullptr}; ///< The submission object that will dispatch the request
"undetermined owner"}; ///< String to print owner (endpoint or worker) when logging
data::RequestData _requestData{}; ///< The operation-specific data to be used in the request
std::string _operationName{
"request_undefined"}; ///< Human-readable operation name, mostly used for log messages
std::recursive_mutex _mutex{}; ///< Mutex to prevent checking status while it's being set
Expand All @@ -62,7 +62,7 @@ class Request : public Component {
* subsequently notified.
*/
Request(std::shared_ptr<Component> endpointOrWorker,
std::shared_ptr<DelayedSubmission> delayedSubmission,
const data::RequestData requestData,
const std::string operationName,
const bool enablePythonFuture = false);

Expand Down
Loading

0 comments on commit c84c0f9

Please sign in to comment.