Skip to content

Commit

Permalink
prov/cxi: synchronous fi_close on collective multicast
Browse files Browse the repository at this point in the history
The fi_close() operation manages its internal state
and return FI_SUCCESS, or a fatal error code on error.

Signed-off-by: Md Bulbul Sharif <[email protected]>
  • Loading branch information
bulbul-hpe authored and swelch committed Dec 2, 2024
1 parent ae133cc commit f26695b
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 11 deletions.
3 changes: 3 additions & 0 deletions prov/cxi/include/cxip.h
Original file line number Diff line number Diff line change
Expand Up @@ -2957,6 +2957,9 @@ struct cxip_coll_mc {
int next_red_id; // next available red_id
int max_red_id; // limit total concurrency
int seqno; // rolling seqno for packets
int close_state; // the state of the close operation
bool has_closed; // true after a mc close call
bool has_error; // true if any error
bool is_multicast; // true if multicast address
bool arm_disable; // arm-disable for testing
bool retry_disable; // retry-disable for testing
Expand Down
81 changes: 70 additions & 11 deletions prov/cxi/src/cxip_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -2704,13 +2704,17 @@ static void _curl_delete_mc_obj(struct cxip_coll_mc *mc_obj);
static void _cxip_delete_mcast_cb(struct cxip_curl_handle *handle);

/* Close multicast collective object */
static void _close_mc(struct cxip_coll_mc *mc_obj, bool delete)
static void _close_mc(struct cxip_coll_mc *mc_obj, bool delete, bool has_error)
{
int count;

if (!mc_obj)
return;
TRACE_JOIN("%s starting MC cleanup\n", __func__);

mc_obj->has_closed = true;
mc_obj->has_error = has_error;

/* clear the mcast_addr -> mc_obj reference*/
ofi_idm_clear(&mc_obj->ep_obj->coll.mcast_map, mc_obj->mcast_addr);
mc_obj->ep_obj->coll.is_hwroot = false;
Expand Down Expand Up @@ -2739,10 +2743,18 @@ static void _close_mc(struct cxip_coll_mc *mc_obj, bool delete)
cxip_env.coll_fm_timeout_msec/1000,
(cxip_env.coll_fm_timeout_msec%1000)*1000000};

if (!mc_obj->has_error)
mc_obj->close_state = -FI_EAGAIN;

_tsset(&mc_obj->curlexpires, &expires);
_curl_delete_mc_obj(mc_obj);
} else
free(mc_obj);
} else {
if (mc_obj->has_error) {
free(mc_obj);
} else {
mc_obj->close_state = FI_SUCCESS;
}
}
}

/* The user can close an individual collective MC address. It must do so on
Expand All @@ -2752,11 +2764,37 @@ static void _close_mc(struct cxip_coll_mc *mc_obj, bool delete)
static int _fi_close_mc(struct fid *fid)
{
struct cxip_coll_mc *mc_obj;
int ret = FI_SUCCESS;

TRACE_JOIN("%s: closing MC\n", __func__);
mc_obj = container_of(fid, struct cxip_coll_mc, mc_fid.fid);
_close_mc(mc_obj, true);
return FI_SUCCESS;
if (!mc_obj) {
TRACE_JOIN("%s: MC object is null\n", __func__);
return ret;
} else if (mc_obj->has_closed) {
TRACE_JOIN("%s: close already called before\n", __func__);
return ret;
} else if (mc_obj->has_error) {
TRACE_JOIN("%s: encounted an error earlier\n", __func__);
return ret;
}

_close_mc(mc_obj, true, false);
while (mc_obj && (ret = mc_obj->close_state) == -FI_EAGAIN) {
ret = cxip_curl_progress(NULL);
if (ret == -FI_EAGAIN) {
usleep(10);
continue;
}
if (ret < 0 && ret != -FI_ENODATA) {
TRACE_JOIN("%s: Curl progress failed, error=%d\n", __func__, ret);
break;
}
usleep(10);
}
free(mc_obj);

return ret;
}

/* multicast object libfabric functions */
Expand Down Expand Up @@ -2986,6 +3024,11 @@ static int _initialize_mc(void *ptr)
_coll_metrics.ep_data.isroot =
mc_obj->hwroot_idx == mc_obj->mynode_idx;

/* Initially set close states to success */
mc_obj->close_state = FI_SUCCESS;
mc_obj->has_closed = false;
mc_obj->has_error = false;

/* Return information to the caller */
jstate->mc_obj = mc_obj;
*jstate->mc = &mc_obj->mc_fid;
Expand All @@ -2996,7 +3039,7 @@ static int _initialize_mc(void *ptr)

fail:
jstate->prov_errno = FI_CXI_ERRNO_JOIN_FAIL_PTE;
_close_mc(mc_obj, true);
_close_mc(mc_obj, true, true);
return ret;
}

Expand Down Expand Up @@ -3076,7 +3119,11 @@ static void _curl_delete_mc_obj(struct cxip_coll_mc *mc_obj)
TRACE_JOIN("CURL delete mcast %d failed\n",
mc_obj->mcast_addr);
free(curl_usrptr);
free(mc_obj);
if (mc_obj->has_error) {
free(mc_obj);
} else {
mc_obj->close_state = ret;
}
}
}

Expand All @@ -3102,23 +3149,35 @@ static void _cxip_delete_mcast_cb(struct cxip_curl_handle *handle)
case 201:
TRACE_JOIN("callback: %ld SUCCESS MCAST DELETED\n",
handle->status);
free(mc_obj);
if (mc_obj->has_error) {
free(mc_obj);
} else {
mc_obj->close_state = FI_SUCCESS;
}
break;
case 409:
TRACE_JOIN("callback: delete mcast failed: %ld '%s'\n",
handle->status, errmsg);

if (_tsexp(&mc_obj->curlexpires)) {
TRACE_JOIN("callback: FM expired\n");
free(mc_obj);
if (mc_obj->has_error) {
free(mc_obj);
} else {
mc_obj->close_state = FI_CXI_ERRNO_JOIN_CURL_TIMEOUT;
}
break;
}
/* try again */
_curl_delete_mc_obj(mc_obj);
break;
default:
TRACE_JOIN("callback: %ld unknown status\n", handle->status);
free(mc_obj);
if (mc_obj->has_error) {
free(mc_obj);
} else {
mc_obj->close_state = FI_CXI_ERRNO_JOIN_CURL_FAILED;
}
break;
}
/* free json memory */
Expand Down Expand Up @@ -4209,7 +4268,7 @@ void cxip_coll_close(struct cxip_ep_obj *ep_obj)
while (!dlist_empty(&ep_obj->coll.mc_list)) {
dlist_pop_front(&ep_obj->coll.mc_list,
struct cxip_coll_mc, mc_obj, entry);
_close_mc(mc_obj, false);
_close_mc(mc_obj, false, true);
}
}

Expand Down

0 comments on commit f26695b

Please sign in to comment.