From a52efb7ce5c7e9b7f9064ab46ac9ce0dfdd50914 Mon Sep 17 00:00:00 2001 From: atl Upstream Date: Thu, 11 Jun 2020 13:08:15 -0400 Subject: [PATCH 1/4] atl 2020-06-11 (7b0202fe) Code extracted from: https://github.com/GTkorvo/atl.git at commit 7b0202fea5ea4900d6ae455821765f89b8a94289 (master). Upstream Shortlog ----------------- --- attr.c | 49 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/attr.c b/attr.c index b862d45d35..3d243e25f9 100644 --- a/attr.c +++ b/attr.c @@ -102,6 +102,35 @@ atl_lock_func global_as_lock = NULL; atl_lock_func global_as_unlock = NULL; void* global_as_lock_data = NULL; + +#if defined(__has_feature) +#if __has_feature(thread_sanitizer) +#define NO_SANITIZE_THREAD __attribute__((no_sanitize("thread"))) +#endif +#endif + +#ifndef NO_SANITIZE_THREAD +#define NO_SANITIZE_THREAD +#endif + +extern void NO_SANITIZE_THREAD +atl_install_mutex_funcs(atl_lock_func lock, atl_lock_func unlock, void *client_data) +{ + global_as_lock = lock; + global_as_unlock = unlock; + global_as_lock_data = client_data; +} + +static void NO_SANITIZE_THREAD atl_lock() +{ + if (global_as_lock) (global_as_lock)(global_as_lock_data); +} + +static void NO_SANITIZE_THREAD atl_unlock() +{ + if (global_as_unlock) (global_as_unlock)(global_as_lock_data); +} + int get_pattr(attr_list list,int index, atom_t *name, attr_value_type *val_type, attr_union *value); @@ -1023,7 +1052,9 @@ extern void fdump_attr_list(void *file, attr_list list) { FILE *out = (FILE*)file; + atl_lock(); init_global_atom_server(&global_as); + atl_unlock(); fprintf(out, "Attribute list %p, ref_count = %d\n", list, list->ref_count); internal_dump_attr_list(out, list, 0); } @@ -1036,7 +1067,9 @@ extern int get_attr_id(attr_list list, int item_no, atom_t *item) { + atl_lock(); init_global_atom_server(&global_as); + atl_unlock(); if (item_no < 0) return 0; @@ -1142,23 +1175,15 @@ attr_list_from_string(const char * str) return ret_val; } -extern void -atl_install_mutex_funcs(atl_lock_func lock, atl_lock_func unlock, void *client_data) -{ - global_as_lock = lock; - global_as_unlock = unlock; - global_as_lock_data = client_data; -} - extern atom_t attr_atom_from_string(const char *str) { atom_t tmp; + atl_lock(); init_global_atom_server(&global_as); - if (global_as_lock) (global_as_lock)(global_as_lock_data); tmp = atom_from_string(global_as, (char*)str); - if (global_as_unlock) (global_as_unlock)(global_as_lock_data); + atl_unlock(); return tmp; } @@ -1167,10 +1192,10 @@ char * attr_string_from_atom(atom_t atom) { char *tmp; + atl_lock(); init_global_atom_server(&global_as); - if (global_as_lock) (global_as_lock)(global_as_lock_data); tmp = string_from_atom(global_as, atom); - if (global_as_unlock) (global_as_unlock)(global_as_lock_data); + atl_unlock(); return tmp; } From bdf1225799846facb28f186374214611ad1b1908 Mon Sep 17 00:00:00 2001 From: EVPath Upstream Date: Thu, 11 Jun 2020 14:19:41 -0400 Subject: [PATCH 2/4] EVPath 2020-06-11 (342508eb) Code extracted from: https://github.com/GTkorvo/EVPath.git at commit 342508eb5fc4894d8e1a8c898cdbf1bd7840e314 (master). Upstream Shortlog ----------------- --- cm.c | 9 +++++++++ cm_internal.h | 1 + evpath.h | 9 +++++++++ 3 files changed, 19 insertions(+) diff --git a/cm.c b/cm.c index 53f2db52a3..807c9f3f34 100644 --- a/cm.c +++ b/cm.c @@ -334,6 +334,15 @@ INT_CMget_contact_list(CManager cm) return (cm->contact_lists[0]); } +extern attr_list +INT_CMderef_and_copy_list(CManager cm, attr_list attrs) +{ + // done inside the CM lock, so a safe way to convert a shared list to an owned list + attr_list ret = attr_copy_list(attrs); + free_attr_list(attrs); + return ret; +} + extern attr_list INT_CMget_specific_contact_list(CManager cm, attr_list attrs) { diff --git a/cm_internal.h b/cm_internal.h index 351f400655..a4de74942e 100644 --- a/cm_internal.h +++ b/cm_internal.h @@ -415,6 +415,7 @@ void *INT_CMCondition_get_client_data(CManager cm, int condition); int INT_CMCondition_wait(CManager cm, int condition); extern void INT_CMCondition_fail(CManager cm, int condition); extern attr_list INT_CMget_contact_list(CManager cm); +extern attr_list INT_CMderef_and_copy_list(CManager cm, attr_list attrs); extern void INT_CMregister_non_CM_message_handler(int header, CMNonCMHandler handler); extern void *INT_CMtake_buffer(CManager cm, void *data); extern void INT_CMreturn_buffer(CManager cm, void *data); diff --git a/evpath.h b/evpath.h index dbd121b3ce..91ea50cad1 100644 --- a/evpath.h +++ b/evpath.h @@ -256,6 +256,15 @@ CM_insert_contact_info (CManager cm, attr_list attrs); extern attr_list CMget_specific_contact_list (CManager cm, attr_list attrs); +/*! + * get a thread-owned version of a shared attribute list in a thread-safe way + * \param cm the CManager which owns the attribute list. + * \param attrs the shared attribute list (from CMget_contact list, etc.) + * \return a single-owner contact list + */ +extern attr_list +CMderef_and_copy_list(CManager cm, attr_list attrs); + /*! * check to see if this is contact information for this CM. * From c1cf761c45af21fc27c11878beb8b5538de3dab5 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Tue, 16 Jun 2020 16:05:23 -0400 Subject: [PATCH 3/4] Tweaks and test for SST in threads --- source/adios2/toolkit/sst/cp/cp_common.c | 343 ++++++++++-------- source/adios2/toolkit/sst/cp/cp_internal.h | 41 ++- source/adios2/toolkit/sst/cp/cp_reader.c | 53 +-- source/adios2/toolkit/sst/cp/cp_writer.c | 55 ++- source/adios2/toolkit/sst/dp/evpath_dp.c | 19 +- source/adios2/toolkit/sst/dp/rdma_dp.c | 9 +- .../engine/staging-common/CMakeLists.txt | 3 + 7 files changed, 287 insertions(+), 236 deletions(-) diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c index 7fe5be1f1a..e796347b31 100644 --- a/source/adios2/toolkit/sst/cp/cp_common.c +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -840,40 +840,29 @@ static void initAtomList() CM_ENET_CONN_TIMEOUT = attr_atom_from_string("CM_ENET_CONN_TIMEOUT"); } -static void AddCustomStruct(CP_GlobalInfo CPInfo, FMStructDescList Struct) +static void AddCustomStruct(CP_StructList *List, FMStructDescList Struct) { - CPInfo->CustomStructCount++; - CPInfo->CustomStructList = - realloc(CPInfo->CustomStructList, - sizeof(FMStructDescList) * CPInfo->CustomStructCount); - CPInfo->CustomStructList[CPInfo->CustomStructCount - 1] = Struct; + List->CustomStructCount++; + List->CustomStructList = + realloc(List->CustomStructList, + sizeof(FMStructDescList) * List->CustomStructCount); + List->CustomStructList[List->CustomStructCount - 1] = Struct; } -static void FreeCustomStructs(CP_GlobalInfo CPInfo) +static void FreeCustomStructs(CP_StructList *List) { - for (int i = 0; i < CPInfo->CustomStructCount; i++) + for (int i = 0; i < List->CustomStructCount; i++) { - FMfree_struct_list(CPInfo->CustomStructList[i]); + FMfree_struct_list(List->CustomStructList[i]); } - free(CPInfo->CustomStructList); + free(List->CustomStructList); } -static void doFormatRegistration(CP_GlobalInfo CPInfo, CP_DP_Interface DPInfo) +static void doCMFormatRegistration(CP_GlobalCMInfo CPInfo, + CP_DP_Interface DPInfo) { - FMStructDescList PerRankReaderStructs, FullReaderRegisterStructs, - CombinedReaderStructs; - FMStructDescList PerRankWriterStructs, FullWriterResponseStructs, - CombinedWriterStructs; - FMStructDescList CombinedMetadataStructs, CombinedTimestepMetadataStructs; - FMFormat f; - - PerRankReaderStructs = combineCpDpFormats( - CP_DP_PairStructs, CP_ReaderInitStructs, DPInfo->ReaderContactFormats); - f = FMregister_data_format(CPInfo->fm_c, PerRankReaderStructs); - CPInfo->PerRankReaderInfoFormat = - FFSTypeHandle_by_index(CPInfo->ffs_c, FMformat_index(f)); - FFSset_fixed_target(CPInfo->ffs_c, PerRankReaderStructs); - AddCustomStruct(CPInfo, PerRankReaderStructs); + FMStructDescList FullReaderRegisterStructs, FullWriterResponseStructs, + CombinedTimestepMetadataStructs; FullReaderRegisterStructs = combineCpDpFormats(CP_ReaderRegisterStructs, CP_ReaderInitStructs, @@ -882,7 +871,66 @@ static void doFormatRegistration(CP_GlobalInfo CPInfo, CP_DP_Interface DPInfo) CMregister_format(CPInfo->cm, FullReaderRegisterStructs); CMregister_handler(CPInfo->ReaderRegisterFormat, CP_ReaderRegisterHandler, NULL); - AddCustomStruct(CPInfo, FullReaderRegisterStructs); + AddCustomStruct(&CPInfo->CustomStructs, FullReaderRegisterStructs); + + FullWriterResponseStructs = + combineCpDpFormats(CP_WriterResponseStructs, CP_WriterInitStructs, + DPInfo->WriterContactFormats); + CPInfo->WriterResponseFormat = + CMregister_format(CPInfo->cm, FullWriterResponseStructs); + CMregister_handler(CPInfo->WriterResponseFormat, CP_WriterResponseHandler, + NULL); + AddCustomStruct(&CPInfo->CustomStructs, FullWriterResponseStructs); + + CombinedTimestepMetadataStructs = combineCpDpFormats( + TimestepMetadataStructs, NULL, DPInfo->TimestepInfoFormats); + CPInfo->DeliverTimestepMetadataFormat = + CMregister_format(CPInfo->cm, CombinedTimestepMetadataStructs); + CMregister_handler(CPInfo->DeliverTimestepMetadataFormat, + CP_TimestepMetadataHandler, NULL); + AddCustomStruct(&CPInfo->CustomStructs, CombinedTimestepMetadataStructs); + + CPInfo->PeerSetupFormat = CMregister_format(CPInfo->cm, PeerSetupStructs); + CMregister_handler(CPInfo->PeerSetupFormat, CP_PeerSetupHandler, NULL); + + CPInfo->ReaderActivateFormat = + CMregister_format(CPInfo->cm, ReaderActivateStructs); + CMregister_handler(CPInfo->ReaderActivateFormat, CP_ReaderActivateHandler, + NULL); + CPInfo->ReleaseTimestepFormat = + CMregister_format(CPInfo->cm, ReleaseTimestepStructs); + CMregister_handler(CPInfo->ReleaseTimestepFormat, CP_ReleaseTimestepHandler, + NULL); + CPInfo->LockReaderDefinitionsFormat = + CMregister_format(CPInfo->cm, LockReaderDefinitionsStructs); + CMregister_handler(CPInfo->LockReaderDefinitionsFormat, + CP_LockReaderDefinitionsHandler, NULL); + CPInfo->CommPatternLockedFormat = + CMregister_format(CPInfo->cm, CommPatternLockedStructs); + CMregister_handler(CPInfo->CommPatternLockedFormat, + CP_CommPatternLockedHandler, NULL); + CPInfo->WriterCloseFormat = + CMregister_format(CPInfo->cm, WriterCloseStructs); + CMregister_handler(CPInfo->WriterCloseFormat, CP_WriterCloseHandler, NULL); + CPInfo->ReaderCloseFormat = + CMregister_format(CPInfo->cm, ReaderCloseStructs); + CMregister_handler(CPInfo->ReaderCloseFormat, CP_ReaderCloseHandler, NULL); +} + +static void doFFSFormatRegistration(CP_Info CPInfo, CP_DP_Interface DPInfo) +{ + FMStructDescList PerRankReaderStructs, CombinedReaderStructs; + FMStructDescList PerRankWriterStructs, CombinedWriterStructs; + FMStructDescList CombinedMetadataStructs; + FMFormat f; + + PerRankReaderStructs = combineCpDpFormats( + CP_DP_PairStructs, CP_ReaderInitStructs, DPInfo->ReaderContactFormats); + f = FMregister_data_format(CPInfo->fm_c, PerRankReaderStructs); + CPInfo->PerRankReaderInfoFormat = + FFSTypeHandle_by_index(CPInfo->ffs_c, FMformat_index(f)); + FFSset_fixed_target(CPInfo->ffs_c, PerRankReaderStructs); + AddCustomStruct(&CPInfo->CustomStructs, PerRankReaderStructs); CombinedReaderStructs = combineCpDpFormats(CP_DP_ReaderArrayStructs, CP_ReaderInitStructs, @@ -891,7 +939,7 @@ static void doFormatRegistration(CP_GlobalInfo CPInfo, CP_DP_Interface DPInfo) CPInfo->CombinedReaderInfoFormat = FFSTypeHandle_by_index(CPInfo->ffs_c, FMformat_index(f)); FFSset_fixed_target(CPInfo->ffs_c, CombinedReaderStructs); - AddCustomStruct(CPInfo, CombinedReaderStructs); + AddCustomStruct(&CPInfo->CustomStructs, CombinedReaderStructs); PerRankWriterStructs = combineCpDpFormats(CP_DP_WriterPairStructs, CP_WriterInitStructs, @@ -900,16 +948,7 @@ static void doFormatRegistration(CP_GlobalInfo CPInfo, CP_DP_Interface DPInfo) CPInfo->PerRankWriterInfoFormat = FFSTypeHandle_by_index(CPInfo->ffs_c, FMformat_index(f)); FFSset_fixed_target(CPInfo->ffs_c, PerRankWriterStructs); - AddCustomStruct(CPInfo, PerRankWriterStructs); - - FullWriterResponseStructs = - combineCpDpFormats(CP_WriterResponseStructs, CP_WriterInitStructs, - DPInfo->WriterContactFormats); - CPInfo->WriterResponseFormat = - CMregister_format(CPInfo->cm, FullWriterResponseStructs); - CMregister_handler(CPInfo->WriterResponseFormat, CP_WriterResponseHandler, - NULL); - AddCustomStruct(CPInfo, FullWriterResponseStructs); + AddCustomStruct(&CPInfo->CustomStructs, PerRankWriterStructs); CombinedWriterStructs = combineCpDpFormats(CP_DP_WriterArrayStructs, CP_WriterInitStructs, @@ -918,7 +957,7 @@ static void doFormatRegistration(CP_GlobalInfo CPInfo, CP_DP_Interface DPInfo) CPInfo->CombinedWriterInfoFormat = FFSTypeHandle_by_index(CPInfo->ffs_c, FMformat_index(f)); FFSset_fixed_target(CPInfo->ffs_c, CombinedWriterStructs); - AddCustomStruct(CPInfo, CombinedWriterStructs); + AddCustomStruct(&CPInfo->CustomStructs, CombinedWriterStructs); CombinedMetadataStructs = combineCpDpFormats( MetaDataPlusDPInfoStructs, NULL, DPInfo->TimestepInfoFormats); @@ -926,15 +965,7 @@ static void doFormatRegistration(CP_GlobalInfo CPInfo, CP_DP_Interface DPInfo) CPInfo->PerRankMetadataFormat = FFSTypeHandle_by_index(CPInfo->ffs_c, FMformat_index(f)); FFSset_fixed_target(CPInfo->ffs_c, CombinedMetadataStructs); - AddCustomStruct(CPInfo, CombinedMetadataStructs); - - CombinedTimestepMetadataStructs = combineCpDpFormats( - TimestepMetadataStructs, NULL, DPInfo->TimestepInfoFormats); - CPInfo->DeliverTimestepMetadataFormat = - CMregister_format(CPInfo->cm, CombinedTimestepMetadataStructs); - CMregister_handler(CPInfo->DeliverTimestepMetadataFormat, - CP_TimestepMetadataHandler, NULL); - AddCustomStruct(CPInfo, CombinedTimestepMetadataStructs); + AddCustomStruct(&CPInfo->CustomStructs, CombinedMetadataStructs); CombinedMetadataStructs = combineCpDpFormats( TimestepMetadataDistributionStructs, NULL, DPInfo->TimestepInfoFormats); @@ -942,7 +973,7 @@ static void doFormatRegistration(CP_GlobalInfo CPInfo, CP_DP_Interface DPInfo) CPInfo->TimestepDistributionFormat = FFSTypeHandle_by_index(CPInfo->ffs_c, FMformat_index(f)); FFSset_fixed_target(CPInfo->ffs_c, CombinedMetadataStructs); - AddCustomStruct(CPInfo, CombinedMetadataStructs); + AddCustomStruct(&CPInfo->CustomStructs, CombinedMetadataStructs); CombinedMetadataStructs = combineCpDpFormats( ReturnMetadataInfoStructs, NULL, DPInfo->TimestepInfoFormats); @@ -950,45 +981,20 @@ static void doFormatRegistration(CP_GlobalInfo CPInfo, CP_DP_Interface DPInfo) CPInfo->ReturnMetadataInfoFormat = FFSTypeHandle_by_index(CPInfo->ffs_c, FMformat_index(f)); FFSset_fixed_target(CPInfo->ffs_c, CombinedMetadataStructs); - AddCustomStruct(CPInfo, CombinedMetadataStructs); - - CPInfo->PeerSetupFormat = CMregister_format(CPInfo->cm, PeerSetupStructs); - CMregister_handler(CPInfo->PeerSetupFormat, CP_PeerSetupHandler, NULL); - - CPInfo->ReaderActivateFormat = - CMregister_format(CPInfo->cm, ReaderActivateStructs); - CMregister_handler(CPInfo->ReaderActivateFormat, CP_ReaderActivateHandler, - NULL); - CPInfo->ReleaseTimestepFormat = - CMregister_format(CPInfo->cm, ReleaseTimestepStructs); - CMregister_handler(CPInfo->ReleaseTimestepFormat, CP_ReleaseTimestepHandler, - NULL); - CPInfo->LockReaderDefinitionsFormat = - CMregister_format(CPInfo->cm, LockReaderDefinitionsStructs); - CMregister_handler(CPInfo->LockReaderDefinitionsFormat, - CP_LockReaderDefinitionsHandler, NULL); - CPInfo->CommPatternLockedFormat = - CMregister_format(CPInfo->cm, CommPatternLockedStructs); - CMregister_handler(CPInfo->CommPatternLockedFormat, - CP_CommPatternLockedHandler, NULL); - CPInfo->WriterCloseFormat = - CMregister_format(CPInfo->cm, WriterCloseStructs); - CMregister_handler(CPInfo->WriterCloseFormat, CP_WriterCloseHandler, NULL); - CPInfo->ReaderCloseFormat = - CMregister_format(CPInfo->cm, ReaderCloseStructs); - CMregister_handler(CPInfo->ReaderCloseFormat, CP_ReaderCloseHandler, NULL); + AddCustomStruct(&CPInfo->CustomStructs, CombinedMetadataStructs); } -static CP_GlobalInfo CPInfo = NULL; -static int CPInfoRefCount = 0; +static pthread_mutex_t StateMutex = PTHREAD_MUTEX_INITIALIZER; +static CP_GlobalCMInfo SharedCMInfo = NULL; +static int SharedCMInfoRefCount = 0; extern void AddToLastCallFreeList(void *Block) { - CPInfo->LastCallFreeList = - realloc(CPInfo->LastCallFreeList, - sizeof(void *) * (CPInfo->LastCallFreeCount + 1)); - CPInfo->LastCallFreeList[CPInfo->LastCallFreeCount] = Block; - CPInfo->LastCallFreeCount++; + SharedCMInfo->LastCallFreeList = + realloc(SharedCMInfo->LastCallFreeList, + sizeof(void *) * (SharedCMInfo->LastCallFreeCount + 1)); + SharedCMInfo->LastCallFreeList[SharedCMInfo->LastCallFreeCount] = Block; + SharedCMInfo->LastCallFreeCount++; } extern void SstStreamDestroy(SstStream Stream) @@ -1136,33 +1142,38 @@ extern void SstStreamDestroy(SstStream Stream) free(Stream->ParamsBlock); Stream->ParamsBlock = NULL; } + if (Stream->CPInfo->ffs_c) + free_FFSContext(Stream->CPInfo->ffs_c); + if (Stream->CPInfo->fm_c) + free_FMcontext(Stream->CPInfo->fm_c); + FreeCustomStructs(&Stream->CPInfo->CustomStructs); + free(Stream->CPInfo); + pthread_mutex_unlock(&Stream->DataLock); // Stream is free'd in LastCall - CPInfoRefCount--; - if (CPInfoRefCount == 0) + pthread_mutex_lock(&StateMutex); + SharedCMInfoRefCount--; + if (SharedCMInfoRefCount == 0) { CP_verbose( Stream, "Reference count now zero, Destroying process SST info cache\n"); - CManager_close(CPInfo->cm); - if (CPInfo->ffs_c) - free_FFSContext(CPInfo->ffs_c); - if (CPInfo->fm_c) - free_FMcontext(CPInfo->fm_c); - FreeCustomStructs(CPInfo); + CManager_close(SharedCMInfo->cm); + FreeCustomStructs(&SharedCMInfo->CustomStructs); CP_verbose(Stream, "Freeing LastCallList\n"); - for (int i = 0; i < CPInfo->LastCallFreeCount; i++) + for (int i = 0; i < SharedCMInfo->LastCallFreeCount; i++) { - free(CPInfo->LastCallFreeList[i]); + free(SharedCMInfo->LastCallFreeList[i]); } - free(CPInfo->LastCallFreeList); - free(CPInfo); - CPInfo = NULL; + free(SharedCMInfo->LastCallFreeList); + free(SharedCMInfo); + SharedCMInfo = NULL; if (CP_SstParamsList) free_FMfield_list(CP_SstParamsList); CP_SstParamsList = NULL; } + pthread_mutex_unlock(&StateMutex); CP_verbose(&StackStream, "SstStreamDestroy successful, returning\n"); } @@ -1181,7 +1192,10 @@ extern char *CP_GetContactString(SstStream Stream, attr_list DPAttrs) set_string_attr(ListenList, IP_INTERFACE_ATOM, strdup(Stream->ConfigParams->NetworkInterface)); } - ContactList = CMget_specific_contact_list(Stream->CPInfo->cm, ListenList); + ContactList = + CMget_specific_contact_list(Stream->CPInfo->SharedCM->cm, ListenList); + ContactList = + CMderef_and_copy_list(Stream->CPInfo->SharedCM->cm, ContactList); if (strcmp(Stream->ConfigParams->ControlTransport, "enet") == 0) { set_int_attr(ContactList, CM_ENET_CONN_TIMEOUT, 60000); /* 60 seconds */ @@ -1196,87 +1210,96 @@ extern char *CP_GetContactString(SstStream Stream, attr_list DPAttrs) return ret; } -extern CP_GlobalInfo CP_getCPInfo(CP_DP_Interface DPInfo, char *ControlModule) +extern CP_Info CP_getCPInfo(CP_DP_Interface DPInfo, char *ControlModule) { + CP_Info StreamCP; - if (CPInfo) + pthread_mutex_lock(&StateMutex); + if (!SharedCMInfo) { - CPInfoRefCount++; - return CPInfo; - } - - initAtomList(); - CPInfo = malloc(sizeof(*CPInfo)); - memset(CPInfo, 0, sizeof(*CPInfo)); + initAtomList(); - CPInfo->cm = CManager_create_control(ControlModule); - if (CMfork_comm_thread(CPInfo->cm) == 0) - { - fprintf(stderr, "ADIOS2 SST Engine failed to fork a communication " - "thread.\nThis is a fatal condition, please check " - "resources or system settings.\nDying now.\n"); - exit(1); - } + SharedCMInfo = malloc(sizeof(*SharedCMInfo)); + memset(SharedCMInfo, 0, sizeof(*SharedCMInfo)); - if (globalNetinfoCallback) - { - IPDiagString = CMget_ip_config_diagnostics(CPInfo->cm); - } + SharedCMInfo->cm = CManager_create_control(ControlModule); + if (CMfork_comm_thread(SharedCMInfo->cm) == 0) + { + fprintf(stderr, "ADIOS2 SST Engine failed to fork a communication " + "thread.\nThis is a fatal condition, please check " + "resources or system settings.\nDying now.\n"); + exit(1); + } - CMlisten(CPInfo->cm); + if (globalNetinfoCallback) + { + IPDiagString = CMget_ip_config_diagnostics(SharedCMInfo->cm); + } - CPInfo->fm_c = create_local_FMcontext(); - CPInfo->ffs_c = create_FFSContext_FM(CPInfo->fm_c); + CMlisten(SharedCMInfo->cm); - if (!CP_SstParamsList) - { - int i = 0; - /* need to pre-process the CP_SstParamsList to fix typedecl values */ - CP_SstParamsList = copy_field_list(CP_SstParamsList_RAW); - while (CP_SstParamsList[i].field_name) + if (!CP_SstParamsList) { - if ((strcmp(CP_SstParamsList[i].field_type, "int") == 0) || - (strcmp(CP_SstParamsList[i].field_type, "size_t") == 0)) - { - free((void *)CP_SstParamsList[i].field_type); - CP_SstParamsList[i].field_type = strdup("integer"); - } - else if ((strcmp(CP_SstParamsList[i].field_type, "char*") == 0) || - (strcmp(CP_SstParamsList[i].field_type, "char *") == 0)) + int i = 0; + /* need to pre-process the CP_SstParamsList to fix typedecl values + */ + CP_SstParamsList = copy_field_list(CP_SstParamsList_RAW); + while (CP_SstParamsList[i].field_name) { - free((void *)CP_SstParamsList[i].field_type); - CP_SstParamsList[i].field_type = strdup("string"); + if ((strcmp(CP_SstParamsList[i].field_type, "int") == 0) || + (strcmp(CP_SstParamsList[i].field_type, "size_t") == 0)) + { + free((void *)CP_SstParamsList[i].field_type); + CP_SstParamsList[i].field_type = strdup("integer"); + } + else if ((strcmp(CP_SstParamsList[i].field_type, "char*") == + 0) || + (strcmp(CP_SstParamsList[i].field_type, "char *") == + 0)) + { + free((void *)CP_SstParamsList[i].field_type); + CP_SstParamsList[i].field_type = strdup("string"); + } + i++; } - i++; } - } - for (int i = 0; i < sizeof(CP_DP_WriterArrayStructs) / - sizeof(CP_DP_WriterArrayStructs[0]); - i++) - { - if (CP_DP_WriterArrayStructs[i].format_name && - (strcmp(CP_DP_WriterArrayStructs[i].format_name, "SstParams") == 0)) + for (int i = 0; i < sizeof(CP_DP_WriterArrayStructs) / + sizeof(CP_DP_WriterArrayStructs[0]); + i++) { - CP_DP_WriterArrayStructs[i].field_list = CP_SstParamsList; + if (CP_DP_WriterArrayStructs[i].format_name && + (strcmp(CP_DP_WriterArrayStructs[i].format_name, "SstParams") == + 0)) + { + CP_DP_WriterArrayStructs[i].field_list = CP_SstParamsList; + } } - } - for (int i = 0; i < sizeof(CP_WriterResponseStructs) / - sizeof(CP_WriterResponseStructs[0]); - i++) - { - if (CP_WriterResponseStructs[i].format_name && - (strcmp(CP_WriterResponseStructs[i].format_name, "SstParams") == 0)) + for (int i = 0; i < sizeof(CP_WriterResponseStructs) / + sizeof(CP_WriterResponseStructs[0]); + i++) { - CP_WriterResponseStructs[i].field_list = CP_SstParamsList; + if (CP_WriterResponseStructs[i].format_name && + (strcmp(CP_WriterResponseStructs[i].format_name, "SstParams") == + 0)) + { + CP_WriterResponseStructs[i].field_list = CP_SstParamsList; + } } + doCMFormatRegistration(SharedCMInfo, DPInfo); } + SharedCMInfoRefCount++; + pthread_mutex_unlock(&StateMutex); + + StreamCP = calloc(1, sizeof(*StreamCP)); + StreamCP->SharedCM = SharedCMInfo; + StreamCP->fm_c = create_local_FMcontext(); + StreamCP->ffs_c = create_FFSContext_FM(StreamCP->fm_c); - doFormatRegistration(CPInfo, DPInfo); + doFFSFormatRegistration(StreamCP, DPInfo); - CPInfoRefCount++; - return CPInfo; + return StreamCP; } SstStream CP_newStream() @@ -1465,7 +1488,10 @@ extern void CP_error(SstStream s, char *Format, ...) va_end(Args); } -static CManager CP_getCManager(SstStream Stream) { return Stream->CPInfo->cm; } +static CManager CP_getCManager(SstStream Stream) +{ + return Stream->CPInfo->SharedCM->cm; +} static SMPI_Comm CP_getMPIComm(SstStream Stream) { return Stream->mpiComm; } @@ -1479,7 +1505,8 @@ static int CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, CP_PeerConnection *Peers = (CP_PeerConnection *)Cohort; if (Peers[Rank].CMconn == NULL) { - Peers[Rank].CMconn = CMget_conn(s->CPInfo->cm, Peers[Rank].ContactList); + Peers[Rank].CMconn = + CMget_conn(s->CPInfo->SharedCM->cm, Peers[Rank].ContactList); if (!Peers[Rank].CMconn) { CP_error(s, diff --git a/source/adios2/toolkit/sst/cp/cp_internal.h b/source/adios2/toolkit/sst/cp/cp_internal.h index c69b99324e..d7e48c386c 100644 --- a/source/adios2/toolkit/sst/cp/cp_internal.h +++ b/source/adios2/toolkit/sst/cp/cp_internal.h @@ -3,21 +3,18 @@ #define SSTMAGICV0 "#ADIOS2-SST v0\n" -typedef struct _CP_GlobalInfo +typedef struct StructList +{ + int CustomStructCount; + FMStructDescList *CustomStructList; +} CP_StructList; + +typedef struct _CP_GlobalCMInfo { /* exchange info */ CManager cm; - FFSContext ffs_c; - FMContext fm_c; - FFSTypeHandle PerRankReaderInfoFormat; - FFSTypeHandle CombinedReaderInfoFormat; CMFormat ReaderRegisterFormat; - FFSTypeHandle PerRankWriterInfoFormat; - FFSTypeHandle CombinedWriterInfoFormat; CMFormat WriterResponseFormat; - FFSTypeHandle PerRankMetadataFormat; - FFSTypeHandle TimestepDistributionFormat; - FFSTypeHandle ReturnMetadataInfoFormat; CMFormat DeliverTimestepMetadataFormat; CMFormat PeerSetupFormat; CMFormat ReaderActivateFormat; @@ -26,11 +23,25 @@ typedef struct _CP_GlobalInfo CMFormat CommPatternLockedFormat; CMFormat WriterCloseFormat; CMFormat ReaderCloseFormat; - int CustomStructCount; - FMStructDescList *CustomStructList; int LastCallFreeCount; void **LastCallFreeList; -} * CP_GlobalInfo; + struct StructList CustomStructs; +} * CP_GlobalCMInfo; + +typedef struct _CP_Info +{ + CP_GlobalCMInfo SharedCM; + FFSContext ffs_c; + FMContext fm_c; + FFSTypeHandle PerRankReaderInfoFormat; + FFSTypeHandle CombinedReaderInfoFormat; + FFSTypeHandle PerRankWriterInfoFormat; + FFSTypeHandle CombinedWriterInfoFormat; + FFSTypeHandle PerRankMetadataFormat; + FFSTypeHandle TimestepDistributionFormat; + FFSTypeHandle ReturnMetadataInfoFormat; + struct StructList CustomStructs; +} * CP_Info; struct _ReaderRegisterMsg; @@ -121,7 +132,7 @@ typedef struct FFSFormatBlock *FFSFormatList; struct _SstStream { - CP_GlobalInfo CPInfo; + CP_Info CPInfo; SMPI_Comm mpiComm; enum StreamRole Role; @@ -462,7 +473,7 @@ typedef struct _MetadataPlusDPInfo *MetadataPlusDPInfo; extern atom_t CM_TRANSPORT_ATOM; void CP_validateParams(SstStream stream, SstParams Params, int Writer); -extern CP_GlobalInfo CP_getCPInfo(CP_DP_Interface DPInfo, char *ControlModule); +extern CP_Info CP_getCPInfo(CP_DP_Interface DPInfo, char *ControlModule); extern char *CP_GetContactString(SstStream s, attr_list DPAttrs); extern SstStream CP_newStream(); extern void SstInternalProvideTimestep( diff --git a/source/adios2/toolkit/sst/cp/cp_reader.c b/source/adios2/toolkit/sst/cp/cp_reader.c index 39aaf34520..0d804265fb 100644 --- a/source/adios2/toolkit/sst/cp/cp_reader.c +++ b/source/adios2/toolkit/sst/cp/cp_reader.c @@ -407,7 +407,7 @@ attr_list ContactWriter(SstStream Stream, char *Filename, SstParams Params, (globalNetinfoCallback)(2, CMContactString, NULL); } WriterRank0Contact = attr_list_from_string(CMContactString); - conn = CMget_conn(Stream->CPInfo->cm, WriterRank0Contact); + conn = CMget_conn(Stream->CPInfo->SharedCM->cm, WriterRank0Contact); free_attr_list(WriterRank0Contact); } if (conn) @@ -510,7 +510,7 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm) memset(&ReaderRegister, 0, sizeof(ReaderRegister)); ReaderRegister.WriterFile = WriterFileID; ReaderRegister.WriterResponseCondition = - CMCondition_get(Stream->CPInfo->cm, rank0_to_rank0_conn); + CMCondition_get(Stream->CPInfo->SharedCM->cm, rank0_to_rank0_conn); ReaderRegister.ReaderCohortSize = Stream->CohortSize; switch (Stream->ConfigParams->SpeculativePreloadMode) { @@ -543,11 +543,12 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm) /* the response value is set in the handler */ struct _WriterResponseMsg *response = NULL; - CMCondition_set_client_data(Stream->CPInfo->cm, + CMCondition_set_client_data(Stream->CPInfo->SharedCM->cm, ReaderRegister.WriterResponseCondition, &response); - if (CMwrite(rank0_to_rank0_conn, Stream->CPInfo->ReaderRegisterFormat, + if (CMwrite(rank0_to_rank0_conn, + Stream->CPInfo->SharedCM->ReaderRegisterFormat, &ReaderRegister) != 1) { CP_verbose(Stream, @@ -561,7 +562,7 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm) Stream, "Waiting for writer response message in SstReadOpen(\"%s\")\n", Filename, ReaderRegister.WriterResponseCondition); - CMCondition_wait(Stream->CPInfo->cm, + CMCondition_wait(Stream->CPInfo->SharedCM->cm, ReaderRegister.WriterResponseCondition); CP_verbose(Stream, "finished wait writer response message in read_open\n"); @@ -701,8 +702,9 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm) Stream->ConnectionsToWriter, ReturnData->DP_WriterInfo); CP_verbose(Stream, "Sending Reader Activate messages to writer\n"); memset(&Msg, 0, sizeof(Msg)); - sendOneToEachWriterRank(Stream, Stream->CPInfo->ReaderActivateFormat, &Msg, - &Msg.WSR_Stream); + sendOneToEachWriterRank(Stream, + Stream->CPInfo->SharedCM->ReaderActivateFormat, + &Msg, &Msg.WSR_Stream); CP_verbose(Stream, "Finish opening Stream \"%s\", starting with Step number %d\n", Filename, ReturnData->StartingStepNumber); @@ -780,9 +782,9 @@ void queueTimestepMetadataMsgAndNotify(SstStream Stream, "Sending ReleaseTimestep message for PRIOR DISCARD " "timestep %d, one to each writer\n", tsm->Timestep); - sendOneToEachWriterRank(Stream, - Stream->CPInfo->ReleaseTimestepFormat, &Msg, - &Msg.WSR_Stream); + sendOneToEachWriterRank( + Stream, Stream->CPInfo->SharedCM->ReleaseTimestepFormat, &Msg, + &Msg.WSR_Stream); } else { @@ -1076,7 +1078,7 @@ static void waitForMetadataWithTimeout(SstStream Stream, float timeout_secs) } TimeoutTask = - CMadd_delayed_task(Stream->CPInfo->cm, timeout_int_sec, + CMadd_delayed_task(Stream->CPInfo->SharedCM->cm, timeout_int_sec, timeout_int_usec, triggerDataCondition, Stream); while (1) { @@ -1155,13 +1157,13 @@ static void releasePriorTimesteps(SstStream Stream, long Latest) Last->Next = Next; } STREAM_MUTEX_UNLOCK(Stream); - sendOneToEachWriterRank(Stream, - Stream->CPInfo->ReleaseTimestepFormat, &Msg, - &Msg.WSR_Stream); + sendOneToEachWriterRank( + Stream, Stream->CPInfo->SharedCM->ReleaseTimestepFormat, &Msg, + &Msg.WSR_Stream); if (This->MetadataMsg == NULL) printf("READER RETURN_BUFFER, metadatamsg == %p, line %d\n", This->MetadataMsg, __LINE__); - CMreturn_buffer(Stream->CPInfo->cm, This->MetadataMsg); + CMreturn_buffer(Stream->CPInfo->SharedCM->cm, This->MetadataMsg); STREAM_MUTEX_LOCK(Stream); free(This); } @@ -1187,7 +1189,7 @@ static void FreeTimestep(SstStream Stream, long Timestep) if (List->MetadataMsg == NULL) printf("READER RETURN_BUFFER, List->MEtadataMsg == %p, line %d\n", List->MetadataMsg, __LINE__); - CMreturn_buffer(Stream->CPInfo->cm, List->MetadataMsg); + CMreturn_buffer(Stream->CPInfo->SharedCM->cm, List->MetadataMsg); free(List); } @@ -1204,7 +1206,8 @@ static void FreeTimestep(SstStream Stream, long Timestep) printf("READER RETURN_BUFFER, List->MEtadataMsg == %p, " "line %d\n", List->MetadataMsg, __LINE__); - CMreturn_buffer(Stream->CPInfo->cm, List->MetadataMsg); + CMreturn_buffer(Stream->CPInfo->SharedCM->cm, + List->MetadataMsg); free(List); break; @@ -1388,8 +1391,9 @@ extern void SstReaderDefinitionLock(SstStream Stream, long EffectiveTimestep) memset(&Msg, 0, sizeof(Msg)); Msg.Timestep = EffectiveTimestep; - sendOneToEachWriterRank(Stream, Stream->CPInfo->LockReaderDefinitionsFormat, - &Msg, &Msg.WSR_Stream); + sendOneToEachWriterRank( + Stream, Stream->CPInfo->SharedCM->LockReaderDefinitionsFormat, &Msg, + &Msg.WSR_Stream); } // SstReleaseStep is only called by the main program thread. It @@ -1430,8 +1434,9 @@ extern void SstReleaseStep(SstStream Stream) Stream, "Sending ReleaseTimestep message for timestep %d, one to each writer\n", Timestep); - sendOneToEachWriterRank(Stream, Stream->CPInfo->ReleaseTimestepFormat, &Msg, - &Msg.WSR_Stream); + sendOneToEachWriterRank(Stream, + Stream->CPInfo->SharedCM->ReleaseTimestepFormat, + &Msg, &Msg.WSR_Stream); if (Stream->WriterConfigParams->MarshalMethod == SstMarshalFFS) { @@ -1990,12 +1995,12 @@ extern void SstReaderClose(SstStream Stream) gettimeofday(&CloseTime, NULL); timersub(&CloseTime, &Stream->ValidStartTime, &Diff); memset(&Msg, 0, sizeof(Msg)); - sendOneToEachWriterRank(Stream, Stream->CPInfo->ReaderCloseFormat, &Msg, - &Msg.WSR_Stream); + sendOneToEachWriterRank(Stream, Stream->CPInfo->SharedCM->ReaderCloseFormat, + &Msg, &Msg.WSR_Stream); if (Stream->Stats) Stream->Stats->ValidTimeSecs = (double)Diff.tv_usec / 1e6 + Diff.tv_sec; - CMusleep(Stream->CPInfo->cm, 100000); + CMusleep(Stream->CPInfo->SharedCM->cm, 100000); if (Stream->CurrentMetadata != NULL) { if (Stream->CurrentMetadata->FreeBlock) diff --git a/source/adios2/toolkit/sst/cp/cp_writer.c b/source/adios2/toolkit/sst/cp/cp_writer.c index 007b0ba22e..c6fa39e28f 100644 --- a/source/adios2/toolkit/sst/cp/cp_writer.c +++ b/source/adios2/toolkit/sst/cp/cp_writer.c @@ -318,20 +318,6 @@ static void RemoveQueueEntries(SstStream Stream) } } -static void ReleaseAndDiscardRemainingTimesteps(SstStream Stream) -{ - CPTimestepList List = Stream->QueuedTimesteps; - - while (List) - { - List->Expired = 1; - List->PreciousTimestep = 0; - List->ReferenceCount = 0; - List = List->Next; - } - RemoveQueueEntries(Stream); -} - /* Queue maintenance: (ASSUME LOCKED) calculate smallest entry for CurrentTimestep in a reader. Update that @@ -517,7 +503,7 @@ static void SendPeerSetupMsg(WS_ReaderInfo reader, int reversePeer, int myRank) setup.WriterRank = myRank; setup.WriterCohortSize = Stream->CohortSize; STREAM_ASSERT_UNLOCKED(Stream); - if (CMwrite(conn, Stream->CPInfo->PeerSetupFormat, &setup) != 1) + if (CMwrite(conn, Stream->CPInfo->SharedCM->PeerSetupFormat, &setup) != 1) { CP_verbose(Stream, "Message failed to send to reader in sendPeerSetup in " @@ -606,7 +592,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize, if (!reader->Connections[peer].CMconn) { reader->Connections[peer].CMconn = - CMget_conn(reader->ParentStream->CPInfo->cm, + CMget_conn(reader->ParentStream->CPInfo->SharedCM->cm, reader->Connections[peer].ContactList); } @@ -660,7 +646,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize, usleep(WriterRank * reader->ParentStream->ConnectionUsleepMultiplier); reader->Connections[peer].CMconn = - CMget_conn(reader->ParentStream->CPInfo->cm, + CMget_conn(reader->ParentStream->CPInfo->SharedCM->cm, reader->Connections[peer].ContactList); if (!reader->Connections[peer].CMconn) @@ -694,7 +680,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize, if (!reader->Connections[0].CMconn) { reader->Connections[0].CMconn = - CMget_conn(reader->ParentStream->CPInfo->cm, + CMget_conn(reader->ParentStream->CPInfo->SharedCM->cm, reader->Connections[0].ContactList); } if (!reader->Connections[0].CMconn) @@ -811,7 +797,7 @@ WS_ReaderInfo WriterParticipateInReaderOpen(SstStream Stream) &free_block); WriterResponseCondition = Req->Msg->WriterResponseCondition; conn = Req->Conn; - CMreturn_buffer(Stream->CPInfo->cm, Req->Msg); + CMreturn_buffer(Stream->CPInfo->SharedCM->cm, Req->Msg); free(Req); } else @@ -948,7 +934,8 @@ WS_ReaderInfo WriterParticipateInReaderOpen(SstStream Stream) response.DP_WriterInfo[i] = pointers[i]->DP_Info; } STREAM_ASSERT_UNLOCKED(Stream); - if (CMwrite(conn, Stream->CPInfo->WriterResponseFormat, &response) != 1) + if (CMwrite(conn, Stream->CPInfo->SharedCM->WriterResponseFormat, + &response) != 1) { CP_verbose( Stream, @@ -1167,9 +1154,10 @@ static void SendTimestepEntryToSingleReader(SstStream Stream, Entry->Msg->PreloadMode = PMode; STREAM_MUTEX_LOCK(Stream); if (CP_WSR_Stream->ReaderStatus == Established) - sendOneToWSRCohort(CP_WSR_Stream, - Stream->CPInfo->DeliverTimestepMetadataFormat, - Entry->Msg, &Entry->Msg->RS_Stream); + sendOneToWSRCohort( + CP_WSR_Stream, + Stream->CPInfo->SharedCM->DeliverTimestepMetadataFormat, + Entry->Msg, &Entry->Msg->RS_Stream); } } @@ -1306,7 +1294,7 @@ SstStream SstWriterOpen(const char *Name, SstParams Params, SMPI_Comm comm) if (Stream->RendezvousReaderCount > 0) { Stream->FirstReaderCondition = - CMCondition_get(Stream->CPInfo->cm, NULL); + CMCondition_get(Stream->CPInfo->SharedCM->cm, NULL); } else { @@ -1470,7 +1458,7 @@ static void CP_PeerFailCloseWSReader(WS_ReaderInfo CP_WSR_Stream, if (NewState == PeerFailed) { // move to fully closed state later - CMfree(CMadd_delayed_task(ParentStream->CPInfo->cm, 2, 0, + CMfree(CMadd_delayed_task(ParentStream->CPInfo->SharedCM->cm, 2, 0, CloseWSRStream, CP_WSR_Stream)); } } @@ -1505,8 +1493,8 @@ void SstWriterClose(SstStream Stream) "SstWriterClose, Sending Close at Timestep %d, one to each reader\n", Msg.FinalTimestep); - sendOneToEachReaderRank(Stream, Stream->CPInfo->WriterCloseFormat, &Msg, - &Msg.RS_Stream); + sendOneToEachReaderRank(Stream, Stream->CPInfo->SharedCM->WriterCloseFormat, + &Msg, &Msg.RS_Stream); UntagPreciousTimesteps(Stream); Stream->ConfigParams->ReserveQueueLimit = 0; @@ -1857,9 +1845,10 @@ static void ActOnTSLockStatus(SstStream Stream, long Timestep) } Msg.Timestep = Timestep; SomethingSent++; - sendOneToWSRCohort(Stream->Readers[i], - Stream->CPInfo->CommPatternLockedFormat, &Msg, - &Msg.RS_Stream); + sendOneToWSRCohort( + Stream->Readers[i], + Stream->CPInfo->SharedCM->CommPatternLockedFormat, &Msg, + &Msg.RS_Stream); Stream->Readers[i]->PreloadMode = SstPreloadLearned; Stream->Readers[i]->PreloadModeActiveTimestep = Timestep; CP_verbose(Stream, @@ -2273,9 +2262,9 @@ extern void SstInternalProvideTimestep( Timestep); STREAM_MUTEX_LOCK(Stream); - sendOneToEachReaderRank(Stream, - Stream->CPInfo->DeliverTimestepMetadataFormat, - Msg, &Msg->RS_Stream); + sendOneToEachReaderRank( + Stream, Stream->CPInfo->SharedCM->DeliverTimestepMetadataFormat, + Msg, &Msg->RS_Stream); Entry->Expired = 1; Entry->ReferenceCount = 0; diff --git a/source/adios2/toolkit/sst/dp/evpath_dp.c b/source/adios2/toolkit/sst/dp/evpath_dp.c index 72445ff018..8b0621090e 100644 --- a/source/adios2/toolkit/sst/dp/evpath_dp.c +++ b/source/adios2/toolkit/sst/dp/evpath_dp.c @@ -13,6 +13,16 @@ #include "adios2/toolkit/profiling/taustubs/taustubs.h" #include "dp_interface.h" +#if defined(__has_feature) +#if __has_feature(thread_sanitizer) +#define NO_SANITIZE_THREAD __attribute__((no_sanitize("thread"))) +#endif +#endif + +#ifndef NO_SANITIZE_THREAD +#define NO_SANITIZE_THREAD +#endif + /* * Some conventions: * `RS` indicates a reader-side item. @@ -1475,21 +1485,22 @@ static FMStructDescRec EvpathWriterContactStructs[] = { // sizeof(struct _EvpathPerTimestepInfo), NULL}, // {NULL, NULL, 0, NULL}}; -static struct _CP_DP_Interface evpathDPInterface; +static struct _CP_DP_Interface evpathDPInterface = { + NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}; static int EvpathGetPriority(CP_Services Svcs, void *CP_Stream, struct _SstParams *Params) { // Define any unique attributes here - (void)attr_atom_from_string("EVPATH_DP_Attr"); + // (void)attr_atom_from_string("EVPATH_DP_Attr"); /* The evpath DP should be a lower priority than any RDMA dp, so return 1 */ return 1; } -extern CP_DP_Interface LoadEVpathDP() +extern NO_SANITIZE_THREAD CP_DP_Interface LoadEVpathDP() { - memset(&evpathDPInterface, 0, sizeof(evpathDPInterface)); evpathDPInterface.ReaderContactFormats = EvpathReaderContactStructs; evpathDPInterface.WriterContactFormats = EvpathWriterContactStructs; evpathDPInterface.TimestepInfoFormats = NULL; // EvpathTimestepInfoStructs; diff --git a/source/adios2/toolkit/sst/dp/rdma_dp.c b/source/adios2/toolkit/sst/dp/rdma_dp.c index 2776daa8a3..c0a02ea8b5 100644 --- a/source/adios2/toolkit/sst/dp/rdma_dp.c +++ b/source/adios2/toolkit/sst/dp/rdma_dp.c @@ -932,7 +932,9 @@ static FMStructDescRec RdmaTimestepInfoStructs[] = { sizeof(struct _RdmaPerTimestepInfo), NULL}, {NULL, NULL, 0, NULL}}; -static struct _CP_DP_Interface RdmaDPInterface; +static struct _CP_DP_Interface RdmaDPInterface = { + NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}; /* In RdmaGetPriority, the Rdma DP should do whatever is necessary to test to * see if it @@ -1050,7 +1052,6 @@ static void RdmaUnGetPriority(CP_Services Svcs, void *CP_Stream) extern CP_DP_Interface LoadRdmaDP() { - memset(&RdmaDPInterface, 0, sizeof(RdmaDPInterface)); RdmaDPInterface.ReaderContactFormats = RdmaReaderContactStructs; RdmaDPInterface.WriterContactFormats = RdmaWriterContactStructs; RdmaDPInterface.TimestepInfoFormats = RdmaTimestepInfoStructs; @@ -1063,8 +1064,12 @@ extern CP_DP_Interface LoadRdmaDP() RdmaDPInterface.notifyConnFailure = RdmaNotifyConnFailure; RdmaDPInterface.provideTimestep = RdmaProvideTimestep; RdmaDPInterface.readerRegisterTimestep = NULL; + RdmaDPInterface.timestepArrived = NULL; RdmaDPInterface.releaseTimestep = RdmaReleaseTimestep; RdmaDPInterface.readerReleaseTimestep = NULL; + RdmaDPInterface.RSReleaseTimestep = NULL; + RdmaDPInterface.WSRreadPatternLocked = NULL; + RdmaDPInterface.RSreadPatternLocked = NULL; RdmaDPInterface.destroyReader = RdmaDestroyReader; RdmaDPInterface.destroyWriter = RdmaDestroyWriter; RdmaDPInterface.destroyWriterPerReader = RdmaDestroyWriterPerReader; diff --git a/testing/adios2/engine/staging-common/CMakeLists.txt b/testing/adios2/engine/staging-common/CMakeLists.txt index 296a998f62..70ef5c46a5 100644 --- a/testing/adios2/engine/staging-common/CMakeLists.txt +++ b/testing/adios2/engine/staging-common/CMakeLists.txt @@ -9,6 +9,9 @@ gtest_add_tests_helper(StagingMPMD MPI_ONLY "" Engine.Staging. ".InSituMPI" EXTR if(ADIOS2_HAVE_SST) gtest_add_tests_helper(StagingMPMD MPI_ONLY "" Engine.Staging. ".SST.FFS" EXTRA_ARGS "SST" "MarshalMethod=FFS") gtest_add_tests_helper(StagingMPMD MPI_ONLY "" Engine.Staging. ".SST.BP" EXTRA_ARGS "SST" "MarshalMethod=BP") + gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. "SST.FFS" EXTRA_ARGS "SST" "MarshalMethod=FFS") + gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. "SST.BP" EXTRA_ARGS "SST" "MarshalMethod=BP") +# gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. "BP4_stream" EXTRA_ARGS "BP4" "") endif() foreach(helper From 228ed550bffa28f181cd2b84447913126f8285ca Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Tue, 16 Jun 2020 16:13:02 -0400 Subject: [PATCH 4/4] Add TestThreads.cpp --- .../engine/staging-common/TestThreads.cpp | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 testing/adios2/engine/staging-common/TestThreads.cpp diff --git a/testing/adios2/engine/staging-common/TestThreads.cpp b/testing/adios2/engine/staging-common/TestThreads.cpp new file mode 100644 index 0000000000..a9ade66fe6 --- /dev/null +++ b/testing/adios2/engine/staging-common/TestThreads.cpp @@ -0,0 +1,98 @@ +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "ParseArgs.h" + +using dt = long long; + +int failure = 0; + +void Read() +{ + adios2::ADIOS adios; + adios2::IO io = adios.DeclareIO("IO"); + io.SetEngine("sst"); + + io.SetEngine(engine); + io.SetParameters(engineParams); + + adios2::Engine Reader = io.Open("communicate", adios2::Mode::Read); + + std::array ar; + + auto status = Reader.BeginStep(); + if (status == adios2::StepStatus::EndOfStream) + { + return; + } + + adios2::Variable
var = io.InquireVariable
("data"); + Reader.Get(var, ar.begin()); + Reader.EndStep(); + dt expect = 0; + for (auto &val : ar) + { + if (val != expect) + { + failure++; + break; + } + expect++; + } + + Reader.Close(); +} + +void Write() +{ + adios2::ADIOS adios; + adios2::IO io = adios.DeclareIO("IO"); + io.SetEngine(engine); + io.SetParameters(engineParams); + io.SetEngine("sst"); + adios2::Engine Writer = io.Open("communicate", adios2::Mode::Write); + + auto var = + io.DefineVariable
("data", adios2::Dims{10000, 10}, + adios2::Dims{0, 0}, adios2::Dims{10000, 10}); + + std::array ar; + + std::iota(ar.begin(), ar.end(), 0); + + Writer.BeginStep(); + Writer.Put
(var, ar.begin()); + Writer.EndStep(); + Writer.Close(); +} + +class TestThreads : public ::testing::Test +{ +public: + TestThreads() = default; +}; + +TEST_F(TestThreads, Basic) +{ + auto read_fut = std::async(std::launch::async, Read); + auto write_fut = std::async(std::launch::async, Write); + read_fut.wait(); + write_fut.wait(); +} + +int main(int argc, char **argv) +{ + ::testing::InitGoogleTest(&argc, argv); + ParseArgs(argc, argv); + + int result; + result = RUN_ALL_TESTS(); + return result; +}