Skip to content

Commit

Permalink
Merge pull request openucx#7 from alinask/topic/ucp-rndv-req-id-hash
Browse files Browse the repository at this point in the history
UCP: create a hash of req_id to req_ptrs to be used for rndv operations
  • Loading branch information
yosefe authored Apr 28, 2020
2 parents a89f885 + e3537fc commit ca9d458
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ struct ucp_request {
size_t length; /* Total length, in bytes */
ucs_memory_type_t mem_type; /* Memory type */
ucp_send_callback_t cb; /* Completion callback */
uint64_t rndv_req_id;

union {
ucp_wireup_msg_t wireup;
Expand Down
4 changes: 4 additions & 0 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1708,11 +1708,13 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
worker->num_ifaces = 0;
worker->am_message_id = ucs_generate_uuid(0);
worker->rkey_ptr_cb_id = UCS_CALLBACKQ_ID_NULL;
worker->rndv_req_id = 1;
ucs_queue_head_init(&worker->rkey_ptr_reqs);
ucs_list_head_init(&worker->arm_ifaces);
ucs_list_head_init(&worker->stream_ready_eps);
ucs_list_head_init(&worker->all_eps);
kh_init_inplace(ucp_worker_ep_ptrs, &worker->ep_ptrs);
kh_init_inplace(ucp_worker_rndv_req_ptrs, &worker->rndv_req_ptrs);
ucp_ep_match_init(&worker->ep_match_ctx);
ucs_list_head_init(&worker->rndv_reqs_list);

Expand Down Expand Up @@ -1860,6 +1862,7 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
UCS_STATS_NODE_FREE(worker->stats);
err_free:
ucs_strided_alloc_cleanup(&worker->ep_alloc);
kh_destroy_inplace(ucp_worker_rndv_req_ptrs, &worker->rndv_req_ptrs);
kh_destroy_inplace(ucp_worker_ep_ptrs, &worker->ep_ptrs);
ucs_free(worker);
return status;
Expand Down Expand Up @@ -1908,6 +1911,7 @@ void ucp_worker_destroy(ucp_worker_h worker)
ucs_mpool_cleanup(&worker->req_mp, 1);
uct_worker_destroy(worker->uct);
ucs_async_context_cleanup(&worker->async);
kh_destroy_inplace(ucp_worker_rndv_req_ptrs, &worker->rndv_req_ptrs);
kh_destroy_inplace(ucp_worker_ep_ptrs, &worker->ep_ptrs);
ucp_ep_match_cleanup(&worker->ep_match_ctx);
ucs_strided_alloc_cleanup(&worker->ep_alloc);
Expand Down
3 changes: 3 additions & 0 deletions src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ typedef struct ucp_worker_am_entry {
} ucp_worker_am_entry_t;

/* khash set type with 64-bit integer keys; instantiated as the 'ep_ptrs'
 * member of the worker */
KHASH_SET_INIT_INT64(ucp_worker_ep_ptrs)
/* khash map type with 64-bit integer keys and uintptr_t values; instantiated
 * as the 'rndv_req_ptrs' member of the worker, where the key is a rndv request
 * id and the value is the send request pointer cast to uintptr_t */
KHASH_MAP_INIT_INT64(ucp_worker_rndv_req_ptrs, uintptr_t)

/**
* UCP worker (thread context).
Expand Down Expand Up @@ -230,6 +231,8 @@ typedef struct ucp_worker {
ucs_list_link_t all_eps; /* List of all endpoints */

khash_t(ucp_worker_ep_ptrs) ep_ptrs;
khash_t(ucp_worker_rndv_req_ptrs) rndv_req_ptrs;
uint64_t rndv_req_id;

ucp_ep_match_ctx_t ep_match_ctx; /* Endpoint-to-endpoint matching context */
ucp_worker_iface_t **ifaces; /* Array of pointers to interfaces,
Expand Down
95 changes: 85 additions & 10 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,28 @@ ucp_rndv_req_get_zcopy_rma_lane(ucp_request_t *rndv_req, ucp_lane_map_t ignore,

/**
 * Complete a rendezvous send request.
 *
 * Calls the generic-datatype finish and send-buffer deregistration routines,
 * unlinks the request from the list it is chained on through 'send.list' if
 * it is flagged as canceled, removes its id from the worker's rndv request
 * hash, and finally completes the request with @a status.
 *
 * @param [in] sreq    Send request to complete.
 * @param [in] status  Status passed on to ucp_request_complete_send().
 */
static void ucp_rndv_complete_send(ucp_request_t *sreq, ucs_status_t status)
{
    ucp_worker_h worker;
    khiter_t iter;

    ucp_request_send_generic_dt_finish(sreq);
    ucp_request_send_buffer_dereg(sreq);
    if (sreq->flags & UCP_REQUEST_FLAG_CANCELED) {
        ucs_list_del(&sreq->send.list);
    }

    /* remove from rndv sreqs hash; must happen before the request is
     * completed, since the fields of sreq are read here */
    worker = sreq->send.ep->worker;
    iter   = kh_get(ucp_worker_rndv_req_ptrs, &worker->rndv_req_ptrs,
                    sreq->send.rndv_req_id);
    if (iter != kh_end(&worker->rndv_req_ptrs)) {
        kh_del(ucp_worker_rndv_req_ptrs, &worker->rndv_req_ptrs, iter);
        /* rndv_req_id is uint64_t: print it with PRIu64, not %zu */
        ucs_debug("removed sreq %p, key %" PRIu64 ", worker %p",
                  sreq, sreq->send.rndv_req_id, worker);
    } else {
        ucs_warn("sreq %p (req_id = %" PRIu64 ") does not exist on worker %p",
                 sreq, sreq->send.rndv_req_id, worker);
    }

    ucp_request_complete_send(sreq, status);
}

Expand All @@ -69,7 +86,7 @@ size_t ucp_tag_rndv_rts_pack(void *dest, void *arg)
ssize_t packed_rkey_size;

rndv_rts_hdr->super.tag = sreq->send.msg_proto.tag.tag;
rndv_rts_hdr->sreq.reqptr = (uintptr_t)sreq;
rndv_rts_hdr->sreq.reqptr = sreq->send.rndv_req_id;
rndv_rts_hdr->sreq.ep_ptr = ucp_request_get_dest_ep_ptr(sreq);
rndv_rts_hdr->size = sreq->send.length;
rndv_rts_hdr->status = UCS_OK;
Expand Down Expand Up @@ -135,7 +152,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_cancel, (self),
sreq->send.lane = ucp_ep_get_am_lane(ep);

rndv_rts_hdr.super.tag = sreq->send.msg_proto.tag.tag;
rndv_rts_hdr.sreq.reqptr = (uintptr_t)sreq;
rndv_rts_hdr.sreq.reqptr = sreq->send.rndv_req_id;
rndv_rts_hdr.sreq.ep_ptr = ucp_request_get_dest_ep_ptr(sreq);
rndv_rts_hdr.size = sreq->send.length;
rndv_rts_hdr.status = UCS_ERR_CANCELED;
Expand Down Expand Up @@ -240,7 +257,10 @@ ucs_status_t ucp_tag_rndv_reg_send_buffer(ucp_request_t *sreq)
ucs_status_t ucp_tag_send_start_rndv(ucp_request_t *sreq)
{
ucp_ep_h ep = sreq->send.ep;
ucp_worker_h worker = ep->worker;
ucs_status_t status;
khiter_t khiter;
int ret;

ucp_trace_req(sreq, "start_rndv to %s buffer %p length %zu",
ucp_ep_peer_name(ep), sreq->send.buffer,
Expand All @@ -262,6 +282,23 @@ ucs_status_t ucp_tag_send_start_rndv(ucp_request_t *sreq)
status = ucp_tag_rndv_reg_send_buffer(sreq);
}

/* add the rndv send request to a hash on the worker. the key is a unique
* value on the worker */
khiter = kh_put(ucp_worker_rndv_req_ptrs, &worker->rndv_req_ptrs,
worker->rndv_req_id, &ret);
if (ret < 1 ) {
ucs_warn("failed to add rndv req id (%zu) to worker %p rndv req ptrs hash",
worker->rndv_req_id, worker);
}

sreq->send.rndv_req_id = worker->rndv_req_id;
kh_value(&worker->rndv_req_ptrs, khiter) = (uintptr_t)sreq;

ucs_debug("added sreq %p to hash with key %zu. worker %p",
sreq, sreq->send.rndv_req_id, worker);

worker->rndv_req_id++;

return status;
}

Expand Down Expand Up @@ -996,7 +1033,7 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr),
UCS_ASYNC_UNBLOCK(&worker->async);
}

