Commit

Merge pull request ofiwg#5962 from shefty/master
prov/rxm: Rename several functions in rxm_cq
shefty authored May 20, 2020
2 parents 987bf79 + b5d639e commit 8d07fa7
Showing 4 changed files with 74 additions and 80 deletions.
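
For quick reference, the renames in this commit are summarized below. The mapping is taken directly from the rxm.h and rxm_cq.c hunks that follow (old name on the left, new name on the right); it is shown here only as a summary, not as code added by the commit.

/* Summary of the renames in this commit, derived from the diff below:
 *
 *   rxm_cq_handle_rx_buf()              -> rxm_handle_rx_buf()
 *   rxm_cq_handle_comp()                -> rxm_handle_comp()
 *   rxm_cq_read_write_error()           -> rxm_handle_comp_error()
 *   rxm_cq_handle_eager()               -> rxm_handle_eager()
 *   rxm_cq_handle_coll_eager()          -> rxm_handle_coll_eager()
 *   rxm_cq_tx_comp_write()              -> rxm_cq_write_tx_comp()
 *   rxm_cq_copy_seg_data()              -> rxm_process_seg_data()
 *   rxm_cq_match_rx_buf()               -> rxm_match_rx_buf()
 *   rxm_cq_rndv_read_prepare_deferred() -> rxm_prepare_deferred_rndv_read()
 *   rxm_ep_format_atomic_resp_pkt_hdr() -> rxm_format_atomic_resp_pkt_hdr()
 *   rxm_cq_get_rx_comp_and_op_flags()   -> removed; its expression is inlined at the call site
 *   rxm_rx_repost_new()                 -> rxm_repost_new_rx(), now static in rxm_cq.c
 */
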
11 changes: 5 additions & 6 deletions prov/rxm/src/rxm.h
@@ -724,7 +724,7 @@ int rxm_domain_open(struct fid_fabric *fabric, struct fi_info *info,
struct fid_domain **dom, void *context);
int rxm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
struct fid_cq **cq_fid, void *context);
ssize_t rxm_cq_handle_rx_buf(struct rxm_rx_buf *rx_buf);
ssize_t rxm_handle_rx_buf(struct rxm_rx_buf *rx_buf);

int rxm_endpoint(struct fid_domain *domain, struct fi_info *info,
struct fid_ep **ep, void *context);
@@ -733,14 +733,14 @@ int rxm_conn_cmap_alloc(struct rxm_ep *rxm_ep);
void rxm_cq_write_error(struct util_cq *cq, struct util_cntr *cntr,
void *op_context, int err);
void rxm_cq_write_error_all(struct rxm_ep *rxm_ep, int err);
void rxm_cq_read_write_error(struct rxm_ep *rxm_ep);
ssize_t rxm_cq_handle_comp(struct rxm_ep *rxm_ep, struct fi_cq_data_entry *comp);
void rxm_handle_comp_error(struct rxm_ep *rxm_ep);
ssize_t rxm_handle_comp(struct rxm_ep *rxm_ep, struct fi_cq_data_entry *comp);
void rxm_ep_progress(struct util_ep *util_ep);
void rxm_ep_progress_coll(struct util_ep *util_ep);
void rxm_ep_do_progress(struct util_ep *util_ep);

ssize_t rxm_cq_handle_eager(struct rxm_rx_buf *rx_buf);
ssize_t rxm_cq_handle_coll_eager(struct rxm_rx_buf *rx_buf);
ssize_t rxm_handle_eager(struct rxm_rx_buf *rx_buf);
ssize_t rxm_handle_coll_eager(struct rxm_rx_buf *rx_buf);
int rxm_finish_eager_send(struct rxm_ep *rxm_ep, struct rxm_tx_eager_buf *tx_eager_buf);
int rxm_finish_coll_eager_send(struct rxm_ep *rxm_ep, struct rxm_tx_eager_buf *tx_eager_buf);

@@ -749,7 +749,6 @@ int rxm_msg_ep_prepost_recv(struct rxm_ep *rxm_ep, struct fid_ep *msg_ep);
int rxm_ep_query_atomic(struct fid_domain *domain, enum fi_datatype datatype,
enum fi_op op, struct fi_atomic_attr *attr,
uint64_t flags);
int rxm_rx_repost_new(struct rxm_rx_buf *rx_buf);

static inline size_t rxm_ep_max_atomic_size(struct fi_info *info)
{
6 changes: 3 additions & 3 deletions prov/rxm/src/rxm_conn.c
@@ -905,7 +905,7 @@ static int rxm_conn_reprocess_directed_recvs(struct rxm_recv_queue *recv_queue)
rx_buf->recv_entry = container_of(entry, struct rxm_recv_entry,
entry);

ret = rxm_cq_handle_rx_buf(rx_buf);
ret = rxm_handle_rx_buf(rx_buf);
if (ret) {
err_entry.op_context = rx_buf;
err_entry.flags = rx_buf->recv_entry->comp_flags;
@@ -1100,14 +1100,14 @@ static void rxm_flush_msg_cq(struct rxm_ep *rxm_ep)
do {
ret = fi_cq_read(rxm_ep->msg_cq, &comp, 1);
if (ret > 0) {
ret = rxm_cq_handle_comp(rxm_ep, &comp);
ret = rxm_handle_comp(rxm_ep, &comp);
if (OFI_UNLIKELY(ret)) {
rxm_cq_write_error_all(rxm_ep, ret);
} else {
ret = 1;
}
} else if (ret == -FI_EAVAIL) {
rxm_cq_read_write_error(rxm_ep);
rxm_handle_comp_error(rxm_ep);
ret = 1;
} else if (ret < 0 && ret != -FI_EAGAIN) {
rxm_cq_write_error_all(rxm_ep, ret);
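
For context on how the renamed handlers are driven: rxm_flush_msg_cq (above) and rxm_ep_do_progress (in rxm_cq.c below) drain the underlying msg CQ with essentially the same loop. The condensed sketch below is illustrative rather than code from the tree; it assumes the provider-internal declarations from rxm.h shown in this diff and omits the OFI_UNLIKELY annotations and locking.

/* Condensed sketch of the msg-CQ drain loop (not code from the tree). */
static void drain_msg_cq(struct rxm_ep *rxm_ep)
{
        struct fi_cq_data_entry comp;
        ssize_t ret;

        do {
                ret = fi_cq_read(rxm_ep->msg_cq, &comp, 1);
                if (ret > 0) {
                        /* One completion was read; dispatch it to the renamed handler. */
                        ret = rxm_handle_comp(rxm_ep, &comp);
                        if (ret)
                                rxm_cq_write_error_all(rxm_ep, ret);
                        else
                                ret = 1;        /* keep draining */
                } else if (ret == -FI_EAVAIL) {
                        /* An error completion is pending on the msg CQ. */
                        rxm_handle_comp_error(rxm_ep);
                        ret = 1;        /* keep draining */
                } else if (ret < 0 && ret != -FI_EAGAIN) {
                        /* The CQ read itself failed; report it to all bound CQs/counters. */
                        rxm_cq_write_error_all(rxm_ep, ret);
                }
        } while (ret > 0);
}
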
116 changes: 55 additions & 61 deletions prov/rxm/src/rxm_cq.c
@@ -62,10 +62,19 @@ rxm_cq_strerror(struct fid_cq *cq_fid, int prov_errno,
return fi_cq_strerror(rxm_ep->msg_cq, prov_errno, err_data, buf, len);
}

static inline uint64_t
rxm_cq_get_rx_comp_and_op_flags(struct rxm_rx_buf *rx_buf)
static int rxm_repost_new_rx(struct rxm_rx_buf *rx_buf)
{
return (rx_buf->pkt.hdr.flags | ofi_rx_flags[rx_buf->pkt.hdr.op]);
struct rxm_rx_buf *new_rx_buf;
if (rx_buf->repost) {
rx_buf->repost = 0;
new_rx_buf = rxm_rx_buf_alloc(rx_buf->ep, rx_buf->msg_ep, 1);
if (!new_rx_buf)
return -FI_ENOMEM;

dlist_insert_tail(&new_rx_buf->repost_entry,
&new_rx_buf->ep->repost_ready_list);
}
return FI_SUCCESS;
}

static int rxm_finish_buf_recv(struct rxm_rx_buf *rx_buf)
@@ -78,10 +78,10 @@ static int rxm_finish_buf_recv(struct rxm_rx_buf *rx_buf)
dlist_insert_tail(&rx_buf->unexp_msg.entry,
&rx_buf->conn->sar_deferred_rx_msg_list);
// repost a new buffer immediately while SAR takes some time to complete
return rxm_rx_repost_new(rx_buf);
return rxm_repost_new_rx(rx_buf);
}

flags = rxm_cq_get_rx_comp_and_op_flags(rx_buf);
flags = (rx_buf->pkt.hdr.flags | ofi_rx_flags[rx_buf->pkt.hdr.op]);

if (rx_buf->pkt.ctrl_hdr.type != rxm_ctrl_eager)
flags |= FI_MORE;
@@ -177,7 +186,7 @@ static int rxm_finish_recv(struct rxm_rx_buf *rx_buf, size_t done_len)
}

static int
rxm_cq_tx_comp_write(struct rxm_ep *rxm_ep, uint64_t comp_flags,
rxm_cq_write_tx_comp(struct rxm_ep *rxm_ep, uint64_t comp_flags,
void *app_context, uint64_t flags)
{
int ret;
@@ -200,7 +209,7 @@ rxm_cq_tx_comp_write(struct rxm_ep *rxm_ep, uint64_t comp_flags,
static int rxm_finish_rma(struct rxm_ep *rxm_ep, struct rxm_rma_buf *rma_buf,
uint64_t comp_flags)
{
int ret = rxm_cq_tx_comp_write(rxm_ep, comp_flags,
int ret = rxm_cq_write_tx_comp(rxm_ep, comp_flags,
rma_buf->app_context, rma_buf->flags);

assert(((comp_flags & FI_WRITE) && !(comp_flags & FI_READ)) ||
@@ -222,7 +231,7 @@ static int rxm_finish_rma(struct rxm_ep *rxm_ep, struct rxm_rma_buf *rma_buf,

int rxm_finish_eager_send(struct rxm_ep *rxm_ep, struct rxm_tx_eager_buf *tx_buf)
{
int ret = rxm_cq_tx_comp_write(rxm_ep, ofi_tx_cq_flags(tx_buf->pkt.hdr.op),
int ret = rxm_cq_write_tx_comp(rxm_ep, ofi_tx_cq_flags(tx_buf->pkt.hdr.op),
tx_buf->app_context, tx_buf->flags);

assert(ofi_tx_cq_flags(tx_buf->pkt.hdr.op) & FI_SEND);
@@ -245,7 +254,7 @@ static int rxm_finish_sar_segment_send(struct rxm_ep *rxm_ep,
break;
case RXM_SAR_SEG_LAST:
if (!err) {
ret = rxm_cq_tx_comp_write(rxm_ep,
ret = rxm_cq_write_tx_comp(rxm_ep,
ofi_tx_cq_flags(tx_buf->pkt.hdr.op),
tx_buf->app_context, tx_buf->flags);

@@ -289,7 +298,7 @@ static int rxm_rndv_tx_finish(struct rxm_ep *rxm_ep,
if (!rxm_ep->rdm_mr_local)
rxm_msg_mr_closev(tx_buf->mr, tx_buf->count);

ret = rxm_cq_tx_comp_write(rxm_ep, ofi_tx_cq_flags(tx_buf->pkt.hdr.op),
ret = rxm_cq_write_tx_comp(rxm_ep, ofi_tx_cq_flags(tx_buf->pkt.hdr.op),
tx_buf->app_context, tx_buf->flags);

assert(ofi_tx_cq_flags(tx_buf->pkt.hdr.op) & FI_SEND);
@@ -334,7 +343,7 @@ static int rxm_rx_buf_match_msg_id(struct dlist_entry *item, const void *arg)
return (msg_id == rx_buf->pkt.ctrl_hdr.msg_id);
}

static ssize_t rxm_cq_copy_seg_data(struct rxm_rx_buf *rx_buf, int *done)
static ssize_t rxm_process_seg_data(struct rxm_rx_buf *rx_buf, int *done)
{
uint64_t done_len;
ssize_t ret;
@@ -392,7 +401,7 @@ static ssize_t rxm_handle_seg_data(struct rxm_rx_buf *rx_buf)
ssize_t ret;
int done;

ret = rxm_cq_copy_seg_data(rx_buf, &done);
ret = rxm_process_seg_data(rx_buf, &done);
if (done || !(rx_buf->ep->rxm_info->mode & FI_BUFFERED_RECV))
return ret;

@@ -408,18 +417,18 @@ static ssize_t rxm_handle_seg_data(struct rxm_rx_buf *rx_buf)

dlist_remove(&rx_buf->unexp_msg.entry);
rx_buf->recv_entry = recv_entry;
ret = rxm_cq_copy_seg_data(rx_buf, &done);
ret = rxm_process_seg_data(rx_buf, &done);
if (done)
break;
}
return ret;
}

static ssize_t
rxm_cq_rndv_read_prepare_deferred(struct rxm_deferred_tx_entry **def_tx_entry,
size_t index,
struct iovec *iov, void *desc[RXM_IOV_LIMIT],
size_t count, struct rxm_rx_buf *rx_buf)
rxm_prepare_deferred_rndv_read(struct rxm_deferred_tx_entry **def_tx_entry,
size_t index, struct iovec *iov,
void *desc[RXM_IOV_LIMIT], size_t count,
struct rxm_rx_buf *rx_buf)
{
uint8_t i;

@@ -443,21 +452,6 @@ rxm_cq_rndv_read_prepare_deferred(struct rxm_deferred_tx_entry **def_tx_entry,
return 0;
}

int rxm_rx_repost_new(struct rxm_rx_buf *rx_buf)
{
struct rxm_rx_buf *new_rx_buf;
if (rx_buf->repost) {
rx_buf->repost = 0;
new_rx_buf = rxm_rx_buf_alloc(rx_buf->ep, rx_buf->msg_ep, 1);
if (!new_rx_buf)
return -FI_ENOMEM;

dlist_insert_tail(&new_rx_buf->repost_entry,
&new_rx_buf->ep->repost_ready_list);
}
return FI_SUCCESS;
}

static ssize_t rxm_handle_rndv(struct rxm_rx_buf *rx_buf)
{
size_t i, index = 0, offset = 0, count, total_recv_len;
@@ -468,7 +462,7 @@ static ssize_t rxm_handle_rndv(struct rxm_rx_buf *rx_buf)

/* En-queue new rx buf to be posted ASAP so that we don't block any
* incoming messages. RNDV processing can take a while. */
ret = rxm_rx_repost_new(rx_buf);
ret = rxm_repost_new_rx(rx_buf);
if (ret)
return ret;

@@ -539,7 +533,7 @@ static ssize_t rxm_handle_rndv(struct rxm_rx_buf *rx_buf)
if (ret == -FI_EAGAIN) {
struct rxm_deferred_tx_entry *def_tx_entry;

ret = rxm_cq_rndv_read_prepare_deferred(
ret = rxm_prepare_deferred_rndv_read(
&def_tx_entry, i, iov, desc,
count, rx_buf);
if (ret)
@@ -558,7 +552,7 @@ static ssize_t rxm_handle_rndv(struct rxm_rx_buf *rx_buf)
return ret;
}

ssize_t rxm_cq_handle_eager(struct rxm_rx_buf *rx_buf)
ssize_t rxm_handle_eager(struct rxm_rx_buf *rx_buf)
{
uint64_t done_len;

Expand All @@ -568,7 +562,7 @@ ssize_t rxm_cq_handle_eager(struct rxm_rx_buf *rx_buf)
return rxm_finish_recv(rx_buf, done_len);
}

ssize_t rxm_cq_handle_coll_eager(struct rxm_rx_buf *rx_buf)
ssize_t rxm_handle_coll_eager(struct rxm_rx_buf *rx_buf)
{
uint64_t done_len;
ssize_t ret;
@@ -589,7 +583,7 @@ ssize_t rxm_cq_handle_coll_eager(struct rxm_rx_buf *rx_buf)
return ret;
}

ssize_t rxm_cq_handle_rx_buf(struct rxm_rx_buf *rx_buf)
ssize_t rxm_handle_rx_buf(struct rxm_rx_buf *rx_buf)
{
switch (rx_buf->pkt.ctrl_hdr.type) {
case rxm_ctrl_eager:
@@ -606,8 +600,8 @@ ssize_t rxm_cq_handle_rx_buf(struct rxm_rx_buf *rx_buf)
}

static ssize_t
rxm_cq_match_rx_buf(struct rxm_rx_buf *rx_buf,
struct rxm_recv_queue *recv_queue,
rxm_match_rx_buf(struct rxm_rx_buf *rx_buf,
struct rxm_recv_queue *recv_queue,
struct rxm_recv_match_attr *match_attr)
{
struct dlist_entry *entry;
@@ -616,7 +610,7 @@ rxm_cq_match_rx_buf(struct rxm_rx_buf *rx_buf,
recv_queue->match_recv, match_attr);
if (entry) {
rx_buf->recv_entry = container_of(entry, struct rxm_recv_entry, entry);
return rxm_cq_handle_rx_buf(rx_buf);
return rxm_handle_rx_buf(rx_buf);
}

RXM_DBG_ADDR_TAG(FI_LOG_CQ, "No matching recv found for incoming msg",
@@ -630,7 +624,7 @@ rxm_cq_match_rx_buf(struct rxm_rx_buf *rx_buf,

// repost a new buffer now since we don't know when the unexpected
// buffer will be consumed
return rxm_rx_repost_new(rx_buf);
return rxm_repost_new_rx(rx_buf);
}

static ssize_t rxm_handle_recv_comp(struct rxm_rx_buf *rx_buf)
@@ -654,13 +648,13 @@ static ssize_t rxm_handle_recv_comp(struct rxm_rx_buf *rx_buf)
switch(rx_buf->pkt.hdr.op) {
case ofi_op_msg:
FI_DBG(&rxm_prov, FI_LOG_CQ, "Got MSG op\n");
return rxm_cq_match_rx_buf(rx_buf, &rx_buf->ep->recv_queue,
&match_attr);
return rxm_match_rx_buf(rx_buf, &rx_buf->ep->recv_queue,
&match_attr);
case ofi_op_tagged:
FI_DBG(&rxm_prov, FI_LOG_CQ, "Got TAGGED op\n");
match_attr.tag = rx_buf->pkt.hdr.tag;
return rxm_cq_match_rx_buf(rx_buf, &rx_buf->ep->trecv_queue,
&match_attr);
return rxm_match_rx_buf(rx_buf, &rx_buf->ep->trecv_queue,
&match_attr);
default:
FI_WARN(&rxm_prov, FI_LOG_CQ, "Unknown op!\n");
assert(0);
@@ -811,11 +805,11 @@ static int rxm_handle_remote_write(struct rxm_ep *rxm_ep,
return 0;
}

static void rxm_ep_format_atomic_resp_pkt_hdr(struct rxm_conn *rxm_conn,
struct rxm_tx_atomic_buf *tx_buf,
size_t data_len, uint32_t pkt_op,
enum fi_datatype datatype,
uint8_t atomic_op)
static void rxm_format_atomic_resp_pkt_hdr(struct rxm_conn *rxm_conn,
struct rxm_tx_atomic_buf *tx_buf,
size_t data_len, uint32_t pkt_op,
enum fi_datatype datatype,
uint8_t atomic_op)
{
rxm_ep_format_tx_buf_pkt(rxm_conn, data_len, pkt_op, 0, 0, 0,
&tx_buf->pkt);
@@ -840,10 +834,10 @@ static ssize_t rxm_atomic_send_resp(struct rxm_ep *rxm_ep,
sizeof(struct rxm_pkt);

resp_buf->hdr.state = RXM_ATOMIC_RESP_SENT;
rxm_ep_format_atomic_resp_pkt_hdr(rx_buf->conn, resp_buf, resp_len,
rx_buf->pkt.hdr.op,
rx_buf->pkt.hdr.atomic.datatype,
rx_buf->pkt.hdr.atomic.op);
rxm_format_atomic_resp_pkt_hdr(rx_buf->conn, resp_buf, resp_len,
rx_buf->pkt.hdr.op,
rx_buf->pkt.hdr.atomic.datatype,
rx_buf->pkt.hdr.atomic.op);
resp_buf->pkt.ctrl_hdr.conn_id = rx_buf->conn->handle.remote_key;
resp_buf->pkt.ctrl_hdr.msg_id = rx_buf->pkt.ctrl_hdr.msg_id;
atomic_hdr = (struct rxm_atomic_resp_hdr *) resp_buf->pkt.data;
@@ -1030,7 +1024,7 @@ static ssize_t rxm_handle_atomic_resp(struct rxm_ep *rxm_ep,
resp_hdr->data, len);

if (!(tx_buf->flags & FI_INJECT))
ret = rxm_cq_tx_comp_write(rxm_ep,
ret = rxm_cq_write_tx_comp(rxm_ep,
ofi_tx_cq_flags(tx_buf->pkt.hdr.op),
tx_buf->app_context, tx_buf->flags);

@@ -1081,7 +1075,7 @@ int rxm_finish_coll_eager_send(struct rxm_ep *rxm_ep,
return ret;
};

ssize_t rxm_cq_handle_comp(struct rxm_ep *rxm_ep, struct fi_cq_data_entry *comp)
ssize_t rxm_handle_comp(struct rxm_ep *rxm_ep, struct fi_cq_data_entry *comp)
{
struct rxm_rx_buf *rx_buf;
struct rxm_tx_base_buf *tx_buf;
@@ -1239,7 +1233,7 @@ void rxm_cq_write_error_all(struct rxm_ep *rxm_ep, int err)
rxm_cntr_incerr(rxm_ep->util_ep.rd_cntr);
}

void rxm_cq_read_write_error(struct rxm_ep *rxm_ep)
void rxm_handle_comp_error(struct rxm_ep *rxm_ep)
{
struct rxm_tx_base_buf *base_buf;
struct rxm_tx_eager_buf *eager_buf;
@@ -1418,17 +1412,17 @@ void rxm_ep_do_progress(struct util_ep *util_ep)
do {
ret = fi_cq_read(rxm_ep->msg_cq, &comp, 1);
if (ret > 0) {
// We don't have enough info to write a good
// error entry to the CQ at this point
ret = rxm_cq_handle_comp(rxm_ep, &comp);
ret = rxm_handle_comp(rxm_ep, &comp);
if (ret) {
// We don't have enough info to write a good
// error entry to the CQ at this point
rxm_cq_write_error_all(rxm_ep, ret);
} else {
ret = 1;
}
} else if (ret < 0 && (ret != -FI_EAGAIN)) {
if (ret == -FI_EAVAIL)
rxm_cq_read_write_error(rxm_ep);
rxm_handle_comp_error(rxm_ep);
else
rxm_cq_write_error_all(rxm_ep, ret);
}
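
One behavioral detail worth noting from the hunks above: rxm_rx_repost_new() became the static helper rxm_repost_new_rx() local to rxm_cq.c (its declaration was dropped from rxm.h), and the receive paths call it before any long-running processing. A minimal caller sketch follows; it uses only the internals visible in this diff, and handle_ctrl_msg() is a hypothetical name, not a function in the tree.

/* Hypothetical caller sketch; handle_ctrl_msg() does not exist in the tree. */
static ssize_t handle_ctrl_msg(struct rxm_rx_buf *rx_buf)
{
        ssize_t ret;

        /* Queue a replacement receive buffer first, so that slow paths such as
         * SAR reassembly or RNDV reads do not starve the msg endpoint of posted
         * receives. rxm_repost_new_rx() returns -FI_ENOMEM if no buffer can be
         * allocated. */
        ret = rxm_repost_new_rx(rx_buf);
        if (ret)
                return ret;

        /* ... protocol-specific processing of rx_buf continues here ... */
        return FI_SUCCESS;
}
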
(Diff for the fourth changed file did not load and is not shown here.)
