Skip to content

Commit

Permalink
Replace sleeping with condition variables
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Jan 23, 2024
1 parent 175e6b1 commit 0b771b6
Showing 1 changed file with 23 additions and 36 deletions.
59 changes: 23 additions & 36 deletions source/adios2/toolkit/sst/dp/rdma_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,11 @@ struct cq_event_list
struct cq_manual_progress
{
struct fid_cq *cq_signal;
pthread_mutex_t cq_event_list_mutex;

struct cq_event_list *cq_event_list;
pthread_mutex_t cq_event_list_mutex;
pthread_cond_t cq_even_list_signal;
char cq_event_list_filled;

CP_Services Svcs;
void *Stream;
Expand All @@ -156,46 +159,43 @@ void cq_manual_progress_push(struct cq_manual_progress *self, struct cq_event_li
}
head->next = item;
}
self->cq_event_list_filled = 1;
pthread_mutex_unlock(&self->cq_event_list_mutex);
pthread_cond_signal(&self->cq_even_list_signal);
}

struct fi_cq_data_entry *cq_manual_progress_pop(struct cq_manual_progress *self)
{
struct fi_cq_data_entry *res;
pthread_mutex_lock(&self->cq_event_list_mutex);
if (!self->cq_event_list)
while (!self->cq_event_list_filled)
{
res = NULL;
}
else
{
struct cq_event_list *head = self->cq_event_list;
res = head->value;
self->cq_event_list = head->next;
free(head);
pthread_cond_wait(&self->cq_even_list_signal, &self->cq_event_list_mutex);
}
assert(self->cq_event_list);
struct cq_event_list *head = self->cq_event_list;
res = head->value;
self->cq_event_list = head->next;
self->cq_event_list_filled = self->cq_event_list ? 1 : 0;
pthread_mutex_unlock(&self->cq_event_list_mutex);
free(head);
return res;
}

#define SST_BACKOFF_SECONDS_MAX 5

static void *make_progress(void *params_)
{
struct cq_manual_progress *params = (struct cq_manual_progress *)params_;
size_t const batch_size = 100;
struct fi_cq_data_entry *CQEntries = malloc(batch_size * sizeof(struct fi_cq_data_entry));
struct fi_cq_data_entry CQEntries[batch_size];

while (params->do_continue)
{
/*
* The main purpose of this worker thread is to make repeated blocking calls to the blocking
* fi_cq_sread() with a timeout of 5 seconds. Some providers don't make progress in a timely
* fashion otherwise (e.g. shm).
* fi_cq_sread(). Some providers don't make progress in a timely fashion otherwise (e.g.
* shm).
*/
printf("Going into fi_cq_sread()\n");
ssize_t rc = fi_cq_sread(params->cq_signal, (void *)CQEntries, batch_size, NULL, -1);
printf("fi_cq_sread()=%ld\n", rc);
if (rc < 1)
{
struct fi_cq_err_entry error = {.err = 0};
Expand All @@ -222,8 +222,6 @@ static void *make_progress(void *params_)
}
}
}
free(CQEntries);
printf("Returning from thread\n");
return NULL;
}

Expand Down Expand Up @@ -259,22 +257,10 @@ struct fabric_state
void cq_read(struct fabric_state *fabric, struct fi_cq_data_entry *CQEntry)
{
unsigned int current_backoff_seconds = 0;
while (1)
{
struct fi_cq_data_entry *res = cq_manual_progress_pop(fabric->cq_manual_progress);
if (res == NULL)
{
sleep(current_backoff_seconds);
if(current_backoff_seconds < SST_BACKOFF_SECONDS_MAX)
{
++current_backoff_seconds;
}
continue;
}
memcpy(CQEntry, res, sizeof(struct fi_cq_data_entry));
free(res);
return;
}
struct fi_cq_data_entry *res = cq_manual_progress_pop(fabric->cq_manual_progress);
memcpy(CQEntry, res, sizeof(struct fi_cq_data_entry));
free(res);
return;
}

/*
Expand Down Expand Up @@ -670,9 +656,11 @@ static void init_fabric(struct fabric_state *fabric, struct _SstParams *Params,
return;
}
manual_progress->cq_event_list = NULL;
manual_progress->cq_event_list_filled = 0;
manual_progress->Svcs = Svcs;
manual_progress->Stream = CP_Stream;
manual_progress->do_continue = 1;
pthread_cond_init(&manual_progress->cq_even_list_signal, NULL);

fabric->cq_manual_progress = manual_progress;

Expand Down Expand Up @@ -1559,7 +1547,6 @@ static DP_WSR_Stream RdmaInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_S
&WSR_Stream->rrmr, Fabric->ctx, Fabric->signal,
Fabric->info->domain_attr->mr_mode);
ReaderRollHandle->Key = fi_mr_key(WSR_Stream->rrmr);
printf("Key: %lu\n", ReaderRollHandle->Key);

WSR_Stream->WriterContactInfo = ContactInfo;

Expand Down

0 comments on commit 0b771b6

Please sign in to comment.