Skip to content

Commit

Permalink
Only use thread in libfabric DP when needed
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Jul 22, 2024
1 parent a17ab8d commit c57a7b2
Showing 1 changed file with 70 additions and 30 deletions.
100 changes: 70 additions & 30 deletions source/adios2/toolkit/sst/dp/rdma_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,32 @@ struct fabric_state
pthread_t pthread_id;
};

/*
 * Fetch one completion-queue entry and store it in *CQEntry.
 *
 * fabric   Fabric state; its cq_manual_progress and cq_signal members are read.
 * CQEntry  Destination for the completion entry.
 * Svcs     Service table; only its verbose() callback is used, for error logging.
 * Stream   Passed through unchanged as the first argument of Svcs->verbose().
 *
 * When fabric->cq_manual_progress is set, the entry is obtained from
 * cq_manual_progress_pop(), copied into *CQEntry, and the popped object is
 * freed here. Otherwise the entry is read directly from fabric->cq_signal
 * with fi_cq_sread() using a timeout of -1.
 *
 * NOTE(review): the function returns void, so when fi_cq_sread() delivers
 * fewer than one entry *CQEntry is not written by this function and the
 * caller cannot tell; confirm that callers tolerate this.
 * NOTE(review): the "[PullSelection]" tag in the log message does not match
 * this function's name; it is kept byte-for-byte apart from the conversion
 * specifier fix below.
 */
void cq_read(struct fabric_state *fabric, struct fi_cq_data_entry *CQEntry, CP_Services Svcs,
             void *Stream)
{
    if (fabric->cq_manual_progress)
    {
        /* Take the next queued entry, hand a copy to the caller, then
         * release the popped object. */
        struct fi_cq_data_entry *res = cq_manual_progress_pop(fabric->cq_manual_progress);
        memcpy(CQEntry, res, sizeof(struct fi_cq_data_entry));
        free(res);
    }
    else
    {
        ssize_t rc = fi_cq_sread(fabric->cq_signal, (void *)CQEntry, 1, NULL, -1);
        if (rc < 1)
        {
            /* No completion was delivered: look for a pending error entry
             * and report it only if one carries a non-zero error code. */
            struct fi_cq_err_entry error = {.err = 0};
            fi_cq_readerr(fabric->cq_signal, &error, 0);
            if (error.err != -FI_SUCCESS)
            {
                /* rc is ssize_t; print it as long so the conversion
                 * specifier matches the argument type. */
                Svcs->verbose(
                    Stream, DPCriticalVerbose,
                    "[PullSelection] no completion event (%ld (%s - %s)).\n", (long)rc,
                    fi_strerror(error.err),
                    fi_cq_strerror(fabric->cq_signal, error.err, error.err_data, NULL, error.len));
            }
        }
    }
}

