Skip to content

Commit

Permalink
prov/util: set srx completion flags and msg_len properly
Browse files Browse the repository at this point in the history
The peer srx should return entries with the FI_MSG/FI_TAGGED and
FI_RECV flags set

The msg_size field in the peer_rx_entry needs to be set in the
expected path to the number of bytes allowed to be copied.
This is either the size of the message (from the attr->msg_size
paramter) or, if the buffer is not large enough to hold the entire
message, the size of the buffer.

This also fixes setting the message size and flag fields on the unexpected
multi receive path. This case is a bit different because it not only has to
account for the message size and buffer size, but also for the owner entry's message
size and flags

Signed-off-by: Alexia Ingerson <[email protected]>
  • Loading branch information
aingerson committed Nov 13, 2024
1 parent c2d2efe commit f077d78
Showing 1 changed file with 40 additions and 13 deletions.
53 changes: 40 additions & 13 deletions prov/util/src/util_srx.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ static void util_init_rx_entry(struct util_rx_entry *entry,
entry->peer_entry.context = context;
entry->peer_entry.tag = tag;
entry->peer_entry.flags = flags;
entry->peer_entry.msg_size = ofi_total_iov_len(iov, count);
}

static struct util_rx_entry *util_get_recv_entry(struct util_srx_ctx *srx,
Expand Down Expand Up @@ -191,6 +192,8 @@ static int util_match_msg(struct fid_peer_srx *srx,
util_entry->peer_entry.srx = srx;
srx_ctx->update_func(srx_ctx, util_entry);
}
util_entry->peer_entry.msg_size = MIN(util_entry->peer_entry.msg_size,
attr->msg_size);
*rx_entry = &util_entry->peer_entry;
return ret;
}
Expand Down Expand Up @@ -268,6 +271,8 @@ static int util_match_tag(struct fid_peer_srx *srx,
ret = -FI_ENOENT;
util_entry->peer_entry.srx = srx;
out:
util_entry->peer_entry.msg_size = MIN(util_entry->peer_entry.msg_size,
attr->msg_size);
*rx_entry = &util_entry->peer_entry;
return ret;
}
Expand Down Expand Up @@ -496,6 +501,33 @@ static struct util_rx_entry *util_search_unexp_msg(struct util_srx_ctx *srx,
return util_search_peer_msg(ofi_array_at(&srx->src_unexp_peers, addr));
}

static bool util_unexp_mrecv(struct util_srx_ctx *srx,
struct util_rx_entry *mrecv_entry,
struct util_rx_entry *rx_entry)
{
mrecv_entry->multi_recv_ref++;
rx_entry->peer_entry.owner_context = mrecv_entry;

rx_entry->peer_entry.iov[0].iov_base =
mrecv_entry->peer_entry.iov->iov_base;
rx_entry->peer_entry.iov->iov_len =
MIN(mrecv_entry->peer_entry.iov->iov_len,
rx_entry->peer_entry.msg_size);
*rx_entry->peer_entry.desc = mrecv_entry->peer_entry.desc[0];

rx_entry->peer_entry.count = 1;
rx_entry->peer_entry.addr = mrecv_entry->peer_entry.addr;
rx_entry->peer_entry.context = mrecv_entry->peer_entry.context;
rx_entry->peer_entry.tag = mrecv_entry->peer_entry.tag;
rx_entry->peer_entry.flags |= mrecv_entry->peer_entry.flags &
~FI_MULTI_RECV;
rx_entry->peer_entry.msg_size = rx_entry->peer_entry.iov->iov_len;

return util_adjust_multi_recv(srx, &mrecv_entry->peer_entry,
rx_entry->peer_entry.msg_size);

}

static ssize_t util_generic_mrecv(struct util_srx_ctx *srx,
const struct iovec *iov, void **desc, size_t iov_count,
fi_addr_t addr, void *context, uint64_t flags)
Expand All @@ -510,7 +542,8 @@ static ssize_t util_generic_mrecv(struct util_srx_ctx *srx,

ofi_genlock_lock(srx->lock);
mrecv_entry = util_get_recv_entry(srx, iov, desc, iov_count, addr,
context, 0, 0, flags);
context, 0, 0,
flags | FI_MSG | FI_RECV);
if (!mrecv_entry) {
ret = -FI_ENOMEM;
goto out;
Expand All @@ -520,15 +553,7 @@ static ssize_t util_generic_mrecv(struct util_srx_ctx *srx,

rx_entry = util_search_unexp_msg(srx, addr);
while (rx_entry) {
util_init_rx_entry(rx_entry, mrecv_entry->peer_entry.iov, desc,
iov_count, addr, context, 0,
flags & (~FI_MULTI_RECV));
mrecv_entry->multi_recv_ref++;
rx_entry->peer_entry.owner_context = mrecv_entry;

if (util_adjust_multi_recv(srx, &mrecv_entry->peer_entry,
rx_entry->peer_entry.msg_size))
buf_done = true;
buf_done = util_unexp_mrecv(srx, mrecv_entry, rx_entry);

srx->update_func(srx, rx_entry);
ret = rx_entry->peer_entry.srx->peer_ops->start_msg(
Expand Down Expand Up @@ -695,7 +720,8 @@ ssize_t util_srx_generic_trecv(struct fid_ep *ep_fid, const struct iovec *iov,
assert(queue);
rx_entry = util_get_recv_entry(srx, iov, desc,
iov_count, addr, context, tag,
ignore, flags);
ignore,
flags | FI_TAGGED | FI_RECV);
if (!rx_entry)
ret = -FI_ENOMEM;
else
Expand Down Expand Up @@ -741,10 +767,11 @@ ssize_t util_srx_generic_recv(struct fid_ep *ep_fid, const struct iovec *iov,
ofi_array_at(&srx->src_recv_queues, addr);
assert(queue);
rx_entry = util_get_recv_entry(srx, iov, desc, iov_count, addr,
context, 0, 0, flags);
context, 0, 0,
flags | FI_MSG | FI_RECV);
if (!rx_entry)
ret = -FI_ENOMEM;
else
else
slist_insert_tail((struct slist_entry *)
(&rx_entry->peer_entry), queue);
goto out;
Expand Down

0 comments on commit f077d78

Please sign in to comment.