From 53206ab6d50d31e160ba63c74af29c0183773047 Mon Sep 17 00:00:00 2001
From: Bernd Schubert
Date: Tue, 3 Mar 2020 22:12:49 +0100
Subject: [PATCH] tcp provider: Early remove ep from polling after shutdown

I noticed that the polling thread in our application keeps spinning for some
time after calling fi_shutdown(). The reason is that we only call fi_close()
after all references to our internal connection handling have been dropped,
which can be much later than the call to fi_shutdown() and the handling of
the FI_SHUTDOWN event. Until then our polling thread spins without ever
going to sleep.

The solution here is to remove the endpoint from polling once the
TCPX_EP_SHUTDOWN state has been set, but only after another round of polling
in tcpx_progress() - similar to what the socket provider does.

Change-Id: I66ea20bf344ae8f74a67532657d0e1d199638391
Signed-off-by: Bernd Schubert
---
 prov/tcp/src/tcpx.h          |  3 +++
 prov/tcp/src/tcpx_ep.c       | 22 +++++++++++++++++-----
 prov/tcp/src/tcpx_progress.c | 34 ++++++++++++++++++++++++----------
 3 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/prov/tcp/src/tcpx.h b/prov/tcp/src/tcpx.h
index 19197535298..c5b342cebaf 100644
--- a/prov/tcp/src/tcpx.h
+++ b/prov/tcp/src/tcpx.h
@@ -136,6 +136,7 @@ enum tcpx_cm_state {
 	TCPX_EP_CONNECTING,
 	TCPX_EP_CONNECTED,
 	TCPX_EP_SHUTDOWN,
+	TCPX_EP_POLL_REMOVED,
 	TCPX_EP_ERROR,
 };
 
@@ -288,6 +289,8 @@ int tcpx_read_to_buffer(SOCKET sock, struct stage_buf *stage_buf);
 
 struct tcpx_xfer_entry *tcpx_xfer_entry_alloc(struct tcpx_cq *cq,
 					      enum tcpx_xfer_op_codes type);
+
+void tcpx_ep_wait_fd_del(struct tcpx_ep *ep);
 void tcpx_xfer_entry_release(struct tcpx_cq *tcpx_cq,
 			     struct tcpx_xfer_entry *xfer_entry);
 void tcpx_srx_xfer_release(struct tcpx_rx_ctx *srx_ctx,
diff --git a/prov/tcp/src/tcpx_ep.c b/prov/tcp/src/tcpx_ep.c
index d1fdc75e6ca..47a5b889759 100644
--- a/prov/tcp/src/tcpx_ep.c
+++ b/prov/tcp/src/tcpx_ep.c
@@ -347,17 +347,18 @@ static void tcpx_ep_tx_rx_queues_release(struct tcpx_ep *ep)
 	fastlock_release(&ep->lock);
 }
 
-static int tcpx_ep_close(struct fid *fid)
+/**
+ * Release the ep from polling
+ */
+void tcpx_ep_wait_fd_del(struct tcpx_ep *ep)
 {
+	FI_DBG(&tcpx_prov, FI_LOG_EP_CTRL, "releasing ep=%p\n", ep);
+
 	struct tcpx_eq *eq;
-	struct tcpx_ep *ep = container_of(fid, struct tcpx_ep,
-					  util_ep.ep_fid.fid);
 
 	eq = container_of(ep->util_ep.eq, struct tcpx_eq, util_eq);
 
-	tcpx_ep_tx_rx_queues_release(ep);
-
 	/* eq->close_lock protects from processing stale connection events */
 	fastlock_acquire(&eq->close_lock);
 
 	if (ep->util_ep.rx_cq)
@@ -370,6 +371,17 @@ static int tcpx_ep_close(struct fid *fid)
 		ofi_wait_fd_del(ep->util_ep.eq->wait, ep->sock);
 
 	fastlock_release(&eq->close_lock);
+}
+
+static int tcpx_ep_close(struct fid *fid)
+{
+	struct tcpx_ep *ep = container_of(fid, struct tcpx_ep,
+					  util_ep.ep_fid.fid);
+
+	tcpx_ep_tx_rx_queues_release(ep);
+
+	tcpx_ep_wait_fd_del(ep); /* ensure that everything is really released */
+
 	ofi_eq_remove_fid_events(ep->util_ep.eq, &ep->util_ep.ep_fid.fid);
 	ofi_close_socket(ep->sock);
 	ofi_endpoint_close(&ep->util_ep);
diff --git a/prov/tcp/src/tcpx_progress.c b/prov/tcp/src/tcpx_progress.c
index e775b64b401..12024a52d69 100644
--- a/prov/tcp/src/tcpx_progress.c
+++ b/prov/tcp/src/tcpx_progress.c
@@ -72,21 +72,35 @@ static void tcpx_report_error(struct tcpx_ep *tcpx_ep, int err)
 			   &err_entry, sizeof(err_entry), UTIL_FLAG_ERROR);
 }
 
+/**
+ * Shutdown is done in two phases. Phase 1 writes the FI_SHUTDOWN event,
+ * which a polling thread still needs to handle. Phase 2 removes the fd of
+ * the ep from polling, so that a polling thread won't keep spinning if the
+ * application does not close the connection immediately after it has
+ * handled FI_SHUTDOWN.
+ */
 int tcpx_ep_shutdown_report(struct tcpx_ep *ep, fid_t fid)
 {
 	struct fi_eq_cm_entry cm_entry = {0};
 	ssize_t len;
 
-	if (ep->cm_state == TCPX_EP_SHUTDOWN)
-		return FI_SUCCESS;
-
-	tcpx_cq_report_xfer_fail(ep, -FI_ENOTCONN);
-	ep->cm_state = TCPX_EP_SHUTDOWN;
-	cm_entry.fid = fid;
-	len = fi_eq_write(&ep->util_ep.eq->eq_fid, FI_SHUTDOWN,
-			  &cm_entry, sizeof(cm_entry), 0);
-	if (len < 0)
-		return (int) len;
+	switch (ep->cm_state) {
+	case TCPX_EP_POLL_REMOVED:
+		break;
+	case TCPX_EP_SHUTDOWN:
+		tcpx_ep_wait_fd_del(ep);
+		ep->cm_state = TCPX_EP_POLL_REMOVED;
+		break;
+	default:
+		tcpx_cq_report_xfer_fail(ep, -FI_ENOTCONN);
+		ep->cm_state = TCPX_EP_SHUTDOWN;
+		cm_entry.fid = fid;
+		len = fi_eq_write(&ep->util_ep.eq->eq_fid, FI_SHUTDOWN,
+				  &cm_entry, sizeof(cm_entry), 0);
+		if (len < 0)
+			return (int) len;
+		break;
+	}
 
 	return FI_SUCCESS;
 }
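
Note for reviewers (not part of the patch): below is a minimal sketch of the
application-side pattern described in the commit message, using only standard
libfabric calls (fi_shutdown, fi_eq_sread, fi_close) and assuming the EQ was
opened with a wait object. The connection_refs counter and
handle_shutdown_event() are hypothetical stand-ins for our internal connection
handling; before this patch, the waits in the window between handling
FI_SHUTDOWN and the eventual fi_close() returned immediately because the dead
socket fd stayed in the poll set.

#include <rdma/fabric.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_eq.h>
#include <rdma/fi_errno.h>

/* Hypothetical application state: the last reference to the connection may
 * be dropped long after the FI_SHUTDOWN event has been handled. */
extern int connection_refs;
extern void handle_shutdown_event(struct fid_ep *ep);

static void shutdown_and_close(struct fid_eq *eq, struct fid_ep *ep)
{
	struct fi_eq_cm_entry entry;
	uint32_t event;
	ssize_t ret;

	fi_shutdown(ep, 0);	/* start tearing down the connection */

	while (connection_refs > 0) {
		/* Wait up to 100 ms for the next CM event. Before this patch
		 * the shut-down socket stayed in the wait set, so this call
		 * came back immediately and the loop never went to sleep. */
		ret = fi_eq_sread(eq, &event, &entry, sizeof(entry), 100, 0);
		if (ret == -FI_EAGAIN)
			continue;
		if (ret < 0)
			break;

		if (event == FI_SHUTDOWN && entry.fid == &ep->fid)
			handle_shutdown_event(ep);	/* refs drop later */
	}

	/* Only now, possibly much later, is the endpoint actually closed. */
	fi_close(&ep->fid);
}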