Skip to content

Commit

Permalink
Merge pull request #7291 from shefty/verbs
Browse files Browse the repository at this point in the history
prov/verbs: Flush any pending send requests on EP destruction
  • Loading branch information
shefty authored Dec 9, 2021
2 parents 10158d4 + 12c3e17 commit ee06e44
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 14 deletions.
18 changes: 13 additions & 5 deletions prov/verbs/src/fi_verbs.h
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ struct vrb_ep {
/* Protected by send CQ lock */
uint64_t sq_credits;
uint64_t peer_rq_credits;
struct slist sq_list;
/* Protected by recv CQ lock */
int64_t rq_credits_avail;
int64_t threshold;
Expand Down Expand Up @@ -619,15 +620,22 @@ struct vrb_ep {
};


/* Must be cast-able to struct fi_context */
/*
 * Records which posting path created a vrb_context, so completion
 * processing can tell send-queue, receive-queue and shared-receive-queue
 * work requests apart.
 */
enum vrb_op_ctx {
	VRB_POST_SQ  = 0,	/* set by vrb_post_send() */
	VRB_POST_RQ  = 1,	/* set by vrb_post_recv() */
	VRB_POST_SRQ = 2,	/* set by vrb_post_srq() */
};

/*
 * Per-work-request bookkeeping. The posting routines store a pointer to
 * this structure in the work request's wr_id, and vrb_poll_cq() casts
 * wr_id back to recover it when the completion arrives.
 *
 * NOTE(review): this listing appears to interleave the pre-change and
 * post-change lines of the diff -- ep/srx show up both as plain members
 * and inside the anonymous union, and both flags and op_ctx are present.
 * Presumably only one variant of each exists in the real header; confirm
 * against the actual file before relying on the layout shown here.
 */
struct vrb_context {
struct vrb_ep *ep;
struct vrb_srq_ep *srx;
/* Link used to queue the context on ep->sq_list after a successful send post */
struct slist_entry entry;
union {
/* Endpoint the request was posted on (assigned in vrb_post_send) */
struct vrb_ep *ep;
/* Shared receive context the request was posted on (assigned in vrb_post_srq) */
struct vrb_srq_ep *srx;
};
/* Caller's original wr_id, written back into wc->wr_id on completion */
void *user_ctx;
uint32_t flags;
/* Which queue the request was posted to; see enum vrb_op_ctx */
enum vrb_op_ctx op_ctx;
};


#define VERBS_XRC_EP_MAGIC 0x1F3D5B79
struct vrb_xrc_ep {
/* Must be first */
Expand Down
14 changes: 9 additions & 5 deletions prov/verbs/src/verbs_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,18 +239,22 @@ int vrb_poll_cq(struct vrb_cq *cq, struct ibv_wc *wc)

ctx = (struct vrb_context *) (uintptr_t) wc->wr_id;
wc->wr_id = (uintptr_t) ctx->user_ctx;
if (ctx->flags & FI_TRANSMIT) {
if (ctx->op_ctx == VRB_POST_SQ) {
assert(ctx->ep);
assert(!slist_empty(&ctx->ep->sq_list));
assert(ctx->ep->sq_list.head == &ctx->entry);
(void) slist_remove_head(&ctx->ep->sq_list);
cq->credits++;
ctx->ep->sq_credits++;
}

if (wc->status) {
if (ctx->flags & FI_RECV)
wc->opcode |= IBV_WC_RECV;
else
if (ctx->op_ctx == VRB_POST_SQ)
wc->opcode &= ~IBV_WC_RECV;
else
wc->opcode |= IBV_WC_RECV;
}
if (ctx->srx) {
if (ctx->op_ctx == VRB_POST_SRQ) {
ofi_mutex_lock(&ctx->srx->ctx_lock);
ofi_buf_free(ctx);
ofi_mutex_unlock(&ctx->srx->ctx_lock);
Expand Down
47 changes: 43 additions & 4 deletions prov/verbs/src/verbs_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ ssize_t vrb_post_recv(struct vrb_ep *ep, struct ibv_recv_wr *wr)
if (!ctx)
goto unlock;

ctx->ep = ep;
OFI_DBG_SET(ctx->ep, ep);
ctx->user_ctx = (void *) (uintptr_t) wr->wr_id;
ctx->flags = FI_RECV;
ctx->op_ctx = VRB_POST_RQ;
wr->wr_id = (uintptr_t) ctx;

ret = ibv_post_recv(ep->ibv_qp, wr, &bad_wr);
Expand Down Expand Up @@ -143,7 +143,7 @@ ssize_t vrb_post_send(struct vrb_ep *ep, struct ibv_send_wr *wr, uint64_t flags)

ctx->ep = ep;
ctx->user_ctx = (void *) (uintptr_t) wr->wr_id;
ctx->flags = FI_TRANSMIT | flags;
ctx->op_ctx = VRB_POST_SQ;
wr->wr_id = (uintptr_t) ctx;

ret = ibv_post_send(ep->ibv_qp, wr, &bad_wr);
Expand All @@ -153,6 +153,7 @@ ssize_t vrb_post_send(struct vrb_ep *ep, struct ibv_send_wr *wr, uint64_t flags)
vrb_convert_ret(ret));
goto credits;
}
slist_insert_tail(&ctx->entry, &ep->sq_list);
cq->util_cq.cq_mutex_unlock(&cq->util_cq.cq_lock);

return 0;
Expand Down Expand Up @@ -391,6 +392,7 @@ vrb_alloc_init_ep(struct fi_info *info, struct vrb_domain *domain,
goto err2;
}

slist_init(&ep->sq_list);
ep->util_ep.ep_fid.msg = calloc(1, sizeof(*ep->util_ep.ep_fid.msg));
if (!ep->util_ep.ep_fid.msg)
goto err3;
Expand All @@ -405,6 +407,41 @@ vrb_alloc_init_ep(struct fi_info *info, struct vrb_domain *domain,
return NULL;
}

/* Generate flush completion entries for any queued send requests.
* We only need to record the wr_id and that the entry was not a
* receive (indicated by lack of IBV_WC_RECV flag).
*/
static void vrb_flush_sq(struct vrb_ep *ep)
{
struct vrb_context *ctx;
struct vrb_cq *cq;
struct slist_entry *entry;
struct ibv_wc wc = {0};

if (!ep->util_ep.tx_cq)
return;

cq = container_of(ep->util_ep.tx_cq, struct vrb_cq, util_cq);
wc.status = IBV_WC_WR_FLUSH_ERR;
wc.vendor_err = FI_ECANCELED;

cq->util_cq.cq_mutex_lock(&cq->util_cq.cq_lock);
while (!slist_empty(&ep->sq_list)) {
entry = slist_remove_head(&ep->sq_list);
ctx = container_of(entry, struct vrb_context, entry);
assert(ctx->op_ctx == VRB_POST_SQ);

wc.wr_id = (uintptr_t) ctx->user_ctx;
cq->credits++;
ctx->ep->sq_credits++;
ofi_buf_free(ctx);

if (wc.wr_id != VERBS_NO_COMP_FLAG)
vrb_save_wc(cq, &wc);
}
cq->util_cq.cq_mutex_unlock(&cq->util_cq.cq_lock);
}

static int vrb_close_free_ep(struct vrb_ep *ep)
{
struct vrb_cq *cq;
Expand Down Expand Up @@ -478,6 +515,7 @@ static int vrb_ep_close(fid_t fid)
if (ep->eq)
ofi_mutex_unlock(&ep->eq->lock);
vrb_cleanup_cq(ep);
vrb_flush_sq(ep);
break;
case FI_EP_DGRAM:
fab = container_of(&ep->util_ep.domain->fabric->fabric_fid,
Expand All @@ -490,6 +528,7 @@ static int vrb_ep_close(fid_t fid)
return -errno;
}
vrb_cleanup_cq(ep);
vrb_flush_sq(ep);
break;
default:
VRB_WARN(FI_LOG_DOMAIN, "Unknown EP type\n");
Expand Down Expand Up @@ -1464,7 +1503,7 @@ ssize_t vrb_post_srq(struct vrb_srq_ep *ep, struct ibv_recv_wr *wr)

ctx->srx = ep;
ctx->user_ctx = (void *) (uintptr_t) wr->wr_id;
ctx->flags = FI_RECV;
ctx->op_ctx = VRB_POST_SRQ;
wr->wr_id = (uintptr_t) ctx;

ret = ibv_post_srq_recv(ep->srq, wr, &bad_wr);
Expand Down

0 comments on commit ee06e44

Please sign in to comment.