Skip to content

Commit

Permalink
UCP/WIREUP: Support EP reconfiguration for non-reused p2p scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
shasson5 committed Sep 11, 2024
1 parent 51a4c5a commit 84523e9
Show file tree
Hide file tree
Showing 6 changed files with 492 additions and 58 deletions.
263 changes: 211 additions & 52 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -1355,25 +1355,160 @@ static void ucp_wireup_discard_uct_eps(ucp_ep_h ep, uct_ep_h *uct_eps,
}
}

/*
 * Check whether all lanes of a configuration key use transports which are not
 * reported by ucp_worker_is_tl_2iface().
 *
 * @param ep   Endpoint whose worker is used to query the transport resources.
 * @param key  Configuration key whose lanes are inspected.
 *
 * @return 1 if no lane of @a key is reported by ucp_worker_is_tl_2iface(),
 *         0 otherwise.
 */
static int
ucp_wireup_are_all_lanes_p2p(ucp_ep_h ep, const ucp_ep_config_key_t *key)
{
    ucp_lane_index_t lane;
    ucp_rsc_index_t rsc_index;

    for (lane = 0; lane < key->num_lanes; ++lane) {
        /* Take the resource from the key being checked, not from the
         * endpoint: this function is also called with a key that is not the
         * endpoint's current configuration. */
        rsc_index = key->lanes[lane].rsc_index;
        ucs_assert(rsc_index != UCP_NULL_RESOURCE);

        if (ucp_worker_is_tl_2iface(ep->worker, rsc_index)) {
            return 0;
        }
    }

    return 1;
}

/*
 * Count the entries of @a reuse_lane_map, over the lanes of @a ep, which are
 * different from UCP_NULL_LANE.
 *
 * @param ep              Endpoint whose lane count bounds the scan.
 * @param reuse_lane_map  Array indexed by lane of @a ep.
 *
 * @return Number of entries which are not UCP_NULL_LANE.
 */
static unsigned
ucp_ep_num_reused_lanes(ucp_ep_h ep, const ucp_lane_index_t *reuse_lane_map)
{
    ucp_lane_index_t idx;
    unsigned count = 0;

    for (idx = 0; idx < ucp_ep_num_lanes(ep); ++idx) {
        if (reuse_lane_map[idx] != UCP_NULL_LANE) {
            ++count;
        }
    }

    return count;
}

/*
 * Check whether the endpoint can be switched from its current configuration
 * to @a new_key.
 *
 * An endpoint with a CM lane is always considered reconfigurable. Otherwise,
 * both configurations must have the same number of lanes, all of their lanes
 * must pass ucp_wireup_are_all_lanes_p2p(), and no lane may be reused between
 * them.
 *
 * @param ep              Endpoint to check; must already have a configuration.
 * @param new_key         Candidate configuration key.
 * @param remote_address  Unpacked remote address, forwarded to
 *                        ucp_ep_config_lanes_intersect().
 * @param addr_indices    Address indices, forwarded to
 *                        ucp_ep_config_lanes_intersect().
 *
 * @return Non-zero if reconfiguration is supported, 0 otherwise.
 */
static int
ucp_wireup_check_is_reconfigurable(ucp_ep_h ep,
                                   const ucp_ep_config_key_t *new_key,
                                   const ucp_unpacked_address_t *remote_address,
                                   const unsigned *addr_indices)
{
    ucp_lane_index_t reuse_lane_map[UCP_MAX_LANES];
    const ucp_ep_config_key_t *old_key;

    if (ucp_ep_has_cm_lane(ep)) {
        return 1;
    }

    old_key = &ucp_ep_config(ep)->key;

    /* Compare lane counts before scanning lanes: it is the cheapest check,
     * and it guarantees that both scans below cover the same number of
     * lanes. */
    if (old_key->num_lanes != new_key->num_lanes) {
        return 0;
    }

    /* Verify both old/new configurations have only p2p lanes */
    if (!ucp_wireup_are_all_lanes_p2p(ep, old_key) ||
        !ucp_wireup_are_all_lanes_p2p(ep, new_key)) {
        return 0;
    }

    ucp_ep_config_lanes_intersect(old_key, new_key, ep, remote_address,
                                  addr_indices, reuse_lane_map);

    /* For now, reconfig is supported only if no lanes are reused */
    return ucp_ep_num_reused_lanes(ep, reuse_lane_map) == 0;
}