/*
Expand Down Expand Up @@ -643,32 +662,45 @@ static void init_fabric(struct fabric_state *fabric, struct _SstParams *Params,
return;
}

fi_freeinfo(originfo);

fabric->cq_manual_progress = NULL;

struct cq_manual_progress *manual_progress = malloc(sizeof(struct cq_manual_progress));

manual_progress->cq_signal = fabric->cq_signal;
if (pthread_mutex_init(&manual_progress->cq_event_list_mutex, NULL) != 0)
if (info->domain_attr->data_progress == FI_PROGRESS_MANUAL)
{
Svcs->verbose(CP_Stream, DPCriticalVerbose, "Could not init mutex.\n");
return;
}
manual_progress->cq_event_list = NULL;
manual_progress->cq_event_list_filled = 0;
manual_progress->Svcs = Svcs;
manual_progress->Stream = CP_Stream;
manual_progress->do_continue = 1;
pthread_cond_init(&manual_progress->cq_even_list_signal, NULL);
Svcs->verbose(
CP_Stream, DPTraceVerbose,
"Using a separate thread to comply with the fabric's manual progress preference.\n");

fabric->cq_manual_progress = manual_progress;
struct cq_manual_progress *manual_progress = malloc(sizeof(struct cq_manual_progress));

if (pthread_create(&fabric->pthread_id, NULL, &make_progress, fabric->cq_manual_progress) != 0)
manual_progress->cq_signal = fabric->cq_signal;
if (pthread_mutex_init(&manual_progress->cq_event_list_mutex, NULL) != 0)
{
Svcs->verbose(CP_Stream, DPCriticalVerbose, "Could not init mutex.\n");
return;
}
manual_progress->cq_event_list = NULL;
manual_progress->cq_event_list_filled = 0;
manual_progress->Svcs = Svcs;
manual_progress->Stream = CP_Stream;
manual_progress->do_continue = 1;
pthread_cond_init(&manual_progress->cq_even_list_signal, NULL);

fabric->cq_manual_progress = manual_progress;

if (pthread_create(&fabric->pthread_id, NULL, &make_progress, fabric->cq_manual_progress) !=
0)
{
Svcs->verbose(CP_Stream, DPCriticalVerbose, "Could not start thread.\n");
return;
}
}
else
{
Svcs->verbose(CP_Stream, DPCriticalVerbose, "Could not start thread.\n");
return;
Svcs->verbose(CP_Stream, DPTraceVerbose,
"Using the fabric's automatic progress capability.\n");
}

fi_freeinfo(originfo);
}

static void fini_fabric(struct fabric_state *fabric, CP_Services Svcs, void *CP_Stream)
Expand Down Expand Up @@ -1860,7 +1892,7 @@ static int DoPushWait(CP_Services Svcs, Rdma_RS_Stream Stream, RdmaCompletionHan

while (Handle->Pending > 0)
{
cq_read(Fabric, &CQEntry);
cq_read(Fabric, &CQEntry, Svcs, Stream);
if (CQEntry.flags & FI_REMOTE_CQ_DATA)
{
BufferSlot = CQEntry.data >> 31;
Expand Down Expand Up @@ -1926,7 +1958,7 @@ static int WaitForAnyPull(CP_Services Svcs, Rdma_RS_Stream Stream)
RdmaCompletionHandle Handle_t;
struct fi_cq_data_entry CQEntry = {0};

cq_read(Fabric, &CQEntry);
cq_read(Fabric, &CQEntry, Svcs, Stream);
{
Svcs->verbose(Stream->CP_Stream, DPTraceVerbose,
"got completion for request with handle %p (flags %li).\n",
Expand Down Expand Up @@ -2301,6 +2333,14 @@ static int RdmaGetPriority(CP_Services Svcs, void *CP_Stream, struct _SstParams

ifname = get_preferred_domain(Params);

char const *provider_name = NULL;
if ((provider_name = getenv("FABRIC_PROVIDER")))
{
size_t len = strlen(provider_name);
hints->fabric_attr->prov_name = malloc(len + 1);
memcpy(hints->fabric_attr->prov_name, provider_name, len + 1);
}

forkunsafe = getenv("FI_FORK_UNSAFE");
if (!forkunsafe)
{
Expand Down Expand Up @@ -2588,7 +2628,7 @@ static void PostPreload(CP_Services Svcs, Rdma_RS_Stream Stream, long Timestep)

while (WRidx > 0)
{
cq_read(Fabric, &CQEntry);
cq_read(Fabric, &CQEntry, Svcs, Stream);
CQBuffer = CQEntry.op_context;
if (CQBuffer >= SendBuffer && CQBuffer < (SendBuffer + StepLog->WRanks))
{
Expand Down Expand Up @@ -2715,7 +2755,7 @@ static void PullSelection(CP_Services Svcs, Rdma_WSR_Stream Stream)
RankReq = Stream->PreloadReq;
while (RankReq)
{
cq_read(Fabric, &CQEntry);
cq_read(Fabric, &CQEntry, Svcs, Stream);
CQRankReq = CQEntry.op_context;
if (CQEntry.flags & FI_READ)
{
Expand Down Expand Up @@ -2747,7 +2787,7 @@ static void CompletePush(CP_Services Svcs, Rdma_WSR_Stream Stream, TimestepList

while (Step->OutstandingWrites > 0)
{
cq_read(Fabric, &CQEntry);
cq_read(Fabric, &CQEntry, Svcs, Stream);
if (CQEntry.flags & FI_WRITE)
{
CQTimestep = (long)CQEntry.op_context;
Expand Down

0 comments on commit c57a7b2

Please sign in to comment.