Skip to content

Commit

Permalink
tcp provider: Early remove ep from polling after shutdown
Browse files Browse the repository at this point in the history
I noticed that the polling thread in our application
keeps spinning for some time after calling fi_shutdown().
The reason is that we only call fi_close() after all references
to our internal connection handling are dropped, and that
can be much later than the call to fi_shutdown() and the
FI_SHUTDOWN event handling. Until then, our polling thread
spins without ever going to sleep.
The solution here is to remove the endpoint from polling
once the TCPX_EP_SHUTDOWN state has been set, but only after
another round of polling in tcpx_progress() - similar to
what the socket provider does.

Change-Id: I66ea20bf344ae8f74a67532657d0e1d199638391
Signed-off-by: Bernd Schubert <[email protected]>
  • Loading branch information
bsbernd committed Mar 6, 2020
1 parent 1307914 commit 53206ab
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 15 deletions.
3 changes: 3 additions & 0 deletions prov/tcp/src/tcpx.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ enum tcpx_cm_state {
TCPX_EP_CONNECTING,
TCPX_EP_CONNECTED,
TCPX_EP_SHUTDOWN,
TCPX_EP_POLL_REMOVED,
TCPX_EP_ERROR,
};

Expand Down Expand Up @@ -288,6 +289,8 @@ int tcpx_read_to_buffer(SOCKET sock, struct stage_buf *stage_buf);

struct tcpx_xfer_entry *tcpx_xfer_entry_alloc(struct tcpx_cq *cq,
enum tcpx_xfer_op_codes type);

/* Release the endpoint from polling (implemented in tcpx_ep.c). */
void tcpx_ep_wait_fd_del(struct tcpx_ep *ep);
void tcpx_xfer_entry_release(struct tcpx_cq *tcpx_cq,
struct tcpx_xfer_entry *xfer_entry);
void tcpx_srx_xfer_release(struct tcpx_rx_ctx *srx_ctx,
Expand Down
22 changes: 17 additions & 5 deletions prov/tcp/src/tcpx_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -347,17 +347,18 @@ static void tcpx_ep_tx_rx_queues_release(struct tcpx_ep *ep)
fastlock_release(&ep->lock);
}

static int tcpx_ep_close(struct fid *fid)
/**
* Release the ep from polling
*/
void tcpx_ep_wait_fd_del(struct tcpx_ep *ep)
{
FI_DBG(&tcpx_prov, FI_LOG_EP_CTRL, "releasing ep=%p\n", ep);

struct tcpx_eq *eq;
struct tcpx_ep *ep = container_of(fid, struct tcpx_ep,
util_ep.ep_fid.fid);

eq = container_of(ep->util_ep.eq, struct tcpx_eq,
util_eq);

tcpx_ep_tx_rx_queues_release(ep);

/* eq->close_lock protects from processing stale connection events */
fastlock_acquire(&eq->close_lock);
if (ep->util_ep.rx_cq)
Expand All @@ -370,6 +371,17 @@ static int tcpx_ep_close(struct fid *fid)
ofi_wait_fd_del(ep->util_ep.eq->wait, ep->sock);

fastlock_release(&eq->close_lock);
}

static int tcpx_ep_close(struct fid *fid)
{
struct tcpx_ep *ep = container_of(fid, struct tcpx_ep,
util_ep.ep_fid.fid);

tcpx_ep_tx_rx_queues_release(ep);

tcpx_ep_wait_fd_del(ep); /* ensure that everything is really released */

ofi_eq_remove_fid_events(ep->util_ep.eq, &ep->util_ep.ep_fid.fid);
ofi_close_socket(ep->sock);
ofi_endpoint_close(&ep->util_ep);
Expand Down
34 changes: 24 additions & 10 deletions prov/tcp/src/tcpx_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,35 @@ static void tcpx_report_error(struct tcpx_ep *tcpx_ep, int err)
&err_entry, sizeof(err_entry), UTIL_FLAG_ERROR);
}

/**
 * Report the shutdown of an endpoint's connection. Shutdown is done in
 * two phases, driven by successive calls and tracked in ep->cm_state:
 *
 * Phase 1 (state is neither TCPX_EP_SHUTDOWN nor TCPX_EP_POLL_REMOVED):
 *   call tcpx_cq_report_xfer_fail() with -FI_ENOTCONN, move the endpoint
 *   to TCPX_EP_SHUTDOWN and write an FI_SHUTDOWN event to the endpoint's
 *   EQ. A polling thread still needs to handle that event, so the fd
 *   stays in the poll set for now.
 *
 * Phase 2 (state is TCPX_EP_SHUTDOWN):
 *   remove the fd of the ep from polling and move the endpoint to
 *   TCPX_EP_POLL_REMOVED, so that a polling thread won't spin if it does
 *   not close the connection immediately after it handled FI_SHUTDOWN.
 *
 * Calls made in TCPX_EP_POLL_REMOVED do nothing.
 *
 * @param ep   endpoint whose shutdown is being reported
 * @param fid  fid stored in the FI_SHUTDOWN cm entry
 * @return FI_SUCCESS, or the negative value returned by fi_eq_write()
 *         if writing the FI_SHUTDOWN event failed
 */
int tcpx_ep_shutdown_report(struct tcpx_ep *ep, fid_t fid)
{
	struct fi_eq_cm_entry cm_entry = {0};
	ssize_t len;

	switch (ep->cm_state) {
	case TCPX_EP_POLL_REMOVED:
		/* Both phases already done; nothing left to report. */
		break;
	case TCPX_EP_SHUTDOWN:
		/* Phase 2: FI_SHUTDOWN was written on an earlier call. */
		tcpx_ep_wait_fd_del(ep);
		ep->cm_state = TCPX_EP_POLL_REMOVED;
		break;
	default:
		/* Phase 1: first report for this endpoint. */
		tcpx_cq_report_xfer_fail(ep, -FI_ENOTCONN);
		/*
		 * The state is advanced before the EQ write, so a failed
		 * write is not retried on the next call.
		 */
		ep->cm_state = TCPX_EP_SHUTDOWN;
		cm_entry.fid = fid;
		len = fi_eq_write(&ep->util_ep.eq->eq_fid, FI_SHUTDOWN,
				  &cm_entry, sizeof(cm_entry), 0);
		if (len < 0)
			return (int) len;
		break;
	}

	return FI_SUCCESS;
}
Expand Down

0 comments on commit 53206ab

Please sign in to comment.