/*
 * Select the lane of the new configuration which will carry wireup messages.
 *
 * @param ep              Endpoint being reconfigured.
 * @param key             New configuration key.
 * @param reuse_lane_map  Lane reuse map; used only for a sanity check.
 *
 * @return The CM lane of @a key if the endpoint has a CM lane, lane 0
 *         otherwise.
 */
static ucp_lane_index_t
ucp_wireup_find_non_reused_lane(ucp_ep_h ep, const ucp_ep_config_key_t *key,
                                const ucp_lane_index_t *reuse_lane_map)
{
    if (!ucp_ep_has_cm_lane(ep)) {
        /* Without a CM lane only configurations with no reused lanes are
         * supported at the moment, so the first lane is always a valid
         * choice. */
        ucs_assert(key->num_lanes > 0);
        ucs_assert(ucp_ep_num_reused_lanes(ep, reuse_lane_map) == 0);
        return 0;
    }

    return key->cm_lane;
}

/*
 * Move the wireup-message endpoint of @a ep to a lane of the new
 * configuration described by @a key.
 *
 * The message EP is extracted from the wireup EP of the current wireup lane
 * and installed as the auxiliary EP of the wireup EP which serves the new
 * configuration: the existing CM wireup EP if the endpoint has a CM lane, or
 * a newly created wireup EP otherwise. The old wireup EP is then destroyed
 * and its lane on @a ep is set to NULL.
 *
 * @param ep              Endpoint being reconfigured.
 * @param key             New configuration key; its wireup_msg_lane field is
 *                        set to the lane selected by this function.
 * @param new_uct_eps     Array indexed by lane; the entry of the selected
 *                        lane is set to the new wireup EP.
 * @param reuse_lane_map  Lane reuse map, forwarded to
 *                        ucp_wireup_find_non_reused_lane().
 *
 * @return UCS_OK on success, or the error returned by ucp_wireup_ep_create().
 */
