Skip to content

Commit

Permalink
misc: add MPIX_{Start/Stop}_progress_thread
Browse files Browse the repository at this point in the history
Add functions to launch a progress thread that runs MPIX_Stream_progress.
Also reimplement MPIR_CVAR_ASYNC_PROGRESS on top of the new
API.
  • Loading branch information
hzhou committed Jul 6, 2022
1 parent 2e87dd0 commit fb9d751
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 35 deletions.
6 changes: 6 additions & 0 deletions src/binding/c/stream_api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ MPIX_Stream_progress:
stream: STREAM, [stream object]
.impl: mpid

MPIX_Start_progress_thread:
stream: STREAM, [stream object]

MPIX_Stop_progress_thread:
stream: STREAM, [stream object]

MPIX_Stream_send:
buf: BUFFER, constant=True, [initial address of send buffer]
count: POLYXFER_NUM_ELEM_NNI, [number of elements in send buffer]
Expand Down
4 changes: 0 additions & 4 deletions src/include/mpir_misc.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,4 @@ int MPIR_Find_external(struct MPIR_Comm *comm, int *external_size_p, int *extern
int MPIR_Get_internode_rank(MPIR_Comm * comm_ptr, int r);
int MPIR_Get_intranode_rank(MPIR_Comm * comm_ptr, int r);

/* Default routines for asynchronous progress thread */
int MPIR_Init_async_thread(void);
int MPIR_Finalize_async_thread(void);

#endif /* MPIR_MISC_H_INCLUDED */
107 changes: 83 additions & 24 deletions src/mpi/init/init_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "mpiimpl.h"
#include "mpi_init.h"
#include "utarray.h"

/*
=== BEGIN_MPI_T_CVAR_INFO_BLOCK ===
Expand Down Expand Up @@ -62,22 +63,34 @@
#define DO_ASYNC_THREAD_AFFINITY
#endif

/* Lifecycle states stored in the `state` field of struct async_thread. */
enum {
    /* no thread attached: set when an entry is created and again after the thread is joined */
    MPIR_ASYNC_STATE_UNSET = 0,
    /* set just before the thread is created; progress_fn keeps looping while it reads this */
    MPIR_ASYNC_STATE_RUNNING = 1,
    /* set by the stop path to make progress_fn leave its loop */
    MPIR_ASYNC_STATE_DONE = 2
};

/* Bookkeeping for one progress thread; elements of this type are stored in async_thread_list. */
struct async_thread {
/* handle filled in by MPID_Thread_create and later handed to MPID_Thread_join */
MPID_Thread_id_t thread_id;
/* one of the MPIR_ASYNC_STATE_* values; read and written with MPL_atomic_{load,store}_int */
MPL_atomic_int_t state;
/* stream whose progress the thread drives via MPID_Stream_progress; may be NULL
 * (the MPID_{Init,Finalize}_async_thread wrappers pass NULL) */
MPIR_Stream *stream_ptr;
};

/* Element descriptor for async_thread_list: each element is sizeof(struct async_thread) bytes;
 * the remaining three UT_icd members are left NULL. */
static UT_icd icd_async_thread_list = { sizeof(struct async_thread), NULL, NULL, NULL };

/* List of struct async_thread entries. Created by utarray_new in MPII_init_async; freed and
 * reset to NULL in MPII_finalize_async.
 * NOTE(review): the start path hands a pointer to a list element to the new thread; presumably
 * a later utarray_extend_back may reallocate the storage and invalidate it -- confirm. */
static UT_array *async_thread_list;

/* set to 1 in MPII_init_async once MPID_Init_async_thread has returned without error */
static int MPIR_async_thread_initialized = 0;
/* NOTE(review): progress_thread_id and async_done look like the earlier single-thread handle
 * and stop flag, now mirrored per entry by struct async_thread's thread_id/state -- confirm
 * whether they are still needed. */
static MPID_Thread_id_t progress_thread_id;
static MPL_atomic_int_t async_done = MPL_ATOMIC_INT_T_INITIALIZER(0);

static void progress_fn(void *data)
{
MPID_Progress_state state;
struct async_thread *p = data;

MPID_THREAD_CS_ENTER(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);

MPID_Progress_start(&state);
while (MPL_atomic_load_int(&async_done) == 0) {
MPID_Progress_test(&state);
while (MPL_atomic_load_int(&p->state) == MPIR_ASYNC_STATE_RUNNING) {
MPID_Stream_progress(p->stream_ptr);
MPID_THREAD_CS_YIELD(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
}
MPID_Progress_end(&state);

MPID_THREAD_CS_EXIT(GLOBAL, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);

Expand Down Expand Up @@ -215,27 +228,55 @@ static int get_thread_affinity(bool * apply_affinity, int **p_thread_affinity, i
}
#endif /* DO_ASYNC_THREAD_AFFINITY */

/* called inside MPID_Init_async_thread to provide device override */
int MPIR_Init_async_thread(void)
/*
 * Look up the bookkeeping entry of the progress thread that serves stream_ptr.
 *
 * An entry matches when it was registered with the very same stream pointer
 * (including NULL, which the MPID_{Init,Finalize}_async_thread wrappers pass),
 * or when both the entry's stream and stream_ptr are non-NULL and share a vci.
 *
 * Returns a pointer to the matching element of async_thread_list, or NULL when
 * no entry matches.
 */
static struct async_thread *find_async_thread(MPIR_Stream * stream_ptr)
{
    struct async_thread *p = NULL;
    while ((p = (struct async_thread *) utarray_next(async_thread_list, p))) {
        if (p->stream_ptr == stream_ptr) {
            break;
        } else if (p->stream_ptr && stream_ptr && p->stream_ptr->vci == stream_ptr->vci) {
            /* compare the entry's vci with the requested stream's vci: prevents launching
             * extra progress threads on the same vci, e.g. GPU streams */
            break;
        }
    }
    return p;
}

/*
 * Start a progress thread for stream_ptr unless its entry is already active.
 *
 * Looks up (or appends) the struct async_thread entry for stream_ptr, and if the
 * entry is in MPIR_ASYNC_STATE_UNSET marks it RUNNING and creates a thread that
 * runs progress_fn on that entry. Returns mpi_errno.
 */
int MPIR_Start_progress_thread_impl(MPIR_Stream * stream_ptr)
{
int mpi_errno = MPI_SUCCESS;
MPIR_FUNC_ENTER;

#ifdef DO_ASYNC_THREAD_AFFINITY
int *thread_affinity = NULL, affinity_idx;
bool apply_affinity;
#endif

/* NOTE(review): MPIR_FUNC_ENTER appears a second time here -- confirm only one is intended */
MPIR_FUNC_ENTER;
struct async_thread *p = find_async_thread(stream_ptr);
if (p == NULL) {
/* no entry yet: append one for this stream, initially UNSET */
utarray_extend_back(async_thread_list, MPL_MEM_OTHER);
p = (struct async_thread *) utarray_back(async_thread_list);
p->stream_ptr = stream_ptr;
MPL_atomic_store_int(&p->state, MPIR_ASYNC_STATE_UNSET);
}

/* any state other than UNSET means a thread was already started for this entry.
 * NOTE(review): this jump bypasses the MPIR_FUNC_EXIT placed before fn_exit -- confirm */
if (MPL_atomic_load_int(&p->state) != MPIR_ASYNC_STATE_UNSET) {
goto fn_exit;
}
#ifdef DO_ASYNC_THREAD_AFFINITY
mpi_errno = get_thread_affinity(&apply_affinity, &thread_affinity, &affinity_idx);
MPIR_ERR_CHECK(mpi_errno);
#endif

/* mark RUNNING before creating the thread: progress_fn loops only while it reads RUNNING */
MPL_atomic_store_int(&p->state, MPIR_ASYNC_STATE_RUNNING);
int err = 0;
/* NOTE(review): two MPID_Thread_create calls follow. The first passes NULL as data (which
 * progress_fn dereferences) and stores into the file-scope progress_thread_id; it looks like
 * a leftover of the previous implementation -- confirm. */
MPID_Thread_create((MPID_Thread_func_t) progress_fn, NULL, &progress_thread_id, &err);
MPID_Thread_create((MPID_Thread_func_t) progress_fn, (void *) p, &p->thread_id, &err);
/* NOTE(review): err from MPID_Thread_create is never examined; this re-checks mpi_errno */
MPIR_ERR_CHECK(mpi_errno);

#ifdef DO_ASYNC_THREAD_AFFINITY
if (apply_affinity) {
/* NOTE(review): thr_err is used on the next line before its declaration below -- confirm */
MPL_thread_set_affinity(progress_thread_id, &(thread_affinity[affinity_idx]), 1, &thr_err);
int thr_err;
MPL_thread_set_affinity(p->thread_id, &(thread_affinity[affinity_idx]), 1, &thr_err);
MPIR_ERR_CHKANDJUMP1(thr_err, mpi_errno, MPI_ERR_OTHER, "**set_thread_affinity",
"**set_thread_affinity %d", thread_affinity[affinity_idx]);
}
Expand All @@ -244,24 +285,30 @@ int MPIR_Init_async_thread(void)
MPIR_FUNC_EXIT;

fn_exit:
#ifdef DO_ASYNC_THREAD_AFFINITY
/* thread_affinity is initialized to NULL above, so this is reached safely on every path */
MPL_free(thread_affinity);
#endif
return mpi_errno;
fn_fail:
goto fn_exit;
}

/* called inside MPID_Finalize_async_thread to provide device override */
int MPIR_Finalize_async_thread(void)
/*
 * Stop the progress thread associated with stream_ptr, if one was started.
 *
 * stream_ptr: stream whose progress thread should stop; NULL selects the entry
 *             registered with a NULL stream (what the MPID_Finalize_async_thread
 *             wrappers pass).
 *
 * Returns MPI_SUCCESS. A stream without an entry, or whose entry is in
 * MPIR_ASYNC_STATE_UNSET, is a no-op: nothing is signalled or joined.
 */
int MPIR_Stop_progress_thread_impl(MPIR_Stream * stream_ptr)
{
    int mpi_errno = MPI_SUCCESS;

    MPIR_FUNC_ENTER;

    struct async_thread *p = find_async_thread(stream_ptr);
    if (p == NULL || MPL_atomic_load_int(&p->state) == MPIR_ASYNC_STATE_UNSET) {
        goto fn_exit;
    }

    /* progress_fn loops only while the state reads RUNNING, so DONE asks it to return */
    MPL_atomic_store_int(&p->state, MPIR_ASYNC_STATE_DONE);
    MPID_Thread_join(p->thread_id);
    /* back to UNSET so the same entry can be started again later */
    MPL_atomic_store_int(&p->state, MPIR_ASYNC_STATE_UNSET);

  fn_exit:
    MPIR_FUNC_EXIT;
    return mpi_errno;
}

Expand All @@ -270,12 +317,16 @@ int MPII_init_async(void)
{
int mpi_errno = MPI_SUCCESS;

if (async_thread_list) {
goto fn_exit;
}

utarray_new(async_thread_list, &icd_async_thread_list, MPL_MEM_OTHER);

if (MPIR_CVAR_ASYNC_PROGRESS) {
if (MPIR_ThreadInfo.thread_provided == MPI_THREAD_MULTIPLE) {
mpi_errno = MPID_Init_async_thread();
if (mpi_errno)
goto fn_fail;

MPIR_ERR_CHECK(mpi_errno);
MPIR_async_thread_initialized = 1;
} else {
printf("WARNING: No MPI_THREAD_MULTIPLE support (needed for async progress)\n");
Expand All @@ -298,16 +349,24 @@ int MPII_finalize_async(void)
mpi_errno = MPID_Finalize_async_thread();
}

/* stop any user launched progress threads */
struct async_thread *p = NULL;
while ((p = (struct async_thread *) utarray_next(async_thread_list, p))) {
mpi_errno = MPIR_Stop_progress_thread_impl(p->stream_ptr);
}

utarray_free(async_thread_list);
async_thread_list = NULL;
return mpi_errno;
}

#else
int MPIR_Finalize_async_thread(void)
/* Fallback for the #else branch of this file's build guard: starts nothing and reports success. */
int MPIR_Start_progress_thread_impl(MPIR_Stream * stream_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    return mpi_errno;
}

int MPIR_Init_async_thread(void)
/* Fallback for the #else branch of this file's build guard: stops nothing and reports success. */
int MPIR_Stop_progress_thread_impl(MPIR_Stream * stream_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    return mpi_errno;
}
Expand Down
7 changes: 4 additions & 3 deletions src/mpi/init/mpir_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,14 @@ int MPII_Init_thread(int *argc, char ***argv, int user_required, int *provided,
}

MPII_world_set_initilized();

mpi_errno = MPII_init_async();
MPIR_ERR_CHECK(mpi_errno);
}
if (provided) {
*provided = MPIR_ThreadInfo.thread_provided;
}

mpi_errno = MPII_init_async();
MPIR_ERR_CHECK(mpi_errno);

MPL_initlock_unlock(&MPIR_init_lock);
return mpi_errno;

Expand Down
4 changes: 2 additions & 2 deletions src/mpid/ch3/include/mpidpost.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,12 @@ int MPIDI_CH3I_Comm_set_hints(MPIR_Comm *, MPIR_Info *info_ptr);
*/
/* Device hook (ch3): forwards to MPIR_Start_progress_thread_impl with a NULL stream and
 * returns its result. */
MPL_STATIC_INLINE_PREFIX int MPID_Init_async_thread(void)
{
    return MPIR_Start_progress_thread_impl(NULL);
}

/* Device hook (ch3): forwards to MPIR_Stop_progress_thread_impl with a NULL stream and
 * returns its result. */
MPL_STATIC_INLINE_PREFIX int MPID_Finalize_async_thread(void)
{
    return MPIR_Stop_progress_thread_impl(NULL);
}

MPL_STATIC_INLINE_PREFIX int MPID_Test(MPIR_Request * request_ptr, int *flag, MPI_Status * status)
Expand Down
4 changes: 2 additions & 2 deletions src/mpid/ch4/include/mpidpost.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ MPL_STATIC_INLINE_PREFIX void MPID_Request_destroy_hook(MPIR_Request * req)
*/
/* Device hook (ch4): forwards to MPIR_Start_progress_thread_impl with a NULL stream and
 * returns its result. */
MPL_STATIC_INLINE_PREFIX int MPID_Init_async_thread(void)
{
    return MPIR_Start_progress_thread_impl(NULL);
}

/* Device hook (ch4): forwards to MPIR_Stop_progress_thread_impl with a NULL stream and
 * returns its result. */
MPL_STATIC_INLINE_PREFIX int MPID_Finalize_async_thread(void)
{
    return MPIR_Stop_progress_thread_impl(NULL);
}

#endif /* MPIDPOST_H_INCLUDED */

0 comments on commit fb9d751

Please sign in to comment.