
Commit

Merge pull request ofiwg#2 from sunkuamzn/peer-api-rma
Peer API RMA changes + SHM clean up
shijin-aws authored Mar 20, 2023
2 parents 34943f5 + c1deeec commit f8ea3f9
Showing 13 changed files with 129 additions and 301 deletions.
2 changes: 0 additions & 2 deletions prov/efa/src/rdm/rxr_atomic.c
@@ -200,7 +200,6 @@ ssize_t rxr_atomic_generic_efa(struct rxr_ep *rxr_ep,
err = rxr_pkt_post_req(rxr_ep,
tx_entry,
RXR_DC_WRITE_RTA_PKT,
0,
0);
} else {
/*
@@ -211,7 +210,6 @@ ssize_t rxr_atomic_generic_efa(struct rxr_ep *rxr_ep,
err = rxr_pkt_post_req(rxr_ep,
tx_entry,
req_pkt_type_list[op],
0,
0);
}

14 changes: 6 additions & 8 deletions prov/efa/src/rdm/rxr_ep.c
@@ -782,12 +782,13 @@ void rxr_ep_set_use_shm_for_tx(struct rxr_ep *ep)

/* App-provided hints supersede environment variables.
*
* Using the shm provider comes with some overheads, particularly in the
* progress engine when polling an empty completion queue, so avoid
* Using the shm provider comes with some overheads, so avoid
* initializing the provider if the app provides a hint that it does not
* require node-local communication. We can still loopback over the EFA
* device in cases where the app violates the hint and continues
* communicating with node-local peers.
*
* aws-ofi-nccl relies on this feature.
*/
if (ep->user_info
/* If the app requires explicitly remote communication */
@@ -1934,8 +1935,7 @@ void rxr_ep_progress_internal(struct rxr_ep *ep)
continue;

assert(op_entry->rxr_flags & RXR_OP_ENTRY_QUEUED_CTRL);
ret = rxr_pkt_post(ep, op_entry, op_entry->queued_ctrl.type,
op_entry->queued_ctrl.inject, 0);
ret = rxr_pkt_post(ep, op_entry, op_entry->queued_ctrl_type, 0);
if (ret == -FI_EAGAIN)
break;

@@ -2004,7 +2004,7 @@

if (peer->flags & EFA_RDM_PEER_IN_BACKOFF)
break;
ret = rxr_pkt_post(ep, op_entry, RXR_DATA_PKT, false, flags);
ret = rxr_pkt_post(ep, op_entry, RXR_DATA_PKT, flags);
if (OFI_UNLIKELY(ret)) {
if (ret == -FI_EAGAIN)
goto out;
@@ -2067,9 +2067,7 @@ void rxr_ep_progress_internal(struct rxr_ep *ep)
* The core's TX queue is full so we can't do any
* additional work.
*/
bool use_shm = peer->is_local && ep->use_shm_for_tx;

if (!use_shm && ep->efa_outstanding_tx_ops == ep->efa_max_outstanding_tx_ops)
if (ep->efa_outstanding_tx_ops == ep->efa_max_outstanding_tx_ops)
goto out;

ret = rxr_op_entry_post_remote_read(op_entry);
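
Since the shm decision in rxr_ep_set_use_shm_for_tx above now depends only on the application's hints, a minimal sketch of that kind of capability check may help. It uses the standard libfabric FI_LOCAL_COMM/FI_REMOTE_COMM capability bits; the helper name and the exact policy are assumptions for illustration, not the provider's actual code.

/* Sketch: gate shm usage on the app's capability hints. */
#include <stdbool.h>
#include <rdma/fabric.h>

static bool ep_should_use_shm_for_tx(const struct fi_info *user_info,
				     bool shm_enabled_by_env)
{
	if (!user_info)
		return shm_enabled_by_env;

	/* The app asked only for remote communication: skip shm and loop back
	 * over the EFA device for any node-local peers it talks to anyway. */
	if ((user_info->caps & FI_REMOTE_COMM) && !(user_info->caps & FI_LOCAL_COMM))
		return false;

	return shm_enabled_by_env;
}

Under these assumptions, a plugin such as aws-ofi-nccl that hints it only needs remote communication never pays the shm initialization cost.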
4 changes: 2 additions & 2 deletions prov/efa/src/rdm/rxr_msg.c
@@ -156,7 +156,7 @@ ssize_t rxr_msg_post_rtm(struct rxr_ep *ep, struct rxr_op_entry *tx_entry, int u

if (rtm_type < RXR_EXTRA_REQ_PKT_BEGIN) {
/* rtm requires only baseline feature, which peer should always support. */
return rxr_pkt_post_req(ep, tx_entry, rtm_type, 0, 0);
return rxr_pkt_post_req(ep, tx_entry, rtm_type, 0);
}

/*
@@ -172,7 +172,7 @@ ssize_t rxr_msg_post_rtm(struct rxr_ep *ep, struct rxr_op_entry *tx_entry, int u
if (!rxr_pkt_req_supported_by_peer(rtm_type, peer))
return -FI_EOPNOTSUPP;

return rxr_pkt_post_req(ep, tx_entry, rtm_type, 0, 0);
return rxr_pkt_post_req(ep, tx_entry, rtm_type, 0);
}

ssize_t rxr_msg_generic_send(struct fid_ep *ep, const struct fi_msg *msg,
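
rxr_msg_post_rtm above separates baseline RTM types, which every peer must support, from "extra request" types that are only posted after rxr_pkt_req_supported_by_peer confirms the peer advertised them. A standalone sketch of that negotiation pattern follows; the feature bit, the baseline boundary, and all names are illustrative assumptions, not the provider's wire format.

/* Sketch: decide whether a request packet type can be sent to a peer. */
#include <stdint.h>
#include <rdma/fi_errno.h>

#define BASELINE_REQ_PKT_END	64		/* assumed boundary for baseline types */
#define PEER_FEATURE_EXTRA_REQ	(1u << 0)	/* assumed handshake feature bit */

struct peer_caps {
	uint32_t features;	/* feature bits learned from the peer's handshake */
};

/* Returns 0 if req_type can be posted, -FI_EOPNOTSUPP otherwise. */
static int check_req_type(const struct peer_caps *peer, int req_type)
{
	if (req_type < BASELINE_REQ_PKT_END)
		return 0;			/* baseline: peer always supports it */

	if (!(peer->features & PEER_FEATURE_EXTRA_REQ))
		return -FI_EOPNOTSUPP;		/* peer never advertised the feature */

	return 0;
}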
83 changes: 12 additions & 71 deletions prov/efa/src/rdm/rxr_op_entry.c
@@ -225,25 +225,17 @@ void rxr_rx_entry_release(struct rxr_op_entry *rx_entry)
* user's data buffer is on host memory (Though user can register
* its buffer, and provide its descriptor as an optimization).
*
 * However, there are a few occasions when the EFA device and shm
 * However, there are a few occasions when the EFA device
 * require memory to be registered with them:
*
* First, when EFA device is used to send data:
* When EFA device is used to send data:
*
 * If a non-read based protocol (such as eager, medium, longcts)
* is used, the send buffer must be registered with EFA device.
*
* If a read based protocol is used, both send buffer
* and receive buffer must be registered with EFA device.
*
* Second, when shm is used:
* If eager protocol is used, no registration is needed (because
* shm does not require registration for local buffer)
*
* If a read based protocol is used, the send buffer must
* be registered with shm, because send buffer is used as
* remote buffer in a read based protocol.
*
 * Therefore, when the user did not provide descriptors for the buffer(s),
 * the EFA provider needs to bridge the gap.
*
@@ -258,9 +250,7 @@ void rxr_rx_entry_release(struct rxr_op_entry *rx_entry)
* Because of the high cost of memory registration, this happens
* only when MR cache is available, which is checked by the caller
* of this function on sender side. (this happens when
*
* 1. EFA device is used with non-eager protocols and
* 2. SHM is used with long read protocol
* EFA device is used with non-eager protocols and
*
* This function is not guaranteed to fill all descriptors (which
 * is why the function name has try). When memory registration fails due
@@ -287,36 +277,15 @@ void rxr_rx_entry_release(struct rxr_op_entry *rx_entry)
void rxr_op_entry_try_fill_desc(struct rxr_op_entry *op_entry, int mr_iov_start, uint64_t access)
{
int i, err;
struct efa_rdm_peer *peer;

peer = rxr_ep_get_peer(op_entry->ep, op_entry->addr);

for (i = mr_iov_start; i < op_entry->iov_count; ++i) {
if (op_entry->desc[i])
continue;


if (peer->is_local && op_entry->ep->use_shm_for_tx) {
if (access == FI_REMOTE_READ) {
/* this happens when the longread message protocol was used
* with shm. The send buffer is going to be read by receiver,
* therefore must be registered with shm provider.
*/
assert(op_entry->type == RXR_TX_ENTRY);
err = efa_mr_reg_shm(&rxr_ep_domain(op_entry->ep)->util_domain.domain_fid,
op_entry->iov + i,
access, &op_entry->mr[i]);
} else {
assert(access == FI_SEND || access == FI_RECV);
/* shm does not require registration for send and recv */
err = 0;
}
} else {
err = fi_mr_regv(&rxr_ep_domain(op_entry->ep)->util_domain.domain_fid,
op_entry->iov + i, 1,
access,
0, 0, 0, &op_entry->mr[i], NULL);
}
err = fi_mr_regv(
&rxr_ep_domain(op_entry->ep)->util_domain.domain_fid,
op_entry->iov + i, 1, access, 0, 0, 0, &op_entry->mr[i],
NULL);

if (err) {
EFA_WARN(FI_LOG_EP_CTRL,
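
The long comment above lists when rxr_op_entry_try_fill_desc must register memory on the application's behalf; with the shm branch gone, every missing descriptor now goes through fi_mr_regv against the EFA domain. Below is a standalone sketch of that lazy-fill pattern using the public libfabric API; everything except fi_mr_regv/fi_mr_desc (the struct layout, names, and best-effort policy) is an assumption for illustration.

/* Sketch: register any iov entry that does not yet have a descriptor. */
#include <sys/uio.h>
#include <rdma/fabric.h>
#include <rdma/fi_domain.h>

struct op_sketch {
	struct fid_domain *domain;
	struct iovec iov[8];
	size_t iov_count;
	void *desc[8];		/* memory descriptors, NULL when not filled yet */
	struct fid_mr *mr[8];
};

static void try_fill_desc_sketch(struct op_sketch *op, size_t start, uint64_t access)
{
	for (size_t i = start; i < op->iov_count; ++i) {
		if (op->desc[i])
			continue;	/* caller already provided a descriptor */

		/* Best effort, matching the "try" in the name: on failure leave
		 * the descriptor empty and let the caller handle it. */
		if (fi_mr_regv(op->domain, &op->iov[i], 1, access,
			       0, 0, 0, &op->mr[i], NULL))
			continue;

		op->desc[i] = fi_mr_desc(op->mr[i]);
	}
}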
@@ -441,10 +410,6 @@ size_t rxr_tx_entry_max_req_data_capacity(struct rxr_ep *ep, struct rxr_op_entry
peer = rxr_ep_get_peer(ep, tx_entry->addr);
assert(peer);

if (peer->is_local && ep->use_shm_for_tx) {
return rxr_env.shm_max_medium_size;
}

if (efa_rdm_peer_need_raw_addr_hdr(peer))
header_flags |= RXR_REQ_OPT_RAW_ADDR_HDR;
else if (efa_rdm_peer_need_connid(peer))
@@ -1041,8 +1006,6 @@ void rxr_op_entry_handle_recv_completed(struct rxr_op_entry *op_entry)
{
struct rxr_op_entry *tx_entry = NULL;
struct rxr_op_entry *rx_entry = NULL;
struct efa_rdm_peer *peer;
bool inject;
int err;

/* It is important to write completion before sending ctrl packet, because the
@@ -1106,19 +1069,11 @@ void rxr_op_entry_handle_recv_completed(struct rxr_op_entry *op_entry)
*
* Hence, the rx_entry can be safely released only when we got
* the send completion of the ctrl packet.
*
* Another interesting point is that when inject was used, the
* rx_entry was released by rxr_pkt_post_or_queue(), because
 * when inject was used, the lower device will not provide a send
* completion for the ctrl packet.
*/
if (op_entry->rxr_flags & RXR_TX_ENTRY_DELIVERY_COMPLETE_REQUESTED) {
assert(op_entry->type == RXR_RX_ENTRY);
rx_entry = op_entry; /* Intentionally assigned for easier understanding */
peer = rxr_ep_get_peer(rx_entry->ep, rx_entry->addr);
assert(peer);
inject = peer->is_local && rx_entry->ep->use_shm_for_tx;
err = rxr_pkt_post_or_queue(rx_entry->ep, rx_entry, RXR_RECEIPT_PKT, inject);
err = rxr_pkt_post_or_queue(rx_entry->ep, rx_entry, RXR_RECEIPT_PKT);
if (OFI_UNLIKELY(err)) {
EFA_WARN(FI_LOG_CQ,
"Posting of ctrl packet failed when complete rx! err=%s(%d)\n",
@@ -1254,17 +1209,13 @@ int rxr_op_entry_post_remote_read(struct rxr_op_entry *op_entry)
int iov_idx = 0, rma_iov_idx = 0;
size_t iov_offset = 0, rma_iov_offset = 0;
size_t read_once_len, max_read_once_len;
bool use_shm;
struct rxr_ep *ep;
struct efa_rdm_peer *peer;
struct rxr_pkt_entry *pkt_entry;

assert(op_entry->iov_count > 0);
assert(op_entry->rma_iov_count > 0);

ep = op_entry->ep;
peer = rxr_ep_get_peer(ep, op_entry->addr);
use_shm = peer->is_local && ep->use_shm_for_tx;

if (op_entry->bytes_read_total_len == 0) {

@@ -1275,10 +1226,7 @@ int rxr_op_entry_post_remote_read(struct rxr_op_entry *op_entry)
* Note that because send operation used a pkt_entry as wr_id,
* we had to use a pkt_entry as context for read too.
*/
if (use_shm)
pkt_entry = rxr_pkt_entry_alloc(ep, ep->shm_tx_pkt_pool, RXR_PKT_FROM_SHM_TX_POOL);
else
pkt_entry = rxr_pkt_entry_alloc(ep, ep->efa_tx_pkt_pool, RXR_PKT_FROM_EFA_TX_POOL);
pkt_entry = rxr_pkt_entry_alloc(ep, ep->efa_tx_pkt_pool, RXR_PKT_FROM_EFA_TX_POOL);

if (OFI_UNLIKELY(!pkt_entry))
return -FI_EAGAIN;
@@ -1297,11 +1245,9 @@ int rxr_op_entry_post_remote_read(struct rxr_op_entry *op_entry)

assert(op_entry->bytes_read_submitted < op_entry->bytes_read_total_len);

if (!use_shm) {
rxr_op_entry_try_fill_desc(op_entry, 0, FI_RECV);
}
rxr_op_entry_try_fill_desc(op_entry, 0, FI_RECV);

max_read_once_len = use_shm ? SIZE_MAX : MIN(rxr_env.efa_read_segment_size, rxr_ep_domain(ep)->device->max_rdma_size);
max_read_once_len = MIN(rxr_env.efa_read_segment_size, rxr_ep_domain(ep)->device->max_rdma_size);
assert(max_read_once_len > 0);

err = rxr_locate_iov_pos(op_entry->iov, op_entry->iov_count,
@@ -1327,7 +1273,6 @@ int rxr_op_entry_post_remote_read(struct rxr_op_entry *op_entry)
assert(rma_iov_idx < op_entry->rma_iov_count);
assert(rma_iov_offset < op_entry->rma_iov[rma_iov_idx].len);

if (!use_shm) {
if (ep->efa_outstanding_tx_ops == ep->efa_max_outstanding_tx_ops)
return -FI_EAGAIN;

@@ -1340,12 +1285,8 @@ int rxr_op_entry_post_remote_read(struct rxr_op_entry *op_entry)
*/
return -FI_EAGAIN;
}
}

if (use_shm)
pkt_entry = rxr_pkt_entry_alloc(ep, ep->shm_tx_pkt_pool, RXR_PKT_FROM_SHM_TX_POOL);
else
pkt_entry = rxr_pkt_entry_alloc(ep, ep->efa_tx_pkt_pool, RXR_PKT_FROM_EFA_TX_POOL);
pkt_entry = rxr_pkt_entry_alloc(ep, ep->efa_tx_pkt_pool, RXR_PKT_FROM_EFA_TX_POOL);

if (OFI_UNLIKELY(!pkt_entry))
return -FI_EAGAIN;
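
rxr_op_entry_post_remote_read above now always takes the EFA path: each posted read is capped at MIN(rxr_env.efa_read_segment_size, the device's max_rdma_size), and the local iov and remote rma_iov positions advance in lockstep. A small self-contained sketch of that segmentation loop; the names, the printf stand-in for the actual RDMA read post, and the example sizes are assumptions.

/* Sketch: split a large remote read into device-sized segments. */
#include <stdint.h>
#include <stdio.h>

#define MIN(a, b) ((a) < (b) ? (a) : (b))

static void segment_read(uint64_t total_len, uint64_t read_segment_size,
			 uint64_t max_rdma_size)
{
	uint64_t max_read_once_len = MIN(read_segment_size, max_rdma_size);
	uint64_t submitted = 0;

	while (submitted < total_len) {
		uint64_t len = MIN(max_read_once_len, total_len - submitted);

		/* The real code locates the iov/rma_iov position for "submitted"
		 * and posts one RDMA read of "len" bytes here. */
		printf("read segment: offset=%llu len=%llu\n",
		       (unsigned long long)submitted, (unsigned long long)len);
		submitted += len;
	}
}

int main(void)
{
	/* Example: a 3 MiB read with a 1 MiB segment cap and a 2 MiB device
	 * limit is posted as three 1 MiB reads. */
	segment_read(3u << 20, 1u << 20, 2u << 20);
	return 0;
}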
7 changes: 1 addition & 6 deletions prov/efa/src/rdm/rxr_op_entry.h
@@ -56,11 +56,6 @@ enum rxr_op_comm_type {
RXR_RX_RECV, /* rx_entry large msg recv data pkts */
};

struct rxr_queued_ctrl_info {
int type;
int inject;
};

struct rxr_atomic_hdr {
/* atomic_op is different from tx_op */
uint32_t atomic_op;
@@ -116,7 +111,7 @@ struct rxr_op_entry {
uint64_t total_len;

enum rxr_op_comm_type state;
struct rxr_queued_ctrl_info queued_ctrl;
int queued_ctrl_type;

uint64_t fi_flags;
uint16_t rxr_flags;