diff --git a/source/adios2/toolkit/sst/dp/daos_dp.c b/source/adios2/toolkit/sst/dp/daos_dp.c index d97bcffaa6..703955898b 100644 --- a/source/adios2/toolkit/sst/dp/daos_dp.c +++ b/source/adios2/toolkit/sst/dp/daos_dp.c @@ -10,8 +10,8 @@ #include "sst_data.h" #include "adios2/toolkit/profiling/taustubs/taustubs.h" -#include "dp_interface.h" #include "daos.h" +#include "dp_interface.h" /* * Some conventions: @@ -49,7 +49,8 @@ * plane would replace one or both of these with RDMA functionality. */ -typedef struct _Daos_RS_Stream { +typedef struct _Daos_RS_Stream +{ CManager cm; void *CP_Stream; CMFormat ReadRequestFormat; @@ -65,35 +66,39 @@ typedef struct _Daos_RS_Stream { /* queued timestep info */ struct _RSTimestepEntry *QueuedTimesteps; -} *Daos_RS_Stream; +} * Daos_RS_Stream; -typedef struct _Daos_WSR_Stream { +typedef struct _Daos_WSR_Stream +{ struct _Daos_WS_Stream *WS_Stream; CP_PeerCohort PeerCohort; int ReaderCohortSize; char *ReaderRequests; struct _DaosReaderContactInfo *ReaderContactInfo; - struct _DaosWriterContactInfo * - WriterContactInfo; /* included so we can free on destroy */ -} *Daos_WSR_Stream; + struct _DaosWriterContactInfo + *WriterContactInfo; /* included so we can free on destroy */ +} * Daos_WSR_Stream; -typedef struct _TimestepEntry { +typedef struct _TimestepEntry +{ long Timestep; struct _SstData Data; struct _DaosPerTimestepInfo *DP_TimestepInfo; struct _TimestepEntry *Next; -} *TimestepList; +} * TimestepList; -typedef struct _RSTimestepEntry { +typedef struct _RSTimestepEntry +{ long Timestep; int WriterRank; char *Data; long DataSize; long DataStart; struct _RSTimestepEntry *Next; -} *RSTimestepList; +} * RSTimestepList; -typedef struct _Daos_WS_Stream { +typedef struct _Daos_WS_Stream +{ CManager cm; void *CP_Stream; int Rank; @@ -104,20 +109,23 @@ typedef struct _Daos_WS_Stream { int ReaderCount; Daos_WSR_Stream *Readers; -} *Daos_WS_Stream; +} * Daos_WS_Stream; -typedef struct _DaosReaderContactInfo { +typedef struct _DaosReaderContactInfo +{ char *ContactString; CMConnection Conn; void *RS_Stream; -} *DaosReaderContactInfo; +} * DaosReaderContactInfo; -typedef struct _DaosWriterContactInfo { +typedef struct _DaosWriterContactInfo +{ char *ContactString; void *WS_Stream; -} *DaosWriterContactInfo; +} * DaosWriterContactInfo; -typedef struct _DaosReadRequestMsg { +typedef struct _DaosReadRequestMsg +{ long Timestep; size_t Offset; size_t Length; @@ -125,56 +133,53 @@ typedef struct _DaosReadRequestMsg { void *RS_Stream; int RequestingRank; int NotifyCondition; -} *DaosReadRequestMsg; - -static FMField DaosReadRequestList[] = - { { "Timestep", "integer", - sizeof(long), FMOffset(DaosReadRequestMsg, Timestep) }, - { "Offset", "integer", - sizeof(size_t), FMOffset(DaosReadRequestMsg, Offset) }, - { "Length", "integer", - sizeof(size_t), FMOffset(DaosReadRequestMsg, Length) }, - { "WS_Stream", "integer", - sizeof(void *), FMOffset(DaosReadRequestMsg, WS_Stream) }, - { "RS_Stream", "integer", - sizeof(void *), FMOffset(DaosReadRequestMsg, RS_Stream) }, - { "RequestingRank", "integer", - sizeof(int), FMOffset(DaosReadRequestMsg, RequestingRank) }, - { "NotifyCondition", "integer", - sizeof(int), FMOffset(DaosReadRequestMsg, NotifyCondition) }, - { NULL, NULL, 0, 0 } }; +} * DaosReadRequestMsg; + +static FMField DaosReadRequestList[] = { + {"Timestep", "integer", sizeof(long), + FMOffset(DaosReadRequestMsg, Timestep)}, + {"Offset", "integer", sizeof(size_t), FMOffset(DaosReadRequestMsg, Offset)}, + {"Length", "integer", sizeof(size_t), FMOffset(DaosReadRequestMsg, Length)}, + {"WS_Stream", "integer", sizeof(void *), + FMOffset(DaosReadRequestMsg, WS_Stream)}, + {"RS_Stream", "integer", sizeof(void *), + FMOffset(DaosReadRequestMsg, RS_Stream)}, + {"RequestingRank", "integer", sizeof(int), + FMOffset(DaosReadRequestMsg, RequestingRank)}, + {"NotifyCondition", "integer", sizeof(int), + FMOffset(DaosReadRequestMsg, NotifyCondition)}, + {NULL, NULL, 0, 0}}; static FMStructDescRec DaosReadRequestStructs[] = { - { "DaosReadRequest", DaosReadRequestList, - sizeof(struct _DaosReadRequestMsg), NULL }, - { NULL, NULL, 0, NULL } -}; + {"DaosReadRequest", DaosReadRequestList, sizeof(struct _DaosReadRequestMsg), + NULL}, + {NULL, NULL, 0, NULL}}; -typedef struct _DaosReadReplyMsg { +typedef struct _DaosReadReplyMsg +{ long Timestep; size_t DataLength; void *RS_Stream; char *Data; int NotifyCondition; -} *DaosReadReplyMsg; - -static FMField DaosReadReplyList[] = - { { "Timestep", "integer", - sizeof(long), FMOffset(DaosReadReplyMsg, Timestep) }, - { "RS_Stream", "integer", - sizeof(void *), FMOffset(DaosReadReplyMsg, RS_Stream) }, - { "DataLength", "integer", - sizeof(size_t), FMOffset(DaosReadReplyMsg, DataLength) }, - { "Data", "char[DataLength]", - sizeof(char), FMOffset(DaosReadReplyMsg, Data) }, - { "NotifyCondition", "integer", - sizeof(int), FMOffset(DaosReadReplyMsg, NotifyCondition) }, - { NULL, NULL, 0, 0 } }; - -static FMStructDescRec DaosReadReplyStructs[] = - { { "DaosReadReply", DaosReadReplyList, - sizeof(struct _DaosReadReplyMsg), NULL }, - { NULL, NULL, 0, NULL } }; +} * DaosReadReplyMsg; + +static FMField DaosReadReplyList[] = { + {"Timestep", "integer", sizeof(long), FMOffset(DaosReadReplyMsg, Timestep)}, + {"RS_Stream", "integer", sizeof(void *), + FMOffset(DaosReadReplyMsg, RS_Stream)}, + {"DataLength", "integer", sizeof(size_t), + FMOffset(DaosReadReplyMsg, DataLength)}, + {"Data", "char[DataLength]", sizeof(char), + FMOffset(DaosReadReplyMsg, Data)}, + {"NotifyCondition", "integer", sizeof(int), + FMOffset(DaosReadReplyMsg, NotifyCondition)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec DaosReadReplyStructs[] = { + {"DaosReadReply", DaosReadReplyList, sizeof(struct _DaosReadReplyMsg), + NULL}, + {NULL, NULL, 0, NULL}}; static void DaosReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, void *client_Data, attr_list attrs); @@ -183,7 +188,8 @@ static DP_RS_Stream DaosInitReader(CP_Services Svcs, void *CP_Stream, void **ReaderContactInfoPtr, struct _SstParams *Params, attr_list WriterContactAttributes, - SstStats Stats) { + SstStats Stats) +{ Daos_RS_Stream Stream = malloc(sizeof(struct _Daos_RS_Stream)); DaosReaderContactInfo Contact = malloc(sizeof(struct _DaosReaderContactInfo)); @@ -209,10 +215,13 @@ static DP_RS_Stream DaosInitReader(CP_Services Svcs, void *CP_Stream, set_string_attr(ListenAttrs, attr_atom_from_string("CM_TRANSPORT"), "sockets"); - if (Params->DataInterface) { + if (Params->DataInterface) + { set_string_attr(ListenAttrs, attr_atom_from_string("IP_INTERFACE"), strdup(Params->DataInterface)); - } else if (Params->NetworkInterface) { + } + else if (Params->NetworkInterface) + { set_string_attr(ListenAttrs, attr_atom_from_string("IP_INTERFACE"), strdup(Params->NetworkInterface)); } @@ -230,13 +239,15 @@ static DP_RS_Stream DaosInitReader(CP_Services Svcs, void *CP_Stream, return Stream; } -static void DaosDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v) { +static void DaosDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v) +{ Daos_RS_Stream RS_Stream = (Daos_RS_Stream)RS_Stream_v; free(RS_Stream); } static void DaosReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, - void *client_Data, attr_list attrs) { + void *client_Data, attr_list attrs) +{ TAU_START_FUNC(); DaosReadRequestMsg ReadRequestMsg = (DaosReadRequestMsg)msg_v; Daos_WSR_Stream WSR_Stream = ReadRequestMsg->WS_Stream; @@ -252,8 +263,10 @@ static void DaosReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, "offset %d, length %d\n", RequestingRank, ReadRequestMsg->Timestep, ReadRequestMsg->Offset, ReadRequestMsg->Length); - while (tmp != NULL) { - if (tmp->Timestep == ReadRequestMsg->Timestep) { + while (tmp != NULL) + { + if (tmp->Timestep == ReadRequestMsg->Timestep) + { struct _DaosReadReplyMsg ReadReplyMsg; /* memset avoids uninit byte warnings from valgrind */ memset(&ReadReplyMsg, 0, sizeof(ReadReplyMsg)); @@ -266,7 +279,8 @@ static void DaosReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, WS_Stream->CP_Stream, DPTraceVerbose, "Sending a reply to reader rank %d for remote memory read\n", RequestingRank); - if (!WSR_Stream->ReaderContactInfo[RequestingRank].Conn) { + if (!WSR_Stream->ReaderContactInfo[RequestingRank].Conn) + { attr_list List = attr_list_from_string( WSR_Stream->ReaderContactInfo[RequestingRank] .ContactString); @@ -296,7 +310,8 @@ static void DaosReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, TAU_STOP_FUNC(); } -typedef struct _DaosCompletionHandle { +typedef struct _DaosCompletionHandle +{ int CMcondition; CManager cm; void *CPStream; @@ -305,17 +320,19 @@ typedef struct _DaosCompletionHandle { int Failed; int Rank; struct _DaosCompletionHandle *Next; -} *DaosCompletionHandle; +} * DaosCompletionHandle; static void DaosReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, - void *client_Data, attr_list attrs) { + void *client_Data, attr_list attrs) +{ TAU_START_FUNC(); DaosReadReplyMsg ReadReplyMsg = (DaosReadReplyMsg)msg_v; Daos_RS_Stream RS_Stream = ReadReplyMsg->RS_Stream; CP_Services Svcs = (CP_Services)client_Data; DaosCompletionHandle Handle = NULL; - if (CMCondition_has_signaled(cm, ReadReplyMsg->NotifyCondition)) { + if (CMCondition_has_signaled(cm, ReadReplyMsg->NotifyCondition)) + { Svcs->verbose(RS_Stream->CP_Stream, DPTraceVerbose, "Got a reply to remote memory " "read, but the condition is " @@ -325,7 +342,8 @@ static void DaosReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, } Handle = CMCondition_get_client_data(cm, ReadReplyMsg->NotifyCondition); - if (!Handle) { + if (!Handle) + { Svcs->verbose( RS_Stream->CP_Stream, DPTraceVerbose, "Got a reply to remote memory read, but condition not found\n"); @@ -353,7 +371,8 @@ static void DaosReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, static DP_WS_Stream DaosInitWriter(CP_Services Svcs, void *CP_Stream, struct _SstParams *Params, attr_list DPAttrs, - SstStats Stats) { + SstStats Stats) +{ Daos_WS_Stream Stream = malloc(sizeof(struct _Daos_WS_Stream)); CManager cm = Svcs->getCManager(CP_Stream); SMPI_Comm comm = Svcs->getMPIComm(CP_Stream); @@ -377,11 +396,14 @@ static DP_WS_Stream DaosInitWriter(CP_Services Svcs, void *CP_Stream, return (void *)Stream; } -static void DaosDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v) { +static void DaosDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v) +{ Daos_WS_Stream WS_Stream = (Daos_WS_Stream)WS_Stream_v; // nvs_finalize_(WS_Stream->nvs); - for (int i = 0; i < WS_Stream->ReaderCount; i++) { - if (WS_Stream->Readers[i]) { + for (int i = 0; i < WS_Stream->ReaderCount; i++) + { + if (WS_Stream->Readers[i]) + { free(WS_Stream->Readers[i]->WriterContactInfo->ContactString); free(WS_Stream->Readers[i]->WriterContactInfo); free(WS_Stream->Readers[i]->ReaderContactInfo->ContactString); @@ -401,7 +423,8 @@ static DP_WSR_Stream DaosInitWriterPerReader(CP_Services Svcs, int readerCohortSize, CP_PeerCohort PeerCohort, void **providedReaderInfo_v, - void **WriterContactInfoPtr) { + void **WriterContactInfoPtr) +{ Daos_WS_Stream WS_Stream = (Daos_WS_Stream)WS_Stream_v; Daos_WSR_Stream WSR_Stream = malloc(sizeof(*WSR_Stream)); DaosWriterContactInfo ContactInfo; @@ -422,7 +445,8 @@ static DP_WSR_Stream DaosInitWriterPerReader(CP_Services Svcs, WSR_Stream->ReaderCohortSize = readerCohortSize; WSR_Stream->ReaderContactInfo = malloc(sizeof(struct _DaosReaderContactInfo) * readerCohortSize); - for (int i = 0; i < readerCohortSize; i++) { + for (int i = 0; i < readerCohortSize; i++) + { WSR_Stream->ReaderContactInfo[i].ContactString = NULL; WSR_Stream->ReaderContactInfo[i].Conn = NULL; WSR_Stream->ReaderContactInfo[i].RS_Stream = @@ -455,7 +479,8 @@ static DP_WSR_Stream DaosInitWriterPerReader(CP_Services Svcs, } static void DaosDestroyWriterPerReader(CP_Services Svcs, - DP_WSR_Stream WSR_Stream_v) { + DP_WSR_Stream WSR_Stream_v) +{ Daos_WSR_Stream WSR_Stream = (Daos_WSR_Stream)WSR_Stream_v; free(WSR_Stream); } @@ -464,7 +489,8 @@ static void DaosProvideWriterDataToReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v, int writerCohortSize, CP_PeerCohort PeerCohort, - void **providedWriterInfo_v) { + void **providedWriterInfo_v) +{ Daos_RS_Stream RS_Stream = (Daos_RS_Stream)RS_Stream_v; DaosWriterContactInfo *providedWriterInfo = (DaosWriterContactInfo *)providedWriterInfo_v; @@ -478,7 +504,8 @@ static void DaosProvideWriterDataToReader(CP_Services Svcs, */ RS_Stream->WriterContactInfo = malloc(sizeof(struct _DaosWriterContactInfo) * writerCohortSize); - for (int i = 0; i < writerCohortSize; i++) { + for (int i = 0; i < writerCohortSize; i++) + { RS_Stream->WriterContactInfo[i].ContactString = strdup(providedWriterInfo[i]->ContactString); RS_Stream->WriterContactInfo[i].WS_Stream = @@ -490,28 +517,33 @@ static void DaosProvideWriterDataToReader(CP_Services Svcs, RS_Stream->WriterContactInfo[i].WS_Stream, i); } RS_Stream->writer_nvs = malloc(sizeof(void *) * writerCohortSize); - for (int i = 0; i < writerCohortSize; i++) { + for (int i = 0; i < writerCohortSize; i++) + { RS_Stream->writer_nvs[i] = NULL; // nvs_open_store(RS_Stream->WriterContactInfo[i].ContactString); } } static void AddRequestToList(CP_Services Svcs, Daos_RS_Stream Stream, - DaosCompletionHandle Handle) { + DaosCompletionHandle Handle) +{ Handle->Next = Stream->PendingReadRequests; Stream->PendingReadRequests = Handle; } static void RemoveRequestFromList(CP_Services Svcs, Daos_RS_Stream Stream, - DaosCompletionHandle Handle) { + DaosCompletionHandle Handle) +{ DaosCompletionHandle Tmp = Stream->PendingReadRequests; - if (Stream->PendingReadRequests == Handle) { + if (Stream->PendingReadRequests == Handle) + { Stream->PendingReadRequests = Handle->Next; return; } - while (Tmp != NULL && Tmp->Next != Handle) { + while (Tmp != NULL && Tmp->Next != Handle) + { Tmp = Tmp->Next; } @@ -523,12 +555,15 @@ static void RemoveRequestFromList(CP_Services Svcs, Daos_RS_Stream Stream, } static void FailRequestsToRank(CP_Services Svcs, CManager cm, - Daos_RS_Stream Stream, int FailedRank) { + Daos_RS_Stream Stream, int FailedRank) +{ DaosCompletionHandle Tmp = Stream->PendingReadRequests; Svcs->verbose(Stream->CP_Stream, DPTraceVerbose, "Fail pending requests to writer rank %d\n", FailedRank); - while (Tmp != NULL) { - if (Tmp->Rank == FailedRank) { + while (Tmp != NULL) + { + if (Tmp->Rank == FailedRank) + { Tmp->Failed = 1; Svcs->verbose(Tmp->CPStream, DPTraceVerbose, "Found a pending remote memory read " @@ -546,15 +581,17 @@ static void FailRequestsToRank(CP_Services Svcs, CManager cm, "Done Failing requests to writer rank %d\n", FailedRank); } -typedef struct _DaosPerTimestepInfo { +typedef struct _DaosPerTimestepInfo +{ char *CheckString; int CheckInt; -} *DaosPerTimestepInfo; +} * DaosPerTimestepInfo; static void *DaosReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, int Rank, long Timestep, size_t Offset, size_t Length, void *Buffer, - void *DP_TimestepInfo) { + void *DP_TimestepInfo) +{ Daos_RS_Stream Stream = (Daos_RS_Stream) Stream_v; /* DP_RS_Stream is the return from InitReader */ CManager cm = Svcs->getCManager(Stream->CP_Stream); @@ -587,15 +624,19 @@ static void *DaosReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, BaseAddr = NULL; // nvs_get_with_malloc(Stream->writer_nvs[Rank], StringName, 1); - if (BaseAddr) { + if (BaseAddr) + { memcpy(Buffer, BaseAddr + Offset, Length); - } else { + } + else + { fprintf(stderr, "remote memory read failed\n"); } return ret; } -static int DaosWaitForCompletion(CP_Services Svcs, void *Handle_v) { +static int DaosWaitForCompletion(CP_Services Svcs, void *Handle_v) +{ DaosCompletionHandle Handle = (DaosCompletionHandle)Handle_v; int Ret = 1; Svcs->verbose( @@ -609,14 +650,17 @@ static int DaosWaitForCompletion(CP_Services Svcs, void *Handle_v) { */ if (Handle->CMcondition != -1) CMCondition_wait(Handle->cm, Handle->CMcondition); - if (Handle->Failed) { + if (Handle->Failed) + { Svcs->verbose(Handle->CPStream, DPTraceVerbose, "Remote memory read to rank %d with " "condition %d has FAILED because of " "writer failure\n", Handle->Rank, Handle->CMcondition); Ret = 0; - } else { + } + else + { Svcs->verbose( Handle->CPStream, DPTraceVerbose, "Remote memory read to rank %d with condition %d has completed\n", @@ -628,7 +672,8 @@ static int DaosWaitForCompletion(CP_Services Svcs, void *Handle_v) { } static void DaosNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v, - int FailedPeerRank) { + int FailedPeerRank) +{ Daos_RS_Stream Stream = (Daos_RS_Stream) Stream_v; /* DP_RS_Stream is the return from InitReader */ CManager cm = Svcs->getCManager(Stream->CP_Stream); @@ -643,7 +688,8 @@ static void DaosNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v, static void DaosProvideTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, struct _SstData *Data, struct _SstData *LocalMetadata, long Timestep, - void **TimestepInfoPtr) { + void **TimestepInfoPtr) +{ Daos_WS_Stream Stream = (Daos_WS_Stream)Stream_v; TimestepList Entry = malloc(sizeof(struct _TimestepEntry)); struct _DaosPerTimestepInfo *Info = NULL; @@ -677,24 +723,30 @@ static void DaosProvideTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, } static void DaosReleaseTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, - long Timestep) { + long Timestep) +{ Daos_WS_Stream Stream = (Daos_WS_Stream)Stream_v; TimestepList List = Stream->Timesteps; Svcs->verbose(Stream->CP_Stream, DPPerRankVerbose, "Releasing timestep %ld\n", Timestep); - if (Stream->Timesteps->Timestep == Timestep) { + if (Stream->Timesteps->Timestep == Timestep) + { Stream->Timesteps = List->Next; if (List->DP_TimestepInfo && List->DP_TimestepInfo->CheckString) free(List->DP_TimestepInfo->CheckString); if (List->DP_TimestepInfo) free(List->DP_TimestepInfo); free(List); - } else { + } + else + { TimestepList last = List; List = List->Next; - while (List != NULL) { - if (List->Timestep == Timestep) { + while (List != NULL) + { + if (List->Timestep == Timestep) + { last->Next = List->Next; if (List->DP_TimestepInfo && List->DP_TimestepInfo->CheckString) free(List->DP_TimestepInfo->CheckString); @@ -717,56 +769,52 @@ static void DaosReleaseTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, } static FMField DaosReaderContactList[] = { - { "ContactString", "string", - sizeof(char *), FMOffset(DaosReaderContactInfo, ContactString) }, - { "reader_ID", "integer", - sizeof(void *), FMOffset(DaosReaderContactInfo, RS_Stream) }, - { NULL, NULL, 0, 0 } -}; + {"ContactString", "string", sizeof(char *), + FMOffset(DaosReaderContactInfo, ContactString)}, + {"reader_ID", "integer", sizeof(void *), + FMOffset(DaosReaderContactInfo, RS_Stream)}, + {NULL, NULL, 0, 0}}; static FMStructDescRec DaosReaderContactStructs[] = { - { "DaosReaderContactInfo", DaosReaderContactList, - sizeof(struct _DaosReaderContactInfo), NULL }, - { NULL, NULL, 0, NULL } -}; + {"DaosReaderContactInfo", DaosReaderContactList, + sizeof(struct _DaosReaderContactInfo), NULL}, + {NULL, NULL, 0, NULL}}; static FMField DaosWriterContactList[] = { - { "ContactString", "string", - sizeof(char *), FMOffset(DaosWriterContactInfo, ContactString) }, - { "writer_ID", "integer", - sizeof(void *), FMOffset(DaosWriterContactInfo, WS_Stream) }, - { NULL, NULL, 0, 0 } -}; + {"ContactString", "string", sizeof(char *), + FMOffset(DaosWriterContactInfo, ContactString)}, + {"writer_ID", "integer", sizeof(void *), + FMOffset(DaosWriterContactInfo, WS_Stream)}, + {NULL, NULL, 0, 0}}; static FMStructDescRec DaosWriterContactStructs[] = { - { "DaosWriterContactInfo", DaosWriterContactList, - sizeof(struct _DaosWriterContactInfo), NULL }, - { NULL, NULL, 0, NULL } -}; + {"DaosWriterContactInfo", DaosWriterContactList, + sizeof(struct _DaosWriterContactInfo), NULL}, + {NULL, NULL, 0, NULL}}; static FMField DaosTimestepInfoList[] = { - { "CheckString", "string", - sizeof(char *), FMOffset(DaosPerTimestepInfo, CheckString) }, - { "CheckInt", "integer", - sizeof(void *), FMOffset(DaosPerTimestepInfo, CheckInt) }, - { NULL, NULL, 0, 0 } -}; + {"CheckString", "string", sizeof(char *), + FMOffset(DaosPerTimestepInfo, CheckString)}, + {"CheckInt", "integer", sizeof(void *), + FMOffset(DaosPerTimestepInfo, CheckInt)}, + {NULL, NULL, 0, 0}}; static FMStructDescRec DaosTimestepInfoStructs[] = { - { "DaosTimestepInfo", DaosTimestepInfoList, - sizeof(struct _DaosPerTimestepInfo), NULL }, - { NULL, NULL, 0, NULL } -}; + {"DaosTimestepInfo", DaosTimestepInfoList, + sizeof(struct _DaosPerTimestepInfo), NULL}, + {NULL, NULL, 0, NULL}}; static struct _CP_DP_Interface daosDPInterface; static int DaosGetPriority(CP_Services Svcs, void *CP_Stream, - struct _SstParams *Params) { + struct _SstParams *Params) +{ /* The daos DP priority 10 */ return 10; } -extern CP_DP_Interface LoadDaosDP() { +extern CP_DP_Interface LoadDaosDP() +{ memset(&daosDPInterface, 0, sizeof(daosDPInterface)); daosDPInterface.ReaderContactFormats = DaosReaderContactStructs; daosDPInterface.WriterContactFormats = DaosWriterContactStructs; diff --git a/source/adios2/toolkit/sst/dp/dp.c b/source/adios2/toolkit/sst/dp/dp.c index 09b7dbbcab..bb77cf87e3 100644 --- a/source/adios2/toolkit/sst/dp/dp.c +++ b/source/adios2/toolkit/sst/dp/dp.c @@ -18,22 +18,28 @@ extern CP_DP_Interface LoadDaosDP(); #endif /* SST_HAVE_LIBFABRIC */ extern CP_DP_Interface LoadEVpathDP(); -typedef struct _DPElement { +typedef struct _DPElement +{ const char *Name; CP_DP_Interface Interface; long Priority; -} *DPlist; +} * DPlist; static DPlist AddDPPossibility(CP_Services Svcs, void *CP_Stream, DPlist List, CP_DP_Interface Interface, const char *Name, - struct _SstParams *Params) { + struct _SstParams *Params) +{ int Count = 0; if (Interface == NULL) return List; - if (List == NULL) { + if (List == NULL) + { List = malloc(2 * sizeof(*List)); - } else { - while (List[Count].Interface) { + } + else + { + while (List[Count].Interface) + { Count++; } List = realloc(List, sizeof(*List) * (Count + 2)); @@ -46,7 +52,8 @@ static DPlist AddDPPossibility(CP_Services Svcs, void *CP_Stream, DPlist List, } CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream, - struct _SstParams *Params, int Rank) { + struct _SstParams *Params, int Rank) +{ CP_DP_Interface Ret; DPlist List = NULL; List = AddDPPossibility(Svcs, CP_Stream, List, LoadEVpathDP(), "evpath", @@ -66,25 +73,32 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream, int BestPrioDP = -1; int i = 0; int FoundPreferred = 0; - if (Params->DataTransport) { + if (Params->DataTransport) + { if (Rank == 0) Svcs->verbose(CP_Stream, DPPerStepVerbose, "Prefered dataplane name is \"%s\"\n", Params->DataTransport); } - while (List[i].Interface) { + while (List[i].Interface) + { if (Rank == 0) Svcs->verbose(CP_Stream, DPPerStepVerbose, "Considering DataPlane \"%s\" for possible use, " "priority is %d\n", List[i].Name, List[i].Priority); - if (Params->DataTransport) { - if (strcasecmp(List[i].Name, Params->DataTransport) == 0) { + if (Params->DataTransport) + { + if (strcasecmp(List[i].Name, Params->DataTransport) == 0) + { FoundPreferred = 1; - if (List[i].Priority >= 0) { + if (List[i].Priority >= 0) + { SelectedDP = i; break; - } else { + } + else + { if (Rank == 0) fprintf(stderr, "Warning: Perferred DataPlane \"%s\" is " @@ -93,23 +107,28 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream, } } } - if (List[i].Priority > BestPriority) { + if (List[i].Priority > BestPriority) + { BestPriority = List[i].Priority; BestPrioDP = i; } i++; } - if (Params->DataTransport && (FoundPreferred == 0)) { + if (Params->DataTransport && (FoundPreferred == 0)) + { if (Rank == 0) fprintf(stderr, "Warning: Preferred DataPlane \"%s\" not found.\n", Params->DataTransport); } - if (SelectedDP != -1) { + if (SelectedDP != -1) + { if (Rank == 0) Svcs->verbose(CP_Stream, DPSummaryVerbose, "Selecting DataPlane \"%s\" (preferred) for use\n", List[SelectedDP].Name); - } else { + } + else + { if (Rank == 0) Svcs->verbose(CP_Stream, DPSummaryVerbose, "Selecting DataPlane \"%s\", priority %d for use\n", @@ -117,16 +136,20 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream, SelectedDP = BestPrioDP; } i = 0; - while (List[i].Interface) { - if (i != SelectedDP) { - if (List[i].Interface->unGetPriority) { + while (List[i].Interface) + { + if (i != SelectedDP) + { + if (List[i].Interface->unGetPriority) + { List[i].Interface->unGetPriority(Svcs, CP_Stream); } } i++; } - if (Params->DataTransport) { + if (Params->DataTransport) + { free(Params->DataTransport); } Params->DataTransport = strdup(List[SelectedDP].Name); diff --git a/source/adios2/toolkit/transport/file/FileDaos.cpp b/source/adios2/toolkit/transport/file/FileDaos.cpp index 7ea5bc09fe..6bf2a09f62 100644 --- a/source/adios2/toolkit/transport/file/FileDaos.cpp +++ b/source/adios2/toolkit/transport/file/FileDaos.cpp @@ -23,21 +23,29 @@ #include #include -namespace adios2 { -namespace transport { +namespace adios2 +{ +namespace transport +{ -FileDaos::FileDaos(helper::Comm const &comm) -: Transport("File", "Daos", comm) {} +FileDaos::FileDaos(helper::Comm const &comm) : Transport("File", "Daos", comm) +{ +} -FileDaos::~FileDaos() { - if (m_IsOpen) { +FileDaos::~FileDaos() +{ + if (m_IsOpen) + { close(m_FileDescriptor); } } -void FileDaos::WaitForOpen() { - if (m_IsOpening) { - if (m_OpenFuture.valid()) { +void FileDaos::WaitForOpen() +{ + if (m_IsOpening) + { + if (m_OpenFuture.valid()) + { m_FileDescriptor = m_OpenFuture.get(); } m_IsOpening = false; @@ -47,8 +55,9 @@ void FileDaos::WaitForOpen() { } void FileDaos::Open(const std::string &name, const Mode openMode, - const bool async) { - auto lf_AsyncOpenWrite = [&](const std::string & name)->int { + const bool async) +{ + auto lf_AsyncOpenWrite = [&](const std::string &name) -> int { dfs_obj_t *obj; ProfilerStart("open"); errno = 0; @@ -63,14 +72,18 @@ void FileDaos::Open(const std::string &name, const Mode openMode, m_Name = name; CheckName(); m_OpenMode = openMode; - switch (m_OpenMode) { + switch (m_OpenMode) + { - case(Mode::Write) : - if (async) { + case (Mode::Write): + if (async) + { m_IsOpening = true; m_OpenFuture = std::async(std::launch::async, lf_AsyncOpenWrite, name); - } else { + } + else + { ProfilerStart("open"); errno = 0; m_FileDescriptor = @@ -80,7 +93,7 @@ void FileDaos::Open(const std::string &name, const Mode openMode, } break; - case(Mode::Append) : + case (Mode::Append): ProfilerStart("open"); errno = 0; // m_FileDescriptor = open(m_Name.c_str(), O_RDWR); @@ -90,7 +103,7 @@ void FileDaos::Open(const std::string &name, const Mode openMode, ProfilerStop("open"); break; - case(Mode::Read) : + case (Mode::Read): ProfilerStart("open"); errno = 0; m_FileDescriptor = open(m_Name.c_str(), O_RDONLY); @@ -103,23 +116,28 @@ void FileDaos::Open(const std::string &name, const Mode openMode, ", in call to Daos open"); } - if (!m_IsOpening) { + if (!m_IsOpening) + { CheckFile("couldn't open file " + m_Name + ", in call to Daos open"); m_IsOpen = true; } } -void FileDaos::Write(const char *buffer, size_t size, size_t start) { +void FileDaos::Write(const char *buffer, size_t size, size_t start) +{ auto lf_Write = [&](const char *buffer, size_t size) { - while (size > 0) { + while (size > 0) + { ProfilerStart("write"); errno = 0; const auto writtenSize = write(m_FileDescriptor, buffer, size); m_Errno = errno; ProfilerStop("write"); - if (writtenSize == -1) { - if (errno == EINTR) { + if (writtenSize == -1) + { + if (errno == EINTR) + { continue; } @@ -134,12 +152,14 @@ void FileDaos::Write(const char *buffer, size_t size, size_t start) { }; WaitForOpen(); - if (start != MaxSizeT) { + if (start != MaxSizeT) + { errno = 0; const auto newPosition = lseek(m_FileDescriptor, start, SEEK_SET); m_Errno = errno; - if (static_cast(newPosition) != start) { + if (static_cast(newPosition) != start) + { throw std::ios_base::failure( "ERROR: couldn't move to start position " + std::to_string(start) + " in file " + m_Name + @@ -147,32 +167,40 @@ void FileDaos::Write(const char *buffer, size_t size, size_t start) { } } - if (size > DefaultMaxFileBatchSize) { + if (size > DefaultMaxFileBatchSize) + { const size_t batches = size / DefaultMaxFileBatchSize; const size_t remainder = size % DefaultMaxFileBatchSize; size_t position = 0; - for (size_t b = 0; b < batches; ++b) { + for (size_t b = 0; b < batches; ++b) + { lf_Write(&buffer[position], DefaultMaxFileBatchSize); position += DefaultMaxFileBatchSize; } lf_Write(&buffer[position], remainder); - } else { + } + else + { lf_Write(buffer, size); } } -void FileDaos::Read(char *buffer, size_t size, size_t start) { +void FileDaos::Read(char *buffer, size_t size, size_t start) +{ auto lf_Read = [&](char *buffer, size_t size) { - while (size > 0) { + while (size > 0) + { ProfilerStart("read"); errno = 0; const auto readSize = read(m_FileDescriptor, buffer, size); m_Errno = errno; ProfilerStop("read"); - if (readSize == -1) { - if (errno == EINTR) { + if (readSize == -1) + { + if (errno == EINTR) + { continue; } @@ -188,12 +216,14 @@ void FileDaos::Read(char *buffer, size_t size, size_t start) { WaitForOpen(); - if (start != MaxSizeT) { + if (start != MaxSizeT) + { errno = 0; const auto newPosition = lseek(m_FileDescriptor, start, SEEK_SET); m_Errno = errno; - if (static_cast(newPosition) != start) { + if (static_cast(newPosition) != start) + { throw std::ios_base::failure( "ERROR: couldn't move to start position " + std::to_string(start) + " in file " + m_Name + @@ -201,26 +231,32 @@ void FileDaos::Read(char *buffer, size_t size, size_t start) { } } - if (size > DefaultMaxFileBatchSize) { + if (size > DefaultMaxFileBatchSize) + { const size_t batches = size / DefaultMaxFileBatchSize; const size_t remainder = size % DefaultMaxFileBatchSize; size_t position = 0; - for (size_t b = 0; b < batches; ++b) { + for (size_t b = 0; b < batches; ++b) + { lf_Read(&buffer[position], DefaultMaxFileBatchSize); position += DefaultMaxFileBatchSize; } lf_Read(&buffer[position], remainder); - } else { + } + else + { lf_Read(buffer, size); } } -size_t FileDaos::GetSize() { +size_t FileDaos::GetSize() +{ struct stat fileStat; WaitForOpen(); errno = 0; - if (fstat(m_FileDescriptor, &fileStat) == -1) { + if (fstat(m_FileDescriptor, &fileStat) == -1) + { m_Errno = errno; throw std::ios_base::failure("ERROR: couldn't get size of file " + m_Name + SysErrMsg()); @@ -231,7 +267,8 @@ size_t FileDaos::GetSize() { void FileDaos::Flush() {} -void FileDaos::Close() { +void FileDaos::Close() +{ WaitForOpen(); ProfilerStart("close"); errno = 0; @@ -239,7 +276,8 @@ void FileDaos::Close() { m_Errno = errno; ProfilerStop("close"); - if (status == -1) { + if (status == -1) + { throw std::ios_base::failure("ERROR: couldn't close file " + m_Name + ", in call to Daos IO close" + SysErrMsg()); @@ -248,43 +286,52 @@ void FileDaos::Close() { m_IsOpen = false; } -void FileDaos::Delete() { +void FileDaos::Delete() +{ WaitForOpen(); - if (m_IsOpen) { + if (m_IsOpen) + { Close(); } std::remove(m_Name.c_str()); } -void FileDaos::CheckFile(const std::string hint) const { - if (m_FileDescriptor == -1) { +void FileDaos::CheckFile(const std::string hint) const +{ + if (m_FileDescriptor == -1) + { throw std::ios_base::failure("ERROR: " + hint + SysErrMsg()); } } -std::string FileDaos::SysErrMsg() const { +std::string FileDaos::SysErrMsg() const +{ return std::string(": errno = " + std::to_string(m_Errno) + ": " + strerror(m_Errno)); } -void FileDaos::SeekToEnd() { +void FileDaos::SeekToEnd() +{ WaitForOpen(); errno = 0; const int status = lseek(m_FileDescriptor, 0, SEEK_END); m_Errno = 0; - if (status == -1) { + if (status == -1) + { throw std::ios_base::failure( "ERROR: couldn't seek to the end of file " + m_Name + ", in call to Daos IO lseek" + SysErrMsg()); } } -void FileDaos::SeekToBegin() { +void FileDaos::SeekToBegin() +{ WaitForOpen(); errno = 0; const int status = lseek(m_FileDescriptor, 0, SEEK_SET); m_Errno = errno; - if (status == -1) { + if (status == -1) + { throw std::ios_base::failure( "ERROR: couldn't seek to the begin of file " + m_Name + ", in call to Daos IO lseek" + SysErrMsg()); diff --git a/source/adios2/toolkit/transport/file/FileDaos.h b/source/adios2/toolkit/transport/file/FileDaos.h index cd288bdcdb..a38d8eae6f 100644 --- a/source/adios2/toolkit/transport/file/FileDaos.h +++ b/source/adios2/toolkit/transport/file/FileDaos.h @@ -14,14 +14,18 @@ #include "adios2/common/ADIOSConfig.h" #include "adios2/toolkit/transport/Transport.h" -namespace adios2 { -namespace helper { +namespace adios2 +{ +namespace helper +{ class Comm; } -namespace transport { +namespace transport +{ /** File descriptor transport using the Daos IO library */ -class FileDaos : public Transport { +class FileDaos : public Transport +{ public: FileDaos(helper::Comm const &comm); diff --git a/source/adios2/toolkit/transportman/TransportMan.cpp b/source/adios2/toolkit/transportman/TransportMan.cpp index 822a330060..d2b1a4ec32 100644 --- a/source/adios2/toolkit/transportman/TransportMan.cpp +++ b/source/adios2/toolkit/transportman/TransportMan.cpp @@ -34,24 +34,30 @@ #include "adios2/toolkit/transport/file/FileStdio.h" #include "adios2/toolkit/transport/null/NullTransport.h" -namespace adios2 { -namespace transportman { +namespace adios2 +{ +namespace transportman +{ TransportMan::TransportMan(helper::Comm &comm) : m_Comm(comm) {} void TransportMan::MkDirsBarrier(const std::vector &fileNames, const std::vector ¶metersVector, - const bool nodeLocal) { + const bool nodeLocal) +{ auto lf_CreateDirectories = [&](const std::vector &fileNames) { - for (size_t i = 0; i < fileNames.size(); ++i) { + for (size_t i = 0; i < fileNames.size(); ++i) + { const auto lastPathSeparator( fileNames[i].find_last_of(PathSeparator)); - if (lastPathSeparator == std::string::npos) { + if (lastPathSeparator == std::string::npos) + { continue; } const Params ¶meters = parametersVector[i]; const std::string type = parameters.at("transport"); - if (type == "File" || type == "file") { + if (type == "File" || type == "file") + { const std::string path( fileNames[i].substr(0, lastPathSeparator)); helper::CreateDirectory(path); @@ -59,11 +65,15 @@ void TransportMan::MkDirsBarrier(const std::vector &fileNames, } }; - if (nodeLocal) { + if (nodeLocal) + { lf_CreateDirectories(fileNames); - } else { + } + else + { int rank = m_Comm.Rank(); - if (rank == 0) { + if (rank == 0) + { lf_CreateDirectories(fileNames); } @@ -74,39 +84,46 @@ void TransportMan::MkDirsBarrier(const std::vector &fileNames, void TransportMan::OpenFiles(const std::vector &fileNames, const Mode openMode, const std::vector ¶metersVector, - const bool profile) { - for (size_t i = 0; i < fileNames.size(); ++i) { + const bool profile) +{ + for (size_t i = 0; i < fileNames.size(); ++i) + { const Params ¶meters = parametersVector[i]; const std::string type = parameters.at("transport"); - if (type == "File" || type == "file") { + if (type == "File" || type == "file") + { std::shared_ptr file = OpenFileTransport(fileNames[i], openMode, parameters, profile); - m_Transports.insert({ i, file }); + m_Transports.insert({i, file}); } } } void TransportMan::OpenFileID(const std::string &name, const size_t id, const Mode mode, const Params ¶meters, - const bool profile) { + const bool profile) +{ std::shared_ptr file = OpenFileTransport(name, mode, parameters, profile); - m_Transports.insert({ id, file }); + m_Transports.insert({id, file}); } std::vector TransportMan::GetFilesBaseNames( const std::string &baseName, - const std::vector ¶metersVector) const { - if (parametersVector.size() <= 1) { - return { baseName }; + const std::vector ¶metersVector) const +{ + if (parametersVector.size() <= 1) + { + return {baseName}; } std::map> typeTransportNames; std::vector baseNames; baseNames.reserve(parametersVector.size()); - for (const auto ¶meters : parametersVector) { + for (const auto ¶meters : parametersVector) + { // Get transport name from user std::string name(baseName); helper::SetParameterValue("Name", parameters, name); // if found in map @@ -115,13 +132,16 @@ std::vector TransportMan::GetFilesBaseNames( auto itType = typeTransportNames.find(type); // check if name exists for this transport type - if (itType != typeTransportNames.end()) { - if (itType->second.count(name) == 1) { + if (itType != typeTransportNames.end()) + { + if (itType->second.count(name) == 1) + { throw std::invalid_argument( "ERROR: two IO AddTransport of the same type can't " "have the same name : " + - name + ", use Name=value parameter, in " - "call to Open"); + name + + ", use Name=value parameter, in " + "call to Open"); } } typeTransportNames[type].insert(name); @@ -130,11 +150,13 @@ std::vector TransportMan::GetFilesBaseNames( return baseNames; } -std::vector TransportMan::GetTransportsTypes() noexcept { +std::vector TransportMan::GetTransportsTypes() noexcept +{ std::vector types; types.reserve(m_Transports.size()); - for (const auto &transportPair : m_Transports) { + for (const auto &transportPair : m_Transports) + { const std::shared_ptr &transport = transportPair.second; types.push_back(transport->m_Type + "_" + transport->m_Library); } @@ -142,11 +164,13 @@ std::vector TransportMan::GetTransportsTypes() noexcept { } std::vector -TransportMan::GetTransportsProfilers() noexcept { +TransportMan::GetTransportsProfilers() noexcept +{ std::vector profilers; profilers.reserve(m_Transports.size()); - for (const auto &transportPair : m_Transports) { + for (const auto &transportPair : m_Transports) + { const auto &transport = transportPair.second; profilers.push_back(&transport->m_Profiler); } @@ -154,16 +178,22 @@ TransportMan::GetTransportsProfilers() noexcept { } void TransportMan::WriteFiles(const char *buffer, const size_t size, - const int transportIndex) { - if (transportIndex == -1) { - for (auto &transportPair : m_Transports) { + const int transportIndex) +{ + if (transportIndex == -1) + { + for (auto &transportPair : m_Transports) + { auto &transport = transportPair.second; - if (transport->m_Type == "File") { + if (transport->m_Type == "File") + { // make this truly asynch? transport->Write(buffer, size); } } - } else { + } + else + { auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to WriteFiles with index " + std::to_string(transportIndex)); @@ -172,15 +202,21 @@ void TransportMan::WriteFiles(const char *buffer, const size_t size, } void TransportMan::WriteFileAt(const char *buffer, const size_t size, - const size_t start, const int transportIndex) { - if (transportIndex == -1) { - for (auto &transportPair : m_Transports) { + const size_t start, const int transportIndex) +{ + if (transportIndex == -1) + { + for (auto &transportPair : m_Transports) + { auto &transport = transportPair.second; - if (transport->m_Type == "File") { + if (transport->m_Type == "File") + { transport->Write(buffer, size, start); } } - } else { + } + else + { auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to WriteFileAt with index " + std::to_string(transportIndex)); @@ -188,15 +224,21 @@ void TransportMan::WriteFileAt(const char *buffer, const size_t size, } } -void TransportMan::SeekToFileEnd(const int transportIndex) { - if (transportIndex == -1) { - for (auto &transportPair : m_Transports) { +void TransportMan::SeekToFileEnd(const int transportIndex) +{ + if (transportIndex == -1) + { + for (auto &transportPair : m_Transports) + { auto &transport = transportPair.second; - if (transport->m_Type == "File") { + if (transport->m_Type == "File") + { transport->SeekToEnd(); } } - } else { + } + else + { auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to SeekToFileEnd with index " + std::to_string(transportIndex)); @@ -204,15 +246,21 @@ void TransportMan::SeekToFileEnd(const int transportIndex) { } } -void TransportMan::SeekToFileBegin(const int transportIndex) { - if (transportIndex == -1) { - for (auto &transportPair : m_Transports) { +void TransportMan::SeekToFileBegin(const int transportIndex) +{ + if (transportIndex == -1) + { + for (auto &transportPair : m_Transports) + { auto &transport = transportPair.second; - if (transport->m_Type == "File") { + if (transport->m_Type == "File") + { transport->SeekToBegin(); } } - } else { + } + else + { auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to SeekToFileBegin with index " + std::to_string(transportIndex)); @@ -220,7 +268,8 @@ void TransportMan::SeekToFileBegin(const int transportIndex) { } } -size_t TransportMan::GetFileSize(const size_t transportIndex) const { +size_t TransportMan::GetFileSize(const size_t transportIndex) const +{ auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to GetFileSize with index " + std::to_string(transportIndex)); @@ -228,23 +277,30 @@ size_t TransportMan::GetFileSize(const size_t transportIndex) const { } void TransportMan::ReadFile(char *buffer, const size_t size, const size_t start, - const size_t transportIndex) { + const size_t transportIndex) +{ auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to ReadFile with index " + std::to_string(transportIndex)); itTransport->second->Read(buffer, size, start); } -void TransportMan::FlushFiles(const int transportIndex) { - if (transportIndex == -1) { - for (auto &transportPair : m_Transports) { +void TransportMan::FlushFiles(const int transportIndex) +{ + if (transportIndex == -1) + { + for (auto &transportPair : m_Transports) + { auto &transport = transportPair.second; - if (transport->m_Type == "File") { + if (transport->m_Type == "File") + { transport->Flush(); } } - } else { + } + else + { auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to FlushFiles with index " + std::to_string(transportIndex)); @@ -252,16 +308,22 @@ void TransportMan::FlushFiles(const int transportIndex) { } } -void TransportMan::CloseFiles(const int transportIndex) { - if (transportIndex == -1) { - for (auto &transportPair : m_Transports) { +void TransportMan::CloseFiles(const int transportIndex) +{ + if (transportIndex == -1) + { + for (auto &transportPair : m_Transports) + { auto &transport = transportPair.second; - if (transport->m_Type == "File") { + if (transport->m_Type == "File") + { transport->Close(); } } - } else { + } + else + { auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to CloseFiles with index " + std::to_string(transportIndex)); @@ -269,16 +331,22 @@ void TransportMan::CloseFiles(const int transportIndex) { } } -void TransportMan::DeleteFiles(const int transportIndex) { - if (transportIndex == -1) { - for (auto &transportPair : m_Transports) { +void TransportMan::DeleteFiles(const int transportIndex) +{ + if (transportIndex == -1) + { + for (auto &transportPair : m_Transports) + { auto &transport = transportPair.second; - if (transport->m_Type == "File") { + if (transport->m_Type == "File") + { transport->Delete(); } } - } else { + } + else + { auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to CloseFiles with index " + std::to_string(transportIndex)); @@ -286,12 +354,15 @@ void TransportMan::DeleteFiles(const int transportIndex) { } } -bool TransportMan::AllTransportsClosed() const noexcept { +bool TransportMan::AllTransportsClosed() const noexcept +{ bool allClose = true; - for (const auto &transportPair : m_Transports) { + for (const auto &transportPair : m_Transports) + { const auto &transport = transportPair.second; - if (transport->m_IsOpen) { + if (transport->m_IsOpen) + { allClose = false; break; } @@ -300,15 +371,18 @@ bool TransportMan::AllTransportsClosed() const noexcept { } bool TransportMan::FileExists(const std::string &name, const Params ¶meters, - const bool profile) { + const bool profile) +{ bool exists = false; - try { + try + { std::shared_ptr file = OpenFileTransport(name, Mode::Read, parameters, profile); exists = true; file->Close(); } - catch (std::ios_base::failure &) { + catch (std::ios_base::failure &) + { } return exists; } @@ -317,15 +391,17 @@ bool TransportMan::FileExists(const std::string &name, const Params ¶meters, std::shared_ptr TransportMan::OpenFileTransport(const std::string &fileName, const Mode openMode, const Params ¶meters, - const bool profile) { - auto lf_GetBuffered = [&](const std::string bufferedDefault)->bool { + const bool profile) +{ + auto lf_GetBuffered = [&](const std::string bufferedDefault) -> bool { bool bufferedValue; std::string bufferedValueStr(bufferedDefault); helper::SetParameterValue("Buffered", parameters, bufferedValueStr); helper::SetParameterValue("buffered", parameters, bufferedValueStr); { std::stringstream ss(bufferedValueStr); - if (!(ss >> std::boolalpha >> bufferedValue)) { + if (!(ss >> std::boolalpha >> bufferedValue)) + { throw std::invalid_argument( "ERROR: invalid value for \"buffered\" transport " "parameter: " + @@ -337,21 +413,28 @@ TransportMan::OpenFileTransport(const std::string &fileName, auto lf_SetFileTransport = [&](const std::string library, std::shared_ptr &transport) { - if (library == "stdio") { + if (library == "stdio") + { transport = std::make_shared(m_Comm); - if (!lf_GetBuffered("true")) { + if (!lf_GetBuffered("true")) + { transport->SetBuffer(nullptr, 0); } - } else if (library == "fstream") { + } + else if (library == "fstream") + { transport = std::make_shared(m_Comm); - if (!lf_GetBuffered("true")) { + if (!lf_GetBuffered("true")) + { transport->SetBuffer(nullptr, 0); } } #ifndef _WIN32 - else if (library == "POSIX" || library == "posix") { + else if (library == "POSIX" || library == "posix") + { transport = std::make_shared(m_Comm); - if (lf_GetBuffered("false")) { + if (lf_GetBuffered("false")) + { throw std::invalid_argument( "ERROR: " + library + " transport does not support buffered I/O."); @@ -359,9 +442,11 @@ TransportMan::OpenFileTransport(const std::string &fileName, } #endif #ifdef ADIOS2_HAVE_DAOS - else if (library == "Daos" || library == "daos") { + else if (library == "Daos" || library == "daos") + { transport = std::make_shared(m_Comm); - if (lf_GetBuffered("false")) { + if (lf_GetBuffered("false")) + { throw std::invalid_argument( "ERROR: " + library + " transport does not support buffered I/O."); @@ -369,25 +454,30 @@ TransportMan::OpenFileTransport(const std::string &fileName, } #endif #ifdef ADIOS2_HAVE_IME - else if (library == "IME" || library == "ime") { + else if (library == "IME" || library == "ime") + { transport = std::make_shared(m_Comm); } -#endif - else if (library == "NULL" || library == "null") { +#endif + else if (library == "NULL" || library == "null") + { transport = std::make_shared(m_Comm); - if (lf_GetBuffered("false")) { + if (lf_GetBuffered("false")) + { throw std::invalid_argument( "ERROR: " + library + " transport does not support buffered I/O."); } - } else { + } + else + { throw std::invalid_argument( "ERROR: invalid IO AddTransport library " + library); } }; auto lf_GetLibrary = [](const std::string defaultLibrary, - const Params & parameters)->std::string { + const Params ¶meters) -> std::string { std::string library(defaultLibrary); helper::SetParameterValue("Library", parameters, library); helper::SetParameterValue("library", parameters, library); @@ -395,7 +485,7 @@ TransportMan::OpenFileTransport(const std::string &fileName, }; auto lf_GetTimeUnits = [&](const std::string defaultTimeUnit, - const Params & parameters)->TimeUnit { + const Params ¶meters) -> TimeUnit { std::string profileUnits(defaultTimeUnit); helper::SetParameterValue("ProfileUnits", parameters, profileUnits); helper::SetParameterValue("profileunits", parameters, profileUnits); @@ -403,7 +493,7 @@ TransportMan::OpenFileTransport(const std::string &fileName, }; auto lf_GetAsync = [&](const std::string defaultAsync, - const Params & parameters)->bool { + const Params ¶meters) -> bool { std::string Async = defaultAsync; helper::SetParameterValue("AsyncTasks", parameters, Async); helper::SetParameterValue("asynctasks", parameters, Async); @@ -416,7 +506,8 @@ TransportMan::OpenFileTransport(const std::string &fileName, transport); // Default or user ProfileUnits in parameters - if (profile) { + if (profile) + { transport->InitProfiler(openMode, lf_GetTimeUnits(DefaultTimeUnit, parameters)); } @@ -431,12 +522,15 @@ TransportMan::OpenFileTransport(const std::string &fileName, void TransportMan::CheckFile( std::unordered_map>::const_iterator itTransport, - const std::string hint) const { - if (itTransport == m_Transports.end()) { + const std::string hint) const +{ + if (itTransport == m_Transports.end()) + { throw std::invalid_argument("ERROR: invalid transport " + hint + "\n"); } - if (itTransport->second->m_Type != "File") { + if (itTransport->second->m_Type != "File") + { throw std::invalid_argument("ERROR: invalid type " + itTransport->second->m_Library + ", must be file " + hint + "\n");