Skip to content

Commit

Permalink
sessions:deworldify behavior of pmix pset lookup
Browse files Browse the repository at this point in the history
It turns out that the existing ompi_instance_group_pmix_pset
implementation assumes an MPI_COMM_WORLD type of model.

This prevents the ability to use more dynamically generated process
sets, possibly using an external agent.

Switch to using the PMIx pset membership query to find new pset
membership.

Related to open-mpi#10862

Signed-off-by: Howard Pritchard <[email protected]>
  • Loading branch information
hppritcha committed Oct 4, 2022
1 parent 6755fe7 commit e45025a
Showing 1 changed file with 64 additions and 53 deletions.
117 changes: 64 additions & 53 deletions ompi/instance/instance.c
Original file line number Diff line number Diff line change
Expand Up @@ -1204,71 +1204,82 @@ static int ompi_instance_group_self (ompi_instance_t *instance, ompi_group_t **g

/**
 * Build a group whose members are the processes of a PMIx process set.
 *
 * The membership is obtained with PMIx_Query_info() using the
 * PMIX_QUERY_PSET_MEMBERSHIP key qualified by PMIX_PSET_NAME. If the first
 * query reports PMIX_ERR_NOT_FOUND, it is repeated exactly once with
 * PMIX_QUERY_REFRESH_CACHE added to the qualifiers.
 *
 * @param[in]  instance   instance stored in the new group's grp_instance
 * @param[in]  pset_name  name of the process set to look up
 * @param[out] group_out  receives the new group on success; left untouched
 *                        on every failure path
 *
 * @return OMPI_SUCCESS when a group was built,
 *         OMPI_ERR_OUT_OF_RESOURCE when the group could not be allocated,
 *         OMPI_ERR_NOT_FOUND when the query returned no membership entry,
 *         otherwise the converted PMIx status or the name-conversion status.
 */
static int ompi_instance_group_pmix_pset (ompi_instance_t *instance, const char *pset_name, ompi_group_t **group_out)
{
    int ret = OMPI_SUCCESS;
    size_t i, n;
    bool isnew, try_again = false, refresh = true;
    pmix_status_t rc;
    ompi_group_t *group = NULL;
    pmix_query_t query;
    pmix_info_t *info = NULL;
    size_t ninfo = 0;
    opal_process_name_t pname;

    PMIX_QUERY_CONSTRUCT(&query);
    PMIX_ARGV_APPEND(rc, query.keys, PMIX_QUERY_PSET_MEMBERSHIP);
    PMIX_INFO_CREATE(query.qualifiers, 1);
    /* PMIX_INFO_CREATE only allocates the array; record its length too */
    query.nqual = 1;
    PMIX_INFO_LOAD(&query.qualifiers[0], PMIX_PSET_NAME, pset_name, PMIX_STRING);

    /*
     * First try finding in the local PMIx cache, if not found, try a refresh
     */
fn_try_again:
    /* issue the query exactly once per attempt so no result array is lost */
    rc = PMIx_Query_info(&query, 1, &info, &ninfo);
    if (PMIX_SUCCESS != rc || 0 == ninfo) {
        if ((PMIX_ERR_NOT_FOUND == rc) && !try_again) {
            try_again = true;
            PMIX_QUERY_DESTRUCT(&query);
            PMIX_QUERY_CONSTRUCT(&query);
            PMIX_ARGV_APPEND(rc, query.keys, PMIX_QUERY_PSET_MEMBERSHIP);
            PMIX_INFO_CREATE(query.qualifiers, 2);
            query.nqual = 2;
            PMIX_INFO_LOAD(&query.qualifiers[0], PMIX_PSET_NAME, pset_name, PMIX_STRING);
            PMIX_INFO_LOAD(&query.qualifiers[1], PMIX_QUERY_REFRESH_CACHE, &refresh, PMIX_BOOL);
            goto fn_try_again;
        }
        /* a "successful" but empty reply must not be reported as success,
         * because *group_out would be left unset */
        ret = (PMIX_SUCCESS == rc) ? OMPI_ERR_NOT_FOUND : opal_pmix_convert_status(rc);
        ompi_instance_print_error ("PMIx_Query_info() failed", ret);
        goto fn_w_query;
    }

    for (n = 0; n < ninfo; n++) {
        if (0 != strcmp(info[n].key, PMIX_QUERY_PSET_MEMBERSHIP)) {
            continue;
        }

        pmix_data_array_t *data_array = info[n].value.data.darray;
        pmix_proc_t *members_array = (pmix_proc_t *) data_array->array;

        group = ompi_group_allocate (NULL, data_array->size);
        if (OPAL_UNLIKELY(NULL == group)) {
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto fn_w_info;
        }

        for (i = 0; i < data_array->size; i++) {
            /* the conversion status is stored in ret, so test ret (not rc) */
            OPAL_PMIX_CONVERT_PROCT(ret, &pname, &members_array[i]);
            if (OPAL_SUCCESS != ret) {
                ompi_instance_print_error ("OPAL_PMIX_CONVERT_PROCT failed %d", ret);
                ompi_group_free(&group);
                goto fn_w_info;
            }
            group->grp_proc_pointers[i] = ompi_proc_find_and_add(&pname, &isnew);
        }
        /* only the first membership entry is used */
        break;
    }

    if (NULL != group) {
        ompi_set_group_rank (group, ompi_proc_local());
        group->grp_instance = instance;
        *group_out = group;
    } else {
        ret = OMPI_ERR_NOT_FOUND;
    }

fn_w_info:
    /* release the whole result array, not just its first element */
    if (NULL != info) {
        PMIX_INFO_FREE(info, ninfo);
    }
fn_w_query:
    PMIX_QUERY_DESTRUCT(&query);

    return ret;
}

static int ompi_instance_get_pmix_pset_size (ompi_instance_t *instance, const char *pset_name, size_t *size_out)
Expand Down

0 comments on commit e45025a

Please sign in to comment.