static void ucp_rdnv_send_cancel_ack(ucp_worker_h worker,
static void ucp_rndv_send_cancel_ack(ucp_worker_h worker,
ucp_rndv_rts_hdr_t *rndv_rts_hdr)
{
ucp_request_t *req;
Expand Down Expand Up @@ -1036,7 +1073,7 @@ static void ucp_rndv_unexp_cancel(ucp_worker_h worker,
"tag %"PRIx64, UCP_RECV_DESC_ARG(rdesc),
ucp_rdesc_get_tag(rdesc));
ucp_tag_unexp_remove(rdesc);
ucp_rdnv_send_cancel_ack(worker, rndv_rts_hdr);
ucp_rndv_send_cancel_ack(worker, rndv_rts_hdr);
ucp_recv_desc_release(rdesc);
return;
}
Expand Down Expand Up @@ -1079,6 +1116,24 @@ ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length,
return status;
}

/**
 * Look up a rendezvous request by its id in the worker's rndv request hash.
 *
 * @param [in] worker  Worker whose 'rndv_req_ptrs' hash is searched.
 * @param [in] reqid   Request id used as the hash key.
 *
 * @return Request pointer stored under @a reqid, or NULL (after logging a
 *         warning) if the id is not present in the hash.
 */
static inline ucp_request_t *ucp_rndv_get_sreq_by_reqptr(ucp_worker_h worker,
                                                         uintptr_t reqid)
{
    khiter_t iter;

    /* FIXME rndv offload operations will get here too but their reqptr
     * will not be found in the hash since their req_id was not used */
    iter = kh_get(ucp_worker_rndv_req_ptrs, &worker->rndv_req_ptrs, reqid);
    if (iter == kh_end(&worker->rndv_req_ptrs)) {
        /* reqid is uintptr_t: print it with PRIuPTR, not %zu */
        ucs_warn("reqid %" PRIuPTR " does not exist on worker %p", reqid,
                 worker);
        return NULL;
    }

    /* hash values hold the request pointer as an integer */
    return (ucp_request_t*)kh_value(&worker->rndv_req_ptrs, iter);
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rts_handler,
(arg, data, length, tl_flags),
void *arg, void *data, size_t length, unsigned tl_flags)
Expand All @@ -1091,7 +1146,13 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_ats_handler,
void *arg, void *data, size_t length, unsigned flags)
{
ucp_reply_hdr_t *rep_hdr = data;
ucp_request_t *sreq = (ucp_request_t*) rep_hdr->reqptr;
ucp_worker_h worker = arg;
ucp_request_t *sreq;

sreq = ucp_rndv_get_sreq_by_reqptr(worker, rep_hdr->reqptr);
if (sreq == NULL) {
return UCS_OK;
}

/* dereg the original send request and set it to complete */
UCS_PROFILE_REQUEST_EVENT(sreq, "rndv_ats_recv", 0);
Expand Down Expand Up @@ -1458,7 +1519,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_atp_handler,
void *arg, void *data, size_t length, unsigned flags)
{
ucp_reply_hdr_t *rep_hdr = data;
ucp_request_t *req = (ucp_request_t*) rep_hdr->reqptr;
ucp_request_t *req;
ucp_request_t *rreq;
ucp_worker_h worker;
ucp_lane_index_t mem_type_rma_lane;
Expand All @@ -1467,6 +1528,11 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_atp_handler,
ucp_ep_h mem_type_ep;
size_t frag_size, frag_offset;

req = ucp_rndv_get_sreq_by_reqptr((ucp_worker_h)arg, rep_hdr->reqptr);
if (req == NULL) {
return UCS_OK;
}

if (req->recv.frag.rreq) {
/* atp for fragmented rndv request */
rreq = req->recv.frag.rreq;
Expand Down Expand Up @@ -1518,13 +1584,22 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rtr_handler,
void *arg, void *data, size_t length, unsigned flags)
{
ucp_rndv_rtr_hdr_t *rndv_rtr_hdr = data;
ucp_request_t *sreq = (ucp_request_t*)rndv_rtr_hdr->sreq_ptr;
ucp_ep_h ep = sreq->send.ep;
ucp_ep_config_t *ep_config = ucp_ep_config(ep);
ucp_context_h context = ep->worker->context;
ucp_request_t *sreq;
ucp_ep_h ep;
ucp_ep_config_t *ep_config;
ucp_context_h context;
ucs_status_t status;
int is_pipeline_rndv;

sreq = ucp_rndv_get_sreq_by_reqptr((ucp_worker_h)arg, rndv_rtr_hdr->sreq_ptr);
if (sreq == NULL) {
return UCS_OK;
}

ep = sreq->send.ep;
ep_config = ucp_ep_config(ep);
context = ep->worker->context;

ucp_trace_req(sreq, "received rtr address 0x%lx remote rreq 0x%lx",
rndv_rtr_hdr->address, rndv_rtr_hdr->rreq_ptr);
UCS_PROFILE_REQUEST_EVENT(sreq, "rndv_rtr_recv", 0);
Expand Down

0 comments on commit ca9d458

Please sign in to comment.