static ucs_status_t
ucp_wireup_replace_wireup_msg_lane(ucp_ep_h ep, ucp_ep_config_key_t *key,
uct_ep_h *new_uct_eps,
const ucp_lane_index_t *reuse_lane_map)
{
uct_ep_h uct_ep = NULL;
ucp_lane_index_t old_lane, new_wireup_lane;
ucp_wireup_ep_t *old_wireup_ep, *new_wireup_ep;
ucp_rsc_index_t aux_rsc_index;
int is_p2p;
ucs_status_t status;

/* Get old wireup lane. The lane is expected to hold a wireup EP; this is
 * enforced with ucs_assert_always(), unlike the other checks in this
 * function which use ucs_assert(). */
old_lane = ucp_wireup_get_msg_lane(ep, UCP_WIREUP_MSG_REQUEST);
old_wireup_ep = ucp_wireup_ep(ucp_ep_get_lane(ep, old_lane));
ucs_assert_always(old_wireup_ep != NULL);

/* Set wireup EP for new configuration's wireup lane */
if (ucp_ep_has_cm_lane(ep)) {
/* Use existing EP from CM lane. The auxiliary resource index is the
 * resource index of the old wireup lane. */
new_wireup_ep = ucp_ep_get_cm_wireup_ep(ep);
ucs_assert(new_wireup_ep != NULL);
aux_rsc_index = ucp_ep_get_rsc_index(ep, old_lane);
} else {
/* Create new EP for non-CM flow. The auxiliary resource index is taken
 * from the old wireup EP. */
status = ucp_wireup_ep_create(ep, &uct_ep);
if (status != UCS_OK) {
return status;
}

new_wireup_ep = ucp_wireup_ep(uct_ep);
aux_rsc_index = ucp_wireup_ep_get_aux_rsc_index(
&old_wireup_ep->super.super);
}

/* The p2p property is evaluated against the endpoint's current
 * configuration key, not against the new key */
ucs_assert(aux_rsc_index != UCP_NULL_RESOURCE);
is_p2p = ucp_ep_config_connect_p2p(ep->worker, &ucp_ep_config(ep)->key,
aux_rsc_index);

/* Move aux EP to new wireup lane. The message EP is extracted from the old
 * wireup EP here, before that wireup EP is destroyed below. */
ucp_wireup_ep_set_aux(new_wireup_ep,
ucp_wireup_ep_extract_msg_ep(old_wireup_ep),
aux_rsc_index, is_p2p);

/* Remove old wireup_ep as it's not needed anymore.
 * NOTICE: Next two lines are intentionally not merged with the lane
 * removal loop in ucp_wireup_check_config_intersect, because of future
 * support for non-wireup EPs reconfiguration (which will modify this
 * code). */
uct_ep_destroy(&old_wireup_ep->super.super);
ucp_ep_set_lane(ep, old_lane, NULL);

/* Select CM/non-reused lane as new wireup lane */
new_wireup_lane = ucp_wireup_find_non_reused_lane(ep, key, reuse_lane_map);

/* Publish the new wireup EP on the selected lane and record that lane in
 * the new configuration key */
new_uct_eps[new_wireup_lane] = &new_wireup_ep->super.super;
key->wireup_msg_lane = new_wireup_lane;
return UCS_OK;
}

