Skip to content

Commit

Permalink
Refine progress thread for FI_PROGRESS_MANUAL
Browse files Browse the repository at this point in the history
* Probe the CQ on the default context (necessary)
* Probe the CQ on all contexts (perhaps not necessary?)
* Rearrange progress thread and teams init/fini for correctness
* Resolve a race when growing the contexts array and setting new size
* Add --enable-progress-thread configure flag (default:disabled)

Signed-off-by: David Ozog <[email protected]>
  • Loading branch information
davidozog committed Oct 16, 2024
1 parent 7148a3a commit 10bf4ca
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 10 deletions.
6 changes: 6 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ AS_IF([test "$enable_completion_polling" = "yes" -o "$enable_manual_progress" =
AS_IF([test "$enable_manual_progress" = "yes"],
[AC_DEFINE([ENABLE_MANUAL_PROGRESS], [1], [Enable manual progress])])

dnl Optional progress thread: passing --enable-progress-thread to configure
dnl defines ENABLE_PROGRESS_THREAD to 1 in the generated config header.
AC_ARG_ENABLE([progress-thread],
[AC_HELP_STRING([--enable-progress-thread],
[Enable a thread on each PE to make progress calls with interval SHMEM_PROGRESS_INTERVAL (default:disabled)])])
AS_IF([test "$enable_progress_thread" = "yes"],
[AC_DEFINE([ENABLE_PROGRESS_THREAD], [1], [Enable progress thread])])

AC_ARG_ENABLE([total-data-ordering],
[AC_HELP_STRING([--enable-total-data-ordering],
[Configure handling of total data ordering option. "no" or "never" to build with the assumption total data ordering will never be available. "yes" or "always" to build with the assumption total data ordering will always be available (if not, applications will abort in start_pes()). "check" to build with no assumptions, which may lead to a slight performance decrease on high performance networks. (default: disabled)])])
Expand Down
8 changes: 8 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ shmem_internal_shutdown(void)

shmem_internal_finalized = 1;

#ifdef USE_OFI
shmem_transport_progress_thread_fini();
#endif

shmem_internal_team_fini();

shmem_transport_fini();
Expand Down Expand Up @@ -517,6 +521,10 @@ shmem_internal_heap_postinit(void)
shmem_internal_randr_init();
randr_initialized = 1;

#ifdef USE_OFI
shmem_transport_progress_thread_init();
#endif

atexit(shmem_internal_shutdown_atexit);
shmem_internal_initialized = 1;

Expand Down
2 changes: 2 additions & 0 deletions src/shmem_team.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ extern shmem_internal_team_t shmem_internal_team_world;
extern shmem_internal_team_t shmem_internal_team_shared;
extern shmem_internal_team_t shmem_internal_team_node;

extern shmem_internal_team_t **shmem_internal_team_pool;

enum shmem_internal_team_op_t {
SYNC = 0,
BCAST,
Expand Down
70 changes: 62 additions & 8 deletions src/transport_ofi.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,74 @@ int shmem_transport_dtype_table[] = {
static pthread_t shmem_transport_ofi_progress_thread;
static int shmem_transport_ofi_progress_thread_enabled;


/*
 * Entry point of the background progress thread.
 *
 * Repeatedly calls shmem_transport_full_probe() and then sleeps via
 * usleep() for shmem_internal_params.PROGRESS_INTERVAL until
 * shmem_transport_ofi_progress_thread_enabled is observed to be zero.
 * The flag is read with acquire ordering so that it pairs with the
 * release store performed in shmem_transport_progress_thread_fini().
 *
 * arg is unused.  Always returns NULL.
 */
static void *shmem_transport_ofi_progress_thread_func(void *arg)
{
    (void) arg;

    while (__atomic_load_n(&shmem_transport_ofi_progress_thread_enabled, __ATOMIC_ACQUIRE)) {
        /* No debug output here: this loop runs once per progress interval
         * for the lifetime of the PE. */
        shmem_transport_full_probe();
        usleep(shmem_internal_params.PROGRESS_INTERVAL);
    }

    return NULL;
}


/*
 * Start the progress thread.
 *
 * Does nothing unless shmem_internal_params.PROGRESS_INTERVAL is greater
 * than zero.  Otherwise spawns shmem_transport_ofi_progress_thread_func
 * with default attributes and no argument, storing the handle in
 * shmem_transport_ofi_progress_thread.  The result of pthread_create()
 * is not inspected.
 */
void shmem_transport_progress_thread_init(void)
{
    if (!(shmem_internal_params.PROGRESS_INTERVAL > 0)) {
        return;
    }

    pthread_create(&shmem_transport_ofi_progress_thread,
                   NULL,
                   &shmem_transport_ofi_progress_thread_func,
                   NULL);
}


/*
 * Stop the progress thread.
 *
 * Does nothing unless shmem_internal_params.PROGRESS_INTERVAL is greater
 * than zero (the same condition under which the thread is started).
 * Otherwise clears shmem_transport_ofi_progress_thread_enabled with
 * release ordering, which ends the thread's loop on its next check, and
 * then blocks in pthread_join() until the thread has exited.
 */
void shmem_transport_progress_thread_fini(void)
{
    if (!(shmem_internal_params.PROGRESS_INTERVAL > 0)) {
        return;
    }

    /* Signal the loop to terminate, then wait for it. */
    __atomic_store_n(&shmem_transport_ofi_progress_thread_enabled, 0, __ATOMIC_RELEASE);
    pthread_join(shmem_transport_ofi_progress_thread, NULL);
}


/*
 * Poke every completion queue the transport knows about.
 *
 * Compiled to a no-op unless ENABLE_PROGRESS_THREAD is defined.  When it
 * is, the function reads (at most one entry from) the CQ of:
 *   - every non-NULL, non-private context of every non-NULL team in
 *     shmem_internal_team_pool (up to shmem_internal_params.TEAMS_MAX),
 *   - the target CQ (shmem_transport_ofi_target_cq),
 *   - the default context (shmem_transport_ctx_default).
 * An entry is never expected; if fi_cq_read() returns one, a warning is
 * raised.
 *
 * With USE_THREAD_COMPLETION the whole probe is guarded by a trylock on
 * shmem_transport_ofi_progress_lock: if the lock is already held the call
 * returns immediately without probing anything.
 */
void shmem_transport_full_probe(void)
{
#if defined(ENABLE_PROGRESS_THREAD)
#  ifdef USE_THREAD_COMPLETION
    if (0 == pthread_mutex_trylock(&shmem_transport_ofi_progress_lock)) {
#  endif
        struct fi_cq_entry buf;
        int ret;

        for (long i = 0; i < shmem_internal_params.TEAMS_MAX; i++) {
            shmem_internal_team_t *team = shmem_internal_team_pool[i];

            if (team == NULL)
                continue;

            for (int j = 0; j < team->contexts_len; j++) {
                shmem_transport_ctx_t *ctx = team->contexts[j];

                if (ctx == NULL)
                    continue;

                /* Skip private contexts.  The parentheses matter: the
                 * previous form `!ctx->options & SHMEM_CTX_PRIVATE`
                 * negated `options` first and then masked the 0/1 result,
                 * so the private bit was never actually tested. */
                if (!(ctx->options & SHMEM_CTX_PRIVATE)) {
                    ret = fi_cq_read(ctx->cq, &buf, 1);
                    if (ret == 1)
                        RAISE_WARN_STR("Unexpected event");
                }
            }
        }

        ret = fi_cq_read(shmem_transport_ofi_target_cq, &buf, 1);
        if (ret == 1)
            RAISE_WARN_STR("Unexpected event");

        ret = fi_cq_read(shmem_transport_ctx_default.cq, &buf, 1);
        if (ret == 1)
            RAISE_WARN_STR("Unexpected event");

#  ifdef USE_THREAD_COMPLETION
        pthread_mutex_unlock(&shmem_transport_ofi_progress_lock);
    }
#  endif
#endif
    return;
}


/* Need a syscall to gettid() because glibc doesn't provide a wrapper
* (see gettid manpage in the NOTES section): */
static inline
Expand Down Expand Up @@ -518,7 +576,6 @@ int shmem_transport_ofi_stx_search_shared(long threshold)
}



static inline
void shmem_transport_ofi_stx_allocate(shmem_transport_ctx_t *ctx)
{
Expand Down Expand Up @@ -1996,10 +2053,6 @@ int shmem_transport_startup(void)
ret = populate_av();
if (ret != 0) return ret;

if (shmem_internal_params.PROGRESS_INTERVAL > 0)
pthread_create(&shmem_transport_ofi_progress_thread, NULL,
&shmem_transport_ofi_progress_thread_func, NULL);

return 0;
}

Expand All @@ -2022,8 +2075,9 @@ int shmem_transport_ctx_create(struct shmem_internal_team_t *team, long options,
id = team->contexts_len;

size_t i = team->contexts_len;
team->contexts = realloc(team->contexts, (i + shmem_transport_ofi_grow_size) *
sizeof(shmem_transport_ctx_t*));
team->contexts_len += shmem_transport_ofi_grow_size;
team->contexts = realloc(team->contexts, team->contexts_len * sizeof(shmem_transport_ctx_t*));

if (team->contexts == NULL) {
RAISE_ERROR_STR("Out of memory when allocating OFI ctx array");
Expand Down
11 changes: 9 additions & 2 deletions src/transport_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,19 +379,26 @@ extern struct fid_ep* shmem_transport_ofi_target_ep;
shmem_free_list_unlock(ctx->bounce_buffers); \
} while (0)

void shmem_transport_progress_thread_init(void);
void shmem_transport_progress_thread_fini(void);
void shmem_transport_full_probe(void);

/*
 * Probe the target completion queue once.
 *
 * Compiled to a no-op unless ENABLE_MANUAL_PROGRESS is defined.  When it
 * is, reads at most one entry from shmem_transport_ofi_target_cq; an
 * entry is never expected, so a warning is raised if one is returned.
 *
 * With USE_THREAD_COMPLETION the read is guarded by a trylock on
 * shmem_transport_ofi_progress_lock; if the lock is already held the
 * probe is skipped.
 */
static inline
void shmem_transport_probe(void)
{
    /* Exactly one #if / #endif pair: a second, duplicate
     * `#if defined(ENABLE_MANUAL_PROGRESS)` would leave the conditional
     * unterminated. */
#if defined(ENABLE_MANUAL_PROGRESS)
#  ifdef USE_THREAD_COMPLETION
    if (0 == pthread_mutex_trylock(&shmem_transport_ofi_progress_lock)) {
#  endif
        struct fi_cq_entry buf;
        int ret = fi_cq_read(shmem_transport_ofi_target_cq, &buf, 1);
        if (ret == 1)
            RAISE_WARN_STR("Unexpected event");
#  ifdef USE_THREAD_COMPLETION
        pthread_mutex_unlock(&shmem_transport_ofi_progress_lock);
    }
#  endif
#endif

    return;
}

Expand Down

0 comments on commit 10bf4ca

Please sign in to comment.