From 10bf4cac675b479c06c33f275622f717dc170402 Mon Sep 17 00:00:00 2001
From: David Ozog
Date: Tue, 30 Jun 2020 18:59:34 -0400
Subject: [PATCH] Refine progress thread for FI_PROGRESS_MANUAL

* Probe the CQ on the default context (necessary)
* Probe the CQ on all contexts (perhaps not necessary?)
* Rearrange progress thread and teams init/fini for correctness
* Resolve a race when growing the contexts array and setting new size
* Add --enable-progress-thread configure flag (default:disabled)

Signed-off-by: David Ozog
---
Reviewer note (placed below the "---" separator, so it is not part of the
commit message):
shmem_transport_full_probe() now tests !(options & SHMEM_CTX_PRIVATE).
Unary ! binds tighter than binary &, so the unparenthesized form parsed as
(!options) & SHMEM_CTX_PRIVATE instead of "this context is not private".
NOTE(review): nothing in this patch stores 1 to
shmem_transport_ofi_progress_thread_enabled; confirm it is set before
shmem_transport_progress_thread_init() runs, otherwise the thread loop
exits on its first check.

 configure.ac        |  6 ++++
 src/init.c          |  8 ++++++
 src/shmem_team.h    |  2 ++
 src/transport_ofi.c | 70 +++++++++++++++++++++++++++++++++++++++------
 src/transport_ofi.h | 11 +++++--
 5 files changed, 87 insertions(+), 10 deletions(-)