static ucs_status_t
ucp_wireup_check_config_intersect(ucp_ep_h ep, ucp_ep_config_key_t *new_key,
const ucp_unpacked_address_t *remote_address,
const unsigned *addr_indices,
ucp_lane_map_t *connect_lane_bitmap,
ucs_queue_head_t *replay_pending_queue)
{
uct_ep_h new_uct_eps[UCP_MAX_LANES] = { NULL };
ucp_lane_index_t reuse_lane_map[UCP_MAX_LANES] = { UCP_NULL_LANE };
ucp_wireup_ep_t *cm_wireup_ep = NULL;
uct_ep_h new_uct_eps[UCP_MAX_LANES] = {NULL};
ucp_lane_index_t reuse_lane_map[UCP_MAX_LANES] = {UCP_NULL_LANE};
ucp_ep_config_key_t *old_key;
ucp_lane_index_t lane, reuse_lane;
uct_ep_h uct_ep;
ucs_status_t status;

*connect_lane_bitmap = UCS_MASK(new_key->num_lanes);

if (!ucp_ep_has_cm_lane(ep) ||
(ep->cfg_index == UCP_WORKER_CFG_INDEX_NULL)) {
if ((ep->cfg_index == UCP_WORKER_CFG_INDEX_NULL) ||
!ucp_wireup_check_is_reconfigurable(ep, new_key, remote_address,
addr_indices)) {
/* nothing to intersect with */
return ucp_ep_realloc_lanes(ep, new_key->num_lanes);
}
Expand All @@ -1384,43 +1519,36 @@ ucp_wireup_check_config_intersect(ucp_ep_h ep, ucp_ep_config_key_t *new_key,
ucs_assert(ucp_ep_get_lane(ep, lane) != NULL);
}

cm_wireup_ep = ucp_ep_get_cm_wireup_ep(ep);
ucs_assert(cm_wireup_ep != NULL);

old_key = &ucp_ep_config(ep)->key;
ucp_ep_config_lanes_intersect(old_key, new_key, ep, remote_address,
addr_indices, reuse_lane_map);

/* CM lane has to be reused by the new EP configuration */
ucs_assert(reuse_lane_map[ucp_ep_get_cm_lane(ep)] != UCP_NULL_LANE);
if (ucp_ep_has_cm_lane(ep)) {
/* CM lane has to be reused by the new EP configuration */
ucs_assert(reuse_lane_map[ucp_ep_get_cm_lane(ep)] != UCP_NULL_LANE);
/* wireup lane hasn't been selected by the new configuration: only this
* function should select it */
ucs_assertv(new_key->wireup_msg_lane == UCP_NULL_LANE,
"new_key->wireup_msg_lane=%u", new_key->wireup_msg_lane);
}

/* wireup lane has to be selected for the old configuration */
ucs_assert(old_key->wireup_msg_lane != UCP_NULL_LANE);
/* wireup lane hasn't been selected by the new configuration: only this
* function should select it */
ucs_assert(new_key->wireup_msg_lane == UCP_NULL_LANE);

/* set the correct WIREUP MSG lane in case of CM */
/* set the correct WIREUP MSG lane */
reuse_lane = reuse_lane_map[old_key->wireup_msg_lane];
if (reuse_lane != UCP_NULL_LANE) {
/* previous wireup lane is part of the new configuration, so reuse it */
new_key->wireup_msg_lane = reuse_lane;
} else /* old wireup lane won't be reused */ {
/* previous wireup lane is not part of new configuration, so add it as
* auxiliary endpoint inside cm lane, to be able to continue wireup
* messages exchange */
new_key->wireup_msg_lane = new_key->cm_lane;
reuse_lane = old_key->wireup_msg_lane;
uct_ep = ucp_ep_get_lane(ep, reuse_lane);
ucp_wireup_ep_set_aux(cm_wireup_ep,
ucp_wireup_ep_extract_next_ep(uct_ep),
old_key->lanes[reuse_lane].rsc_index,
ucp_ep_is_lane_p2p(ep, reuse_lane));

/* reset the UCT EP from the previous WIREUP lane and destroy its WIREUP EP,
* since it's not needed anymore in the new configuration, UCT EP will be
* used for sending WIREUP MSGs in the new configuration */
uct_ep_destroy(uct_ep);
ucp_ep_set_lane(ep, reuse_lane, NULL);
* auxiliary endpoint inside CM/non-reused lane, to be able to
* continue wireup messages exchange */
status = ucp_wireup_replace_wireup_msg_lane(ep, new_key, new_uct_eps,
reuse_lane_map);
if (status != UCS_OK) {
return status;
}
}

/* Need to discard only old lanes that won't be used anymore in the new
Expand Down Expand Up @@ -1466,6 +1594,34 @@ ucp_wireup_check_config_intersect(ucp_ep_h ep, ucp_ep_config_key_t *new_key,
return UCS_OK;
}

/*
 * Reset @a key, initialize it from the endpoint init flags and the remote
 * address, and run lane selection on it.
 *
 * @param ep              Endpoint to select lanes for.
 * @param ep_init_flags   Endpoint initialization flags.
 * @param tl_bitmap       Transports bitmap passed to lane selection.
 * @param remote_address  Unpacked remote address.
 * @param addr_indices    Address indices array, passed to lane selection.
 * @param key             Configuration key to fill.
 * @param dst_md_storage  Storage assigned to key->dst_md_cmpts.
 *
 * @return UCS_OK on success, or the error returned by
 *         ucp_wireup_select_lanes().
 */
static ucs_status_t
ucp_wireup_try_select_lanes(ucp_ep_h ep, unsigned ep_init_flags,
                            const ucp_tl_bitmap_t *tl_bitmap,
                            const ucp_unpacked_address_t *remote_address,
                            unsigned *addr_indices, ucp_ep_config_key_t *key,
                            ucp_rsc_index_t *dst_md_storage)
{
    ucs_status_t select_status;

    /* Start from a clean key on every attempt */
    ucp_ep_config_key_reset(key);
    ucp_ep_config_key_set_err_mode(key, ep_init_flags);
    ucp_ep_config_key_init_flags(key, ep_init_flags);
    key->dst_md_cmpts = dst_md_storage;
    key->dst_version  = remote_address->dst_version;

    select_status = ucp_wireup_select_lanes(ep, ep_init_flags, *tl_bitmap,
                                            remote_address, addr_indices, key,
                                            1);
    if (select_status == UCS_OK) {
        /* Get all reachable MDs from full remote address list and join with
         * current ep configuration */
        ucp_wireup_get_reachable_mds(ep, ep_init_flags, remote_address, key);
    }

    return select_status;
}

ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,
const ucp_tl_bitmap_t *local_tl_bitmap,
const ucp_unpacked_address_t *remote_address,
Expand All @@ -1482,26 +1638,30 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,
ucs_status_t status;
char str[32];
ucs_queue_head_t replay_pending_queue;
ucp_rsc_index_t dst_mds_mem[UCP_MAX_MDS];

tl_bitmap = UCS_STATIC_BITMAP_AND(*local_tl_bitmap,
worker->context->tl_bitmap);
ucs_assert(!UCS_STATIC_BITMAP_IS_ZERO(tl_bitmap));

ucs_trace("ep %p: initialize lanes", ep);
ucs_log_indent(1);

ucp_ep_config_key_reset(&key);
ucp_ep_config_key_set_err_mode(&key, ep_init_flags);
ucp_ep_config_key_init_flags(&key, ep_init_flags);
ucp_wireup_eps_pending_extract(ep, &replay_pending_queue);

key.dst_version = remote_address->dst_version;
status = ucp_wireup_try_select_lanes(ep, ep_init_flags, &tl_bitmap,
remote_address, addr_indices, &key,
dst_mds_mem);
if (status != UCS_OK) {
goto out;
}

/* Allow to choose only the lanes that were already chosen for case
* without CM to prevent reconfiguration error.
*/
if ((ep->cfg_index != UCP_WORKER_CFG_INDEX_NULL) &&
!ucp_ep_has_cm_lane(ep)) {
!ucp_ep_config_is_equal(&ucp_ep_config(ep)->key, &key) &&
!ucp_wireup_check_is_reconfigurable(ep, &key, remote_address,
addr_indices)) {
/* Allow to choose only the lanes that were already chosen for case
* without CM to prevent reconfiguration error.
*/
UCS_STATIC_BITMAP_RESET_ALL(&current_tl_bitmap);
for (lane = 0; lane < ucp_ep_config(ep)->key.num_lanes; ++lane) {
rsc_idx = ucp_ep_config(ep)->key.lanes[lane].rsc_index;
Expand All @@ -1514,24 +1674,23 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,
&worker->context->tl_rscs[rsc_idx].tl_rsc));
}
UCS_STATIC_BITMAP_AND_INPLACE(&tl_bitmap, current_tl_bitmap);

status = ucp_wireup_try_select_lanes(ep, ep_init_flags, &tl_bitmap,
remote_address, addr_indices, &key,
dst_mds_mem);
if (status != UCS_OK) {
goto out;
}
}

status = ucp_wireup_select_lanes(ep, ep_init_flags, tl_bitmap,
remote_address, addr_indices, &key, 1);
status = ucp_wireup_check_config_intersect(ep, &key, remote_address,
addr_indices,
&connect_lane_bitmap,
&replay_pending_queue);
if (status != UCS_OK) {
goto out;
}

ucp_wireup_check_config_intersect(ep, &key, remote_address, addr_indices,
&connect_lane_bitmap,
&replay_pending_queue);

/* Get all reachable MDs from full remote address list and join with
* current ep configuration
*/
key.dst_md_cmpts = ucs_alloca(sizeof(*key.dst_md_cmpts) * UCP_MAX_MDS);
ucp_wireup_get_reachable_mds(ep, ep_init_flags, remote_address, &key);

/* Load new configuration */
status = ucp_worker_get_ep_config(worker, &key, ep_init_flags,
&new_cfg_index);
Expand All @@ -1552,8 +1711,8 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,
cm_idx = ep->ext->cm_idx;

if ((ep->cfg_index != UCP_WORKER_CFG_INDEX_NULL) &&
/* reconfiguration is allowed for CM flow */
!ucp_ep_has_cm_lane(ep)) {
!ucp_wireup_check_is_reconfigurable(ep, &key, remote_address,
addr_indices)) {
/*
* TODO handle a case where we have to change lanes and reconfigure the ep:
*
Expand Down
31 changes: 28 additions & 3 deletions src/ucp/wireup/wireup_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,35 @@ static ssize_t ucp_wireup_ep_bcopy_send_func(uct_ep_h uct_ep)
return UCS_ERR_NO_RESOURCE;
}

/*
 * Check whether wireup messages should go through the next-level UCT EP of
 * the wireup EP rather than through its auxiliary EP.
 *
 * @return 1 if there is no auxiliary EP or the wireup EP has
 *         UCP_WIREUP_EP_FLAG_READY set, 0 otherwise.
 */
static int ucp_wireup_ep_is_next_ep_active(ucp_wireup_ep_t *wireup_ep)
{
    if (wireup_ep->aux_ep == NULL) {
        return 1;
    }

    return (wireup_ep->flags & UCP_WIREUP_EP_FLAG_READY) != 0;
}

/*
 * Detach from the wireup EP the UCT EP currently used for wireup messages and
 * return it to the caller.
 *
 * The pending queue of the wireup EP is expected to be empty. If the next EP
 * is the active one, it is extracted with ucp_wireup_ep_extract_next_ep();
 * otherwise the auxiliary EP is returned and the auxiliary fields of the
 * wireup EP are reset.
 *
 * @param wireup_ep  Wireup EP to extract the message EP from.
 *
 * @return The extracted UCT EP.
 */
uct_ep_h ucp_wireup_ep_extract_msg_ep(ucp_wireup_ep_t *wireup_ep)
{
    uct_ep_h aux_ep;

    ucs_assertv(ucs_queue_is_empty(&wireup_ep->pending_q), "queue_length=%zu",
                ucs_queue_length(&wireup_ep->pending_q));
    uct_ep_pending_purge(&wireup_ep->super.super, NULL, NULL);

    if (!ucp_wireup_ep_is_next_ep_active(wireup_ep)) {
        /* Hand over the auxiliary EP and forget about it */
        aux_ep                   = wireup_ep->aux_ep;
        wireup_ep->aux_rsc_index = UCP_NULL_RESOURCE;
        wireup_ep->aux_ep        = NULL;
        return aux_ep;
    }

    return ucp_wireup_ep_extract_next_ep(&wireup_ep->super.super);
}

uct_ep_h ucp_wireup_ep_get_msg_ep(ucp_wireup_ep_t *wireup_ep)
{
uct_ep_h wireup_msg_ep;

if ((wireup_ep->flags & UCP_WIREUP_EP_FLAG_READY) || (wireup_ep->aux_ep == NULL)) {
if (ucp_wireup_ep_is_next_ep_active(wireup_ep)) {
wireup_msg_ep = wireup_ep->super.uct_ep;
} else {
wireup_msg_ep = wireup_ep->aux_ep;
Expand Down Expand Up @@ -488,8 +512,9 @@ ucs_status_t ucp_wireup_ep_connect(uct_ep_h uct_ep, unsigned ep_init_flags,
UCT_TL_RESOURCE_DESC_ARG(
&worker->context->tl_rscs[rsc_index].tl_rsc));

/* we need to create an auxiliary transport only for active messages */
if (connect_aux) {
/* We need to create an auxiliary transport only for active messages.
Skip this step if auxiliary already exists. */
if (connect_aux && (wireup_ep->aux_ep == NULL)) {
status = ucp_wireup_ep_connect_aux(wireup_ep, ep_init_flags,
remote_address);
if (status != UCS_OK) {
Expand Down
Loading

0 comments on commit 84523e9

Please sign in to comment.