diff --git a/source/adios2/toolkit/sst/dp/rdma_dp.c b/source/adios2/toolkit/sst/dp/rdma_dp.c index 9491c451d6..af680daba9 100644 --- a/source/adios2/toolkit/sst/dp/rdma_dp.c +++ b/source/adios2/toolkit/sst/dp/rdma_dp.c @@ -254,13 +254,32 @@ struct fabric_state pthread_t pthread_id; }; -void cq_read(struct fabric_state *fabric, struct fi_cq_data_entry *CQEntry) +void cq_read(struct fabric_state *fabric, struct fi_cq_data_entry *CQEntry, CP_Services Svcs, + void *Stream) { - unsigned int current_backoff_seconds = 0; - struct fi_cq_data_entry *res = cq_manual_progress_pop(fabric->cq_manual_progress); - memcpy(CQEntry, res, sizeof(struct fi_cq_data_entry)); - free(res); - return; + if (fabric->cq_manual_progress) + { + struct fi_cq_data_entry *res = cq_manual_progress_pop(fabric->cq_manual_progress); + memcpy(CQEntry, res, sizeof(struct fi_cq_data_entry)); + free(res); + } + else + { + ssize_t rc = fi_cq_sread(fabric->cq_signal, (void *)CQEntry, 1, NULL, -1); + if (rc < 1) + { + struct fi_cq_err_entry error = {.err = 0}; + fi_cq_readerr(fabric->cq_signal, &error, 0); + if (error.err != -FI_SUCCESS) + { + Svcs->verbose( + Stream, DPCriticalVerbose, + "[PullSelection] no completion event (%d (%s - %s)).\n", rc, + fi_strerror(error.err), + fi_cq_strerror(fabric->cq_signal, error.err, error.err_data, NULL, error.len)); + } + } + } } /* @@ -643,32 +662,45 @@ static void init_fabric(struct fabric_state *fabric, struct _SstParams *Params, return; } - fi_freeinfo(originfo); - fabric->cq_manual_progress = NULL; - struct cq_manual_progress *manual_progress = malloc(sizeof(struct cq_manual_progress)); - - manual_progress->cq_signal = fabric->cq_signal; - if (pthread_mutex_init(&manual_progress->cq_event_list_mutex, NULL) != 0) + if (info->domain_attr->data_progress == FI_PROGRESS_MANUAL) { - Svcs->verbose(CP_Stream, DPCriticalVerbose, "Could not init mutex.\n"); - return; - } - manual_progress->cq_event_list = NULL; - manual_progress->cq_event_list_filled = 0; - manual_progress->Svcs = Svcs; - manual_progress->Stream = CP_Stream; - manual_progress->do_continue = 1; - pthread_cond_init(&manual_progress->cq_even_list_signal, NULL); + Svcs->verbose( + CP_Stream, DPTraceVerbose, + "Using a separate thread to comply with the fabric's manual progress preference.\n"); - fabric->cq_manual_progress = manual_progress; + struct cq_manual_progress *manual_progress = malloc(sizeof(struct cq_manual_progress)); - if (pthread_create(&fabric->pthread_id, NULL, &make_progress, fabric->cq_manual_progress) != 0) + manual_progress->cq_signal = fabric->cq_signal; + if (pthread_mutex_init(&manual_progress->cq_event_list_mutex, NULL) != 0) + { + Svcs->verbose(CP_Stream, DPCriticalVerbose, "Could not init mutex.\n"); + return; + } + manual_progress->cq_event_list = NULL; + manual_progress->cq_event_list_filled = 0; + manual_progress->Svcs = Svcs; + manual_progress->Stream = CP_Stream; + manual_progress->do_continue = 1; + pthread_cond_init(&manual_progress->cq_even_list_signal, NULL); + + fabric->cq_manual_progress = manual_progress; + + if (pthread_create(&fabric->pthread_id, NULL, &make_progress, fabric->cq_manual_progress) != + 0) + { + Svcs->verbose(CP_Stream, DPCriticalVerbose, "Could not start thread.\n"); + return; + } + } + else { - Svcs->verbose(CP_Stream, DPCriticalVerbose, "Could not start thread.\n"); - return; + Svcs->verbose(CP_Stream, DPTraceVerbose, + "Using the fabric's automatic progress capability.\n"); } + + fi_freeinfo(originfo); } static void fini_fabric(struct fabric_state *fabric, CP_Services Svcs, void *CP_Stream) @@ -1860,7 +1892,7 @@ static int DoPushWait(CP_Services Svcs, Rdma_RS_Stream Stream, RdmaCompletionHan while (Handle->Pending > 0) { - cq_read(Fabric, &CQEntry); + cq_read(Fabric, &CQEntry, Svcs, Stream); if (CQEntry.flags & FI_REMOTE_CQ_DATA) { BufferSlot = CQEntry.data >> 31; @@ -1926,7 +1958,7 @@ static int WaitForAnyPull(CP_Services Svcs, Rdma_RS_Stream Stream) RdmaCompletionHandle Handle_t; struct fi_cq_data_entry CQEntry = {0}; - cq_read(Fabric, &CQEntry); + cq_read(Fabric, &CQEntry, Svcs, Stream); { Svcs->verbose(Stream->CP_Stream, DPTraceVerbose, "got completion for request with handle %p (flags %li).\n", @@ -2301,6 +2333,14 @@ static int RdmaGetPriority(CP_Services Svcs, void *CP_Stream, struct _SstParams ifname = get_preferred_domain(Params); + char const *provider_name = NULL; + if ((provider_name = getenv("FABRIC_PROVIDER"))) + { + size_t len = strlen(provider_name); + hints->fabric_attr->prov_name = malloc(len + 1); + memcpy(hints->fabric_attr->prov_name, provider_name, len + 1); + } + forkunsafe = getenv("FI_FORK_UNSAFE"); if (!forkunsafe) { @@ -2588,7 +2628,7 @@ static void PostPreload(CP_Services Svcs, Rdma_RS_Stream Stream, long Timestep) while (WRidx > 0) { - cq_read(Fabric, &CQEntry); + cq_read(Fabric, &CQEntry, Svcs, Stream); CQBuffer = CQEntry.op_context; if (CQBuffer >= SendBuffer && CQBuffer < (SendBuffer + StepLog->WRanks)) { @@ -2715,7 +2755,7 @@ static void PullSelection(CP_Services Svcs, Rdma_WSR_Stream Stream) RankReq = Stream->PreloadReq; while (RankReq) { - cq_read(Fabric, &CQEntry); + cq_read(Fabric, &CQEntry, Svcs, Stream); CQRankReq = CQEntry.op_context; if (CQEntry.flags & FI_READ) { @@ -2747,7 +2787,7 @@ static void CompletePush(CP_Services Svcs, Rdma_WSR_Stream Stream, TimestepList while (Step->OutstandingWrites > 0) { - cq_read(Fabric, &CQEntry); + cq_read(Fabric, &CQEntry, Svcs, Stream); if (CQEntry.flags & FI_WRITE) { CQTimestep = (long)CQEntry.op_context;