diff --git a/configure.ac b/configure.ac
index 6c99d376..2ef0be20 100755
--- a/configure.ac
+++ b/configure.ac
@@ -96,6 +96,12 @@ AS_IF([test "$enable_completion_polling" = "yes" -o "$enable_manual_progress" =
 AS_IF([test "$enable_manual_progress" = "yes"],
     [AC_DEFINE([ENABLE_MANUAL_PROGRESS], [1], [Enable manual progress])])
 
+AC_ARG_ENABLE([progress-thread],
+    [AC_HELP_STRING([--enable-progress-thread],
+                    [Enable a thread on each PE to make progress calls with interval SHMEM_PROGRESS_INTERVAL (default:disabled)])])
+AS_IF([test "$enable_progress_thread" = "yes"],
+    [AC_DEFINE([ENABLE_PROGRESS_THREAD], [1], [Enable progress thread])])
+
 AC_ARG_ENABLE([total-data-ordering],
     [AC_HELP_STRING([--enable-total-data-ordering],
                     [Configure handling of total data ordering option. "no" or "never" to build with the assumption total data ordering will never be available. "yes" or "always" to build with the assumption total data ordering will always be available (if not, applications will abort in start_pes()). "check" to build with no assumptions, which may lead to a slight performance decrease on high performance networks. (default: disabled)])])
diff --git a/src/init.c b/src/init.c
index 01ca23df..d0758535 100644
--- a/src/init.c
+++ b/src/init.c
@@ -147,6 +147,10 @@ shmem_internal_shutdown(void)
 
     shmem_internal_finalized = 1;
 
+#ifdef USE_OFI
+    shmem_transport_progress_thread_fini();
+#endif
+
     shmem_internal_team_fini();
 
     shmem_transport_fini();
@@ -517,6 +521,10 @@ shmem_internal_heap_postinit(void)
     shmem_internal_randr_init();
     randr_initialized = 1;
 
+#ifdef USE_OFI
+    shmem_transport_progress_thread_init();
+#endif
+
     atexit(shmem_internal_shutdown_atexit);
     shmem_internal_initialized = 1;
 
diff --git a/src/shmem_team.h b/src/shmem_team.h
index 19573086..75d3befe 100644
--- a/src/shmem_team.h
+++ b/src/shmem_team.h
@@ -33,6 +33,8 @@ extern shmem_internal_team_t shmem_internal_team_world;
 extern shmem_internal_team_t shmem_internal_team_shared;
 extern shmem_internal_team_t shmem_internal_team_node;
 
+extern shmem_internal_team_t **shmem_internal_team_pool;
+
 enum shmem_internal_team_op_t {
     SYNC = 0,
     BCAST,
diff --git a/src/transport_ofi.c b/src/transport_ofi.c
index 73efa092..61cb3a3f 100644
--- a/src/transport_ofi.c
+++ b/src/transport_ofi.c
@@ -170,16 +170,74 @@ int shmem_transport_dtype_table[] = {
 static pthread_t shmem_transport_ofi_progress_thread;
 static int shmem_transport_ofi_progress_thread_enabled;
 
+
 static void *shmem_transport_ofi_progress_thread_func(void *arg)
 {
     while (__atomic_load_n(&shmem_transport_ofi_progress_thread_enabled, __ATOMIC_ACQUIRE)) {
-        //shmem_transport_probe();
-        printf("prog...\n");
+        shmem_transport_full_probe();
         usleep(shmem_internal_params.PROGRESS_INTERVAL);
     }
     return NULL;
 }
 
+
+void shmem_transport_progress_thread_init(void)
+{
+    if (shmem_internal_params.PROGRESS_INTERVAL > 0)
+        pthread_create(&shmem_transport_ofi_progress_thread, NULL,
+                       &shmem_transport_ofi_progress_thread_func, NULL);
+    return;
+}
+
+
+void shmem_transport_progress_thread_fini(void)
+{
+    if (shmem_internal_params.PROGRESS_INTERVAL > 0) {
+        __atomic_store_n(&shmem_transport_ofi_progress_thread_enabled, 0, __ATOMIC_RELEASE);
+        pthread_join(shmem_transport_ofi_progress_thread, NULL);
+    }
+    return;
+}
+
+
+void shmem_transport_full_probe(void)
+{
+#if defined(ENABLE_PROGRESS_THREAD)
+# ifdef USE_THREAD_COMPLETION
+    if (0 == pthread_mutex_trylock(&shmem_transport_ofi_progress_lock)) {
+# endif
+        for (long i = 0; i < shmem_internal_params.TEAMS_MAX; i++) {
+            if (shmem_internal_team_pool[i] != NULL) {
+                for (int j = 0; j < shmem_internal_team_pool[i]->contexts_len; j++) {
+                    if (shmem_internal_team_pool[i]->contexts[j] != NULL) {
+                        struct fi_cq_entry buf;
+                        if (!(shmem_internal_team_pool[i]->contexts[j]->options & SHMEM_CTX_PRIVATE)) {
+                            int ret = fi_cq_read(shmem_internal_team_pool[i]->contexts[j]->cq, &buf, 1);
+                            if (ret == 1)
+                                RAISE_WARN_STR("Unexpected event");
+                        }
+                    }
+                }
+            }
+        }
+        struct fi_cq_entry buf;
+        int ret = fi_cq_read(shmem_transport_ofi_target_cq, &buf, 1);
+        if (ret == 1)
+            RAISE_WARN_STR("Unexpected event");
+
+        ret = fi_cq_read(shmem_transport_ctx_default.cq, &buf, 1);
+        if (ret == 1)
+            RAISE_WARN_STR("Unexpected event");
+
+# ifdef USE_THREAD_COMPLETION
+        pthread_mutex_unlock(&shmem_transport_ofi_progress_lock);
+    }
+# endif
+#endif
+    return;
+}
+
+
 /* Need a syscall to gettid() because glibc doesn't provide a wrapper
  * (see gettid manpage in the NOTES section): */
 static inline
@@ -518,7 +576,6 @@ int shmem_transport_ofi_stx_search_shared(long threshold)
 }
 
 
-
 static inline
 void shmem_transport_ofi_stx_allocate(shmem_transport_ctx_t *ctx)
 {
@@ -1996,10 +2053,6 @@ int shmem_transport_startup(void)
     ret = populate_av();
     if (ret != 0) return ret;
 
-    if (shmem_internal_params.PROGRESS_INTERVAL > 0)
-        pthread_create(&shmem_transport_ofi_progress_thread, NULL,
-                       &shmem_transport_ofi_progress_thread_func, NULL);
-
     return 0;
 }
 
@@ -2022,8 +2075,9 @@ int shmem_transport_ctx_create(struct shmem_internal_team_t *team, long options,
             id = team->contexts_len;
 
             size_t i = team->contexts_len;
+            team->contexts = realloc(team->contexts, (i + shmem_transport_ofi_grow_size) *
+                                     sizeof(shmem_transport_ctx_t*));
             team->contexts_len += shmem_transport_ofi_grow_size;
-            team->contexts = realloc(team->contexts, team->contexts_len * sizeof(shmem_transport_ctx_t*));
 
             if (team->contexts == NULL) {
                 RAISE_ERROR_STR("Out of memory when allocating OFI ctx array");
diff --git a/src/transport_ofi.h b/src/transport_ofi.h
index a403fd9b..a47e9041 100644
--- a/src/transport_ofi.h
+++ b/src/transport_ofi.h
@@ -379,19 +379,26 @@ extern struct fid_ep* shmem_transport_ofi_target_ep;
         shmem_free_list_unlock(ctx->bounce_buffers); \
     } while (0)
 
+void shmem_transport_progress_thread_init(void);
+void shmem_transport_progress_thread_fini(void);
+void shmem_transport_full_probe(void);
+
 static inline
 void shmem_transport_probe(void)
 {
-#if defined(ENABLE_MANUAL_PROGRESS) /* FIXME */
+#if defined(ENABLE_MANUAL_PROGRESS)
+# ifdef USE_THREAD_COMPLETION
     if (0 == pthread_mutex_trylock(&shmem_transport_ofi_progress_lock)) {
+# endif
         struct fi_cq_entry buf;
         int ret = fi_cq_read(shmem_transport_ofi_target_cq, &buf, 1);
         if (ret == 1)
             RAISE_WARN_STR("Unexpected event");
+# ifdef USE_THREAD_COMPLETION
         pthread_mutex_unlock(&shmem_transport_ofi_progress_lock);
     }
+# endif
 #endif
-
     return;
 }
 