Skip to content

Commit

Permalink
coll/han: implement hierarchical gatherv
Browse files Browse the repository at this point in the history
Add gatherv implementation to optimize large-scale communications on
multiple nodes and multiple processes per node, by avoiding high-incast
traffic on the root process.

Because *V collectives do not have equal datatype/count on every
process, it does not natively support message-size based tuning without
an additional global communication.

Similar to gather and allgather, the hierarchical gatherv requires a
temporary buffer and memory copy to handle out-of-order data, or
non-contiguous placement on the output buffer, which results in worse
performance for large messages compared to the linear implementation.

Signed-off-by: Wenduo Wang <[email protected]>
  • Loading branch information
wenduwan committed Mar 22, 2024
1 parent b22e5fa commit 48c125e
Show file tree
Hide file tree
Showing 10 changed files with 620 additions and 2 deletions.
4 changes: 3 additions & 1 deletion ompi/mca/coll/han/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ coll_han_bcast.c \
coll_han_reduce.c \
coll_han_scatter.c \
coll_han_gather.c \
coll_han_gatherv.c \
coll_han_allreduce.c \
coll_han_allgather.c \
coll_han_component.c \
Expand All @@ -31,7 +32,8 @@ coll_han_algorithms.c \
coll_han_dynamic.c \
coll_han_dynamic_file.c \
coll_han_topo.c \
coll_han_subcomms.c
coll_han_subcomms.c \
coll_han_utils.c

# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
Expand Down
20 changes: 20 additions & 0 deletions ompi/mca/coll/han/coll_han.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ typedef struct mca_coll_han_op_module_name_t {
mca_coll_han_op_up_low_module_name_t allreduce;
mca_coll_han_op_up_low_module_name_t allgather;
mca_coll_han_op_up_low_module_name_t gather;
mca_coll_han_op_up_low_module_name_t gatherv;
mca_coll_han_op_up_low_module_name_t scatter;
} mca_coll_han_op_module_name_t;

Expand Down Expand Up @@ -235,6 +236,10 @@ typedef struct mca_coll_han_component_t {
uint32_t han_gather_up_module;
/* low level module for gather */
uint32_t han_gather_low_module;
/* up level module for gatherv */
uint32_t han_gatherv_up_module;
/* low level module for gatherv */
uint32_t han_gatherv_low_module;
/* up level module for scatter */
uint32_t han_scatter_up_module;
/* low level module for scatter */
Expand Down Expand Up @@ -279,6 +284,7 @@ typedef struct mca_coll_han_single_collective_fallback_s {
mca_coll_base_module_barrier_fn_t barrier;
mca_coll_base_module_bcast_fn_t bcast;
mca_coll_base_module_gather_fn_t gather;
mca_coll_base_module_gatherv_fn_t gatherv;
mca_coll_base_module_reduce_fn_t reduce;
mca_coll_base_module_scatter_fn_t scatter;
} module_fn;
Expand All @@ -298,6 +304,7 @@ typedef struct mca_coll_han_collectives_fallback_s {
mca_coll_han_single_collective_fallback_t bcast;
mca_coll_han_single_collective_fallback_t reduce;
mca_coll_han_single_collective_fallback_t gather;
mca_coll_han_single_collective_fallback_t gatherv;
mca_coll_han_single_collective_fallback_t scatter;
} mca_coll_han_collectives_fallback_t;

Expand Down Expand Up @@ -371,6 +378,9 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t);
#define previous_gather fallback.gather.module_fn.gather
#define previous_gather_module fallback.gather.module

#define previous_gatherv fallback.gatherv.module_fn.gatherv
#define previous_gatherv_module fallback.gatherv.module

#define previous_scatter fallback.scatter.module_fn.scatter
#define previous_scatter_module fallback.scatter.module

Expand All @@ -394,6 +404,7 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t);
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, bcast); \
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, scatter); \
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, gather); \
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, gatherv); \
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, reduce); \
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, allreduce); \
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, allgather); \
Expand Down Expand Up @@ -476,6 +487,9 @@ int
mca_coll_han_gather_intra_dynamic(GATHER_BASE_ARGS,
mca_coll_base_module_t *module);
int
mca_coll_han_gatherv_intra_dynamic(GATHERV_BASE_ARGS,
mca_coll_base_module_t *module);
int
mca_coll_han_reduce_intra_dynamic(REDUCE_BASE_ARGS,
mca_coll_base_module_t *module);
int
Expand All @@ -493,4 +507,10 @@ ompi_coll_han_reorder_gather(const void *sbuf,
struct ompi_communicator_t *comm,
int * topo);

size_t
coll_han_utils_gcd(const size_t *numerators, const size_t size);

int
coll_han_utils_create_contiguous_datatype(size_t count, const ompi_datatype_t *oldType,
ompi_datatype_t **newType);
#endif /* MCA_COLL_HAN_EXPORT_H */
4 changes: 4 additions & 0 deletions ompi/mca/coll/han/coll_han_algorithms.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ mca_coll_han_algorithm_value_t* mca_coll_han_available_algorithms[COLLCOUNT] =
{"simple", (fnptr_t) &mca_coll_han_gather_intra_simple}, // 2-level
{ 0 }
},
[GATHERV] = (mca_coll_han_algorithm_value_t[]){
{"intra", (fnptr_t) &mca_coll_han_gatherv_intra}, // 2-level
{ 0 }
},
[ALLGATHER] = (mca_coll_han_algorithm_value_t[]){
{"intra", (fnptr_t)&mca_coll_han_allgather_intra}, // 2-level
{"simple", (fnptr_t)&mca_coll_han_allgather_intra_simple}, // 2-level
Expand Down
7 changes: 7 additions & 0 deletions ompi/mca/coll/han/coll_han_algorithms.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@ mca_coll_han_gather_intra_simple(const void *sbuf, int scount,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);

/* Gatherv */
int
mca_coll_han_gatherv_intra(const void *sbuf, int scount, struct ompi_datatype_t *sdtype,
void *rbuf, const int *rcounts, const int *displs,
struct ompi_datatype_t *rdtype, int root,
struct ompi_communicator_t *comm, mca_coll_base_module_t *module);

/* Allgather */
int
mca_coll_han_allgather_intra(const void *sbuf, int scount,
Expand Down
17 changes: 17 additions & 0 deletions ompi/mca/coll/han/coll_han_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ static int han_close(void)
free(mca_coll_han_component.han_op_module_name.gather.han_op_low_module_name);
mca_coll_han_component.han_op_module_name.gather.han_op_low_module_name = NULL;

free(mca_coll_han_component.han_op_module_name.gatherv.han_op_up_module_name);
mca_coll_han_component.han_op_module_name.gatherv.han_op_up_module_name = NULL;
free(mca_coll_han_component.han_op_module_name.gatherv.han_op_low_module_name);
mca_coll_han_component.han_op_module_name.gatherv.han_op_low_module_name = NULL;

free(mca_coll_han_component.han_op_module_name.scatter.han_op_up_module_name);
mca_coll_han_component.han_op_module_name.scatter.han_op_up_module_name = NULL;
free(mca_coll_han_component.han_op_module_name.scatter.han_op_low_module_name);
Expand Down Expand Up @@ -344,6 +349,18 @@ static int han_register(void)
OPAL_INFO_LVL_9, &cs->han_gather_low_module,
&cs->han_op_module_name.gather.han_op_low_module_name);

cs->han_gatherv_up_module = 0;
(void) mca_coll_han_query_module_from_mca(c, "gatherv_up_module",
"up level module for gatherv, 0 basic",
OPAL_INFO_LVL_9, &cs->han_gatherv_up_module,
&cs->han_op_module_name.gatherv.han_op_up_module_name);

cs->han_gatherv_low_module = 0;
(void) mca_coll_han_query_module_from_mca(c, "gatherv_low_module",
"low level module for gatherv, 0 basic",
OPAL_INFO_LVL_9, &cs->han_gatherv_low_module,
&cs->han_op_module_name.gatherv.han_op_low_module_name);

cs->han_scatter_up_module = 0;
(void) mca_coll_han_query_module_from_mca(c, "scatter_up_module",
"up level module for scatter, 0 libnbc, 1 adapt",
Expand Down
100 changes: 100 additions & 0 deletions ompi/mca/coll/han/coll_han_dynamic.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include "ompi/mca/coll/han/coll_han_algorithms.h"
#include "ompi/mca/coll/base/coll_base_util.h"

#define MCA_COLL_HAN_ANY_MESSAGE_SIZE 0

/*
* Tests if a dynamic collective is implemented
* Useful for file reading warnings and MCA parameter generation
Expand All @@ -41,6 +43,7 @@ bool mca_coll_han_is_coll_dynamic_implemented(COLLTYPE_T coll_id)
case BARRIER:
case BCAST:
case GATHER:
case GATHERV:
case REDUCE:
case SCATTER:
return true;
Expand Down Expand Up @@ -1045,6 +1048,103 @@ mca_coll_han_gather_intra_dynamic(const void *sbuf, int scount,
sub_module);
}

/*
* Gatherv selector:
* On a sub-communicator, checks the stored rules to find the module to use
* On the global communicator, calls the han collective implementation, or
* calls the correct module if fallback mechanism is activated
*/
int mca_coll_han_gatherv_intra_dynamic(const void *sbuf, int scount, struct ompi_datatype_t *sdtype,
void *rbuf, const int *rcounts, const int *displs,
struct ompi_datatype_t *rdtype, int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
mca_coll_han_module_t *han_module = (mca_coll_han_module_t*) module;
TOPO_LVL_T topo_lvl = han_module->topologic_level;
mca_coll_base_module_gatherv_fn_t gatherv;
mca_coll_base_module_t *sub_module;
int rank, verbosity = 0;

if (!han_module->enabled) {
return han_module->previous_gatherv(sbuf, scount, sdtype, rbuf, rcounts, displs, rdtype,
root, comm, han_module->previous_gatherv_module);
}

/* v collectives do not support message-size based dynamic rules */
sub_module = get_module(GATHERV, MCA_COLL_HAN_ANY_MESSAGE_SIZE, comm, han_module);

/* First errors are always printed by rank 0 */
rank = ompi_comm_rank(comm);
if( (0 == rank) && (han_module->dynamic_errors < mca_coll_han_component.max_dynamic_errors) ) {
verbosity = 30;
}

if(NULL == sub_module) {
/*
* No valid collective module from dynamic rules
* nor from mca parameter
*/
han_module->dynamic_errors++;
opal_output_verbose(verbosity, mca_coll_han_component.han_output,
"coll:han:mca_coll_han_gatherv_intra_dynamic "
"HAN did not find any valid module for collective %d (%s) "
"with topological level %d (%s) on communicator (%s/%s). "
"Please check dynamic file/mca parameters\n",
GATHERV, mca_coll_base_colltype_to_str(GATHERV),
topo_lvl, mca_coll_han_topo_lvl_to_str(topo_lvl),
ompi_comm_print_cid(comm), comm->c_name);
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
"HAN/GATHERV: No module found for the sub-communicator. "
"Falling back to another component\n"));
gatherv = han_module->previous_gatherv;
sub_module = han_module->previous_gatherv_module;
} else if (NULL == sub_module->coll_gatherv) {
/*
* No valid collective from dynamic rules
* nor from mca parameter
*/
han_module->dynamic_errors++;
opal_output_verbose(verbosity, mca_coll_han_component.han_output,
"coll:han:mca_coll_han_gatherv_intra_dynamic "
"HAN found valid module for collective %d (%s) "
"with topological level %d (%s) on communicator (%s/%s) "
"but this module cannot handle this collective. "
"Please check dynamic file/mca parameters\n",
GATHERV, mca_coll_base_colltype_to_str(GATHERV),
topo_lvl, mca_coll_han_topo_lvl_to_str(topo_lvl),
ompi_comm_print_cid(comm), comm->c_name);
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
"HAN/GATHERV: the module found for the sub-"
"communicator cannot handle the GATHERV operation. "
"Falling back to another component\n"));
gatherv = han_module->previous_gatherv;
sub_module = han_module->previous_gatherv_module;
} else if (GLOBAL_COMMUNICATOR == topo_lvl && sub_module == module) {
/*
* No fallback mechanism activated for this configuration
* sub_module is valid
* sub_module->coll_gatherv is valid and point to this function
* Call han topological collective algorithm
*/
int algorithm_id = get_algorithm(GATHERV, MCA_COLL_HAN_ANY_MESSAGE_SIZE, comm, han_module);
gatherv = (mca_coll_base_module_gatherv_fn_t) mca_coll_han_algorithm_id_to_fn(GATHERV, algorithm_id);
if (NULL == gatherv) { /* default behaviour */
gatherv = mca_coll_han_gatherv_intra;
}
} else {
/*
* If we get here:
* sub_module is valid
* sub_module->coll_gatherv is valid
* They points to the collective to use, according to the dynamic rules
* Selector's job is done, call the collective
*/
gatherv = sub_module->coll_gatherv;
}
return gatherv(sbuf, scount, sdtype, rbuf, rcounts, displs, rdtype, root, comm, sub_module);
}


/*
* Reduce selector:
Expand Down
Loading

0 comments on commit 48c125e

Please sign in to comment.