Skip to content

Commit

Permalink
prov/net: Enhance FI_PEEK to support FI_DISCARD
Browse files Browse the repository at this point in the history
Signed-off-by: Stephen Oost <[email protected]>
  • Loading branch information
ooststep authored and shefty committed Sep 29, 2022
1 parent 5f8d2b4 commit 6f368da
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
3 changes: 3 additions & 0 deletions prov/net/src/xnet.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ static inline void xnet_signal_progress(struct xnet_progress *progress)
#define XNET_NEED_DYN_RBUF BIT(4)
#define XNET_ASYNC BIT(5)
#define XNET_INJECT_OP BIT(6)
#define XNET_FREE_BUF BIT(7)
#define XNET_MULTI_RECV FI_MULTI_RECV /* BIT(16) */

struct xnet_xfer_entry {
Expand Down Expand Up @@ -544,6 +545,8 @@ static inline void
xnet_free_xfer(struct xnet_ep *ep, struct xnet_xfer_entry *xfer)
{
assert(xnet_progress_locked(xnet_ep2_progress(ep)));
if (xfer->ctrl_flags & XNET_FREE_BUF)
free(xfer->iov[0].iov_base);
xfer->hdr.base_hdr.flags = 0;
xfer->cq_flags = 0;
xfer->cntr_inc = NULL;
Expand Down
29 changes: 22 additions & 7 deletions prov/net/src/xnet_srx.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,16 @@ xnet_srx_claim(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry,
if (!ep)
return -FI_ENOMSG;

if (flags & FI_DISCARD) {
msg_len = ep->cur_rx.hdr.base_hdr.size -
ep->cur_rx.hdr.base_hdr.hdr_size;
recv_entry->iov[0].iov_base = calloc(1, msg_len);
if (!recv_entry->iov[0].iov_base)
return -FI_ENOMEM;
recv_entry->iov[0].iov_len = msg_len;
recv_entry->ctrl_flags |= XNET_FREE_BUF;
}

ret = xnet_start_recv(ep, recv_entry);
if (ret && !OFI_SOCK_TRY_SND_RCV_AGAIN(-ret))
xnet_ep_disable(ep, 0, NULL, 0);
Expand All @@ -278,12 +288,13 @@ xnet_srx_claim(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry,
return FI_SUCCESS;
}

static void
static ssize_t
xnet_srx_peek(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry,
uint64_t flags)
{
struct xnet_ep *ep;
struct fi_cq_err_entry err_entry;
ssize_t ret = FI_ENOMSG;

assert(xnet_progress_locked(xnet_srx2_progress(srx)));
assert(srx->rdm);
Expand All @@ -296,7 +307,7 @@ xnet_srx_peek(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry,
(size_t) ep->cur_rx.hdr.base_hdr.hdr_size);
recv_entry->cq_flags |= xnet_rx_completion_flag(ep);

if (flags & FI_CLAIM) {
if (flags & (FI_CLAIM | FI_DISCARD)) {
FI_DBG(&xnet_prov, FI_LOG_EP_DATA, "Marking message for Claim\n");
if (ep->cur_rx.hdr.base_hdr.flags & XNET_REMOTE_CQ_DATA)
ep->cur_rx.hdr.tag_data_hdr.tag |= XNET_CLAIM_TAG_BIT;
Expand All @@ -305,16 +316,20 @@ xnet_srx_peek(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry,
ep->cur_rx.claim_ctx = recv_entry->context;
}

if (flags & FI_DISCARD)
return xnet_srx_claim(srx, recv_entry, flags);

xnet_report_success(ep, &srx->cq->util_cq, recv_entry);
return;
return FI_SUCCESS;

nomatch:
memset(&err_entry, 0, sizeof(err_entry));
err_entry.op_context = recv_entry->context;
err_entry.flags = FI_RECV | FI_TAGGED;
err_entry.tag = recv_entry->tag;
err_entry.err =FI_ENOMSG;
err_entry.err = ret;
ofi_cq_write_error(&srx->cq->util_cq, &err_entry);
return FI_SUCCESS;
}

/* It's possible that an endpoint may be waiting for the message being
Expand Down Expand Up @@ -392,9 +407,9 @@ xnet_srx_trecvmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged *msg,
recv_entry->context = msg->context;

if (flags & FI_PEEK) {
xnet_srx_peek(srx, recv_entry, flags);
ofi_buf_free(recv_entry);
ret = 0;
ret = xnet_srx_peek(srx, recv_entry, flags);
if (ret || !(flags & FI_DISCARD))
ofi_buf_free(recv_entry);
goto unlock;
}

Expand Down

0 comments on commit 6f368da

Please sign in to comment.