From 060440ab2f5c0b14d203b69b9010c17ff4035e9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Tue, 28 Nov 2023 13:33:41 -0500 Subject: [PATCH] Properly select the CXI keys The service IDs have always been 5 so far, so it did not matter. But the ID technically has to be chosen according to the device that the local instance connects with. --- source/adios2/toolkit/sst/dp/rdma_dp.c | 353 +++++++++++++++++-------- 1 file changed, 241 insertions(+), 112 deletions(-) diff --git a/source/adios2/toolkit/sst/dp/rdma_dp.c b/source/adios2/toolkit/sst/dp/rdma_dp.c index 3ec2eb7afb..a4e829efa9 100644 --- a/source/adios2/toolkit/sst/dp/rdma_dp.c +++ b/source/adios2/toolkit/sst/dp/rdma_dp.c @@ -184,13 +184,24 @@ struct fabric_state * plane would replace one or both of these with RDMA functionality. */ +static char const *get_preferred_domain(struct _SstParams *Params) +{ + if (Params->DataInterface) + { + return Params->DataInterface; + } + else + { + return getenv("FABRIC_IFACE"); + } +} + static void init_fabric(struct fabric_state *fabric, struct _SstParams *Params, - CP_Services Svcs, void *CP_Stream) + CP_Services Svcs, void *CP_Stream, char const *ifname) { struct fi_info *hints, *info, *originfo, *useinfo; struct fi_av_attr av_attr = {FI_AV_UNSPEC}; struct fi_cq_attr cq_attr = {0}; - char *ifname; int result; hints = fi_allocinfo(); @@ -237,13 +248,17 @@ static void init_fabric(struct fabric_state *fabric, struct _SstParams *Params, hints->domain_attr->data_progress = FI_PROGRESS_AUTO; } - if (Params->DataInterface) - { - ifname = Params->DataInterface; - } - else + /* + * ifname is passed as a function parameter of init_fabric() if + * a provider-specific key was configured and sent to the reader. + * Since the key is generally domain-specific, we must use that one in this + * case. + * The preferred domain is already considered upon key configuration, + * so this is fine. + */ + if (!ifname) { - ifname = getenv("FABRIC_IFACE"); + ifname = get_preferred_domain(Params); } fabric->info = NULL; @@ -747,18 +762,183 @@ static TimestepList GetStep(Rdma_WS_Stream Stream, long Timestep) return (Step); } -int get_cxi_auth_key_from_writer(struct cxi_auth_key *key, attr_list WriterContact) +int get_cxi_auth_key_from_env(CP_Services Svcs, void *CP_Stream, + struct _SstParams *Params, + struct cxi_auth_key *key, + char **used_device) { - long svc_id, vni; - if(!get_long_attr(WriterContact, attr_atom_from_string("svc_id"), &svc_id)) + int vni, first_vni, second_vni, svc_id; + + // Just some safety against string processing. + size_t const no_infinite_loops = 10000; + + // struct cxi_auth_key { + // /* The CXI service assigned to the Domain and Endpoints. A CXI + // service + // * is associated with a set of local resource limits, VNIs, and + // Traffic + // * Classes. + // * + // * The svc_id used by an OFI Domain must match all Endpoints belonging + // * to the Domain. + // */ + // uint32_t svc_id; + + // /* The Virtual Network ID (VNI) assigned to the Endpoint. Two + // Endpoints + // * must use the same VNI in order to communicate. + // * + // * Note that while the CXI service may define one or more VNIs which a + // * process can access, an Endpoint is assigned to only one. + // */ + // uint16_t vni; + // }; + + // typical value SLINGSHOT_DEVICES=cxi0,cxi1,cxi2,cxi3 + char const *slingshot_devices = getenv("SLINGSHOT_DEVICES"); + char const *preferred_device = get_preferred_domain(Params); + + size_t device_index = 0; + for (size_t no_infinite_loop_counter = 0;; + ++device_index, ++no_infinite_loop_counter) + { + if (no_infinite_loop_counter > no_infinite_loops) + { + return EXIT_FAILURE; + } + + int found_end = 0; + size_t find_end_of_current_string = 0; + for (size_t no_infinite_loop_inner_counter = 0;; + ++find_end_of_current_string, ++no_infinite_loop_inner_counter) + { + if (no_infinite_loop_inner_counter > no_infinite_loops) + { + return EXIT_FAILURE; + } + + switch (slingshot_devices[find_end_of_current_string]) + { + case '\0': + found_end = 1; + goto break_first_loop; + case ',': + goto break_first_loop; + default: + break; + } + } + break_first_loop:; + int use_this_device = + !preferred_device || (strncmp(preferred_device, slingshot_devices, + find_end_of_current_string) == 0); + if (use_this_device) + { + char *construct_used_device = + malloc(find_end_of_current_string + 1); + memcpy(construct_used_device, slingshot_devices, + find_end_of_current_string); + construct_used_device[find_end_of_current_string] = '\0'; + *used_device = construct_used_device; + break; + } + else if (found_end) + { + return EXIT_FAILURE; + } + else + { + // go to next iteration + slingshot_devices += find_end_of_current_string + 1; + } + } + + Svcs->verbose(CP_Stream, DPTraceVerbose, "Found device %s at index %zu\n", + *used_device, device_index); + + // typical value SLINGSHOT_VNIS=4576,4530 + char const *vni_env_str = getenv("SLINGSHOT_VNIS"); + if (!vni_env_str) + { + return EXIT_FAILURE; + } + + // typical value SLINGSHOT_SVC_IDS=5,5,5,5 + char const *svc_ids_env_str = getenv("SLINGSHOT_SVC_IDS"); + if (!svc_ids_env_str) { return EXIT_FAILURE; } + + { + int num_vnis = sscanf(vni_env_str, "%d,%d", &first_vni, &second_vni); + switch (num_vnis) + { + // first VNI is the subjob's VNI + case 1: + Svcs->verbose(CP_Stream, DPTraceVerbose, "Using first vni.\n"); + vni = first_vni; + break; + // if present, the second VNI is the containing job's VNI + // the first VNI belongs to the subjob + case 2: + Svcs->verbose(CP_Stream, DPTraceVerbose, "Using second vni.\n"); + vni = second_vni; + break; + default: + return EXIT_FAILURE; + } + } + + { + // Pick the service ID according to the device_index found above. + for (size_t svc_id_index = 0; svc_id_index < device_index; + ++svc_id_index) + { + for (size_t no_infinite_loop_counter = 0;; + ++no_infinite_loop_counter) + { + if (no_infinite_loop_counter > no_infinite_loops) + { + return EXIT_FAILURE; + } + + switch (*(svc_ids_env_str++)) + { + case ',': + goto break_second_loop; + case '\0': + return EXIT_FAILURE; + default: + continue; + } + } + break_second_loop:; + } + + int num_svc_ids = sscanf(svc_ids_env_str, "%d", &svc_id); + switch (num_svc_ids) + { + case 1: + break; + default: + return EXIT_FAILURE; + } + } + + key->vni = vni; + key->svc_id = svc_id; + + return EXIT_SUCCESS; +} + +int get_cxi_auth_key_from_writer(struct cxi_auth_key *key, attr_list WriterContact) +{ + long vni; if(!get_long_attr(WriterContact, attr_atom_from_string("vni"), &vni)) { return EXIT_FAILURE; } - key->svc_id = (uint32_t) svc_id; key->vni = (uint16_t) vni; return EXIT_SUCCESS; } @@ -812,19 +992,37 @@ static DP_RS_Stream RdmaInitReader(CP_Services Svcs, void *CP_Stream, Stream->PreloadAvail = 0; } - struct cxi_auth_key key; - if(get_cxi_auth_key_from_writer(&key, WriterContact) == EXIT_SUCCESS) + struct + { + struct cxi_auth_key key; + int valid; + } tagged_key; + + /* + * The svc_id of the key must match the device that this particular reader + * connects with. + * The vni (virtual network ID) must be the same across all communicating + * instances (get this from the writer). + */ + + char *required_device = NULL; + tagged_key.valid = get_cxi_auth_key_from_env( + Svcs, CP_Stream, Params, &tagged_key.key, &required_device); + + if (tagged_key.valid == EXIT_SUCCESS && + get_cxi_auth_key_from_writer(&tagged_key.key, WriterContact) == + EXIT_SUCCESS) { Svcs->verbose(CP_Stream, DPSummaryVerbose, - "Reader found CXI auth key: %d %d\n", key.vni, - key.svc_id); + "Reader found CXI auth key: %d %d\n", tagged_key.key.vni, + tagged_key.key.svc_id); Stream->Fabric->cxi_auth_key = calloc(1, sizeof(struct cxi_auth_key)); - memcpy(Stream->Fabric->cxi_auth_key, &key, sizeof(struct cxi_auth_key)); + memcpy(Stream->Fabric->cxi_auth_key, &tagged_key.key, sizeof(struct cxi_auth_key)); } else { Svcs->verbose(CP_Stream, DPSummaryVerbose, - "Reader found no CXI auth key"); + "Reader found no CXI auth key\n"); } #ifdef SST_HAVE_CRAY_DRC @@ -861,7 +1059,11 @@ static DP_RS_Stream RdmaInitReader(CP_Services Svcs, void *CP_Stream, #endif /* SST_HAVE_CRAY_DRC */ - init_fabric(Stream->Fabric, Stream->Params, Svcs, CP_Stream); + init_fabric(Stream->Fabric, Stream->Params, Svcs, CP_Stream, required_device); + if (required_device) + { + free(required_device); + } if (!Fabric->info) { Svcs->verbose(CP_Stream, DPCriticalVerbose, @@ -943,80 +1145,6 @@ static void RdmaWritePatternLocked(CP_Services Svcs, DP_RS_Stream Stream_v, } } -int get_cxi_auth_key_from_env(CP_Services Svcs, void *CP_Stream, - struct cxi_auth_key *key) -{ - int vni, first_vni, second_vni, svc_id; - // struct cxi_auth_key { - // /* The CXI service assigned to the Domain and Endpoints. A CXI - // service - // * is associated with a set of local resource limits, VNIs, and - // Traffic - // * Classes. - // * - // * The svc_id used by an OFI Domain must match all Endpoints belonging - // * to the Domain. - // */ - // uint32_t svc_id; - - // /* The Virtual Network ID (VNI) assigned to the Endpoint. Two - // Endpoints - // * must use the same VNI in order to communicate. - // * - // * Note that while the CXI service may define one or more VNIs which a - // * process can access, an Endpoint is assigned to only one. - // */ - // uint16_t vni; - // }; - - char const *vni_env_str = getenv("SLINGSHOT_VNIS"); - if (!vni_env_str) - { - return EXIT_FAILURE; - } - char const *svc_ids_env_str = getenv("SLINGSHOT_SVC_IDS"); - if (!svc_ids_env_str) - { - return EXIT_FAILURE; - } - - { - int num_vnis = sscanf(vni_env_str, "%d,%d", &first_vni, &second_vni); - switch (num_vnis) - { - // first VNI is the subjob's VNI - case 1: - Svcs->verbose(CP_Stream, DPTraceVerbose, "Using first vni.\n"); - vni = first_vni; - break; - // if present, the second VNI is the containing job's VNI - // the first VNI belongs to the subjob - case 2: - Svcs->verbose(CP_Stream, DPTraceVerbose, "Using second vni.\n"); - vni = second_vni; - break; - default: - return EXIT_FAILURE; - } - } - - { - int num_svc_ids = sscanf(svc_ids_env_str, "%d", &svc_id); - switch (num_svc_ids) - { - case 1: - break; - default: - return EXIT_FAILURE; - } - } - - key->vni = vni; - key->svc_id = svc_id; - - return EXIT_SUCCESS; -} - static DP_WS_Stream RdmaInitWriter(CP_Services Svcs, void *CP_Stream, struct _SstParams *Params, attr_list DPAttrs, SstStats Stats) @@ -1051,18 +1179,24 @@ static DP_WS_Stream RdmaInitWriter(CP_Services Svcs, void *CP_Stream, Stream->Fabric = calloc(1, sizeof(struct fabric_state)); Fabric = Stream->Fabric; - struct tagged_key + struct { struct cxi_auth_key key; int valid; } tagged_key; - if (Stream->Rank == 0) - { - tagged_key.valid = - get_cxi_auth_key_from_env(Svcs, CP_Stream, &tagged_key.key); - } - SMPI_Bcast(&tagged_key, sizeof(tagged_key), SMPI_BYTE, 0, comm); + /* + * The svc_id of the key must match the device that this particular writer + * connects with. + * The vni (virtual network ID) must be the same across all communicating + * instances (use the one seen by rank 0). + */ + char *required_device = NULL; + tagged_key.valid = get_cxi_auth_key_from_env( + Svcs, CP_Stream, Params, &tagged_key.key, &required_device); + + // Ensure that all writers use the same virtual network ID + SMPI_Bcast(&tagged_key.key.vni, sizeof(tagged_key.key.vni), SMPI_BYTE, 0, comm); if (tagged_key.valid == EXIT_SUCCESS) { @@ -1070,8 +1204,6 @@ static DP_WS_Stream RdmaInitWriter(CP_Services Svcs, void *CP_Stream, "Writer found CXI auth key: %d %d\n", tagged_key.key.vni, tagged_key.key.svc_id); - set_long_attr(DPAttrs, attr_atom_from_string("svc_id"), - tagged_key.key.svc_id); set_long_attr(DPAttrs, attr_atom_from_string("vni"), tagged_key.key.vni); Stream->Fabric->cxi_auth_key = calloc(1, sizeof(struct cxi_auth_key)); @@ -1132,7 +1264,11 @@ static DP_WS_Stream RdmaInitWriter(CP_Services Svcs, void *CP_Stream, set_long_attr(DPAttrs, attr_atom_from_string("RDMA_DRC_CRED"), attr_cred); #endif /* SST_HAVE_CRAY_DRC */ - init_fabric(Stream->Fabric, Params, Svcs, CP_Stream); + init_fabric(Stream->Fabric, Params, Svcs, CP_Stream, required_device); + if (required_device) + { + free(required_device); + } Fabric = Stream->Fabric; if (!Fabric->info) { @@ -2030,7 +2166,7 @@ static int RdmaGetPriority(CP_Services Svcs, void *CP_Stream, struct _SstParams *Params) { struct fi_info *hints, *info, *originfo; - char *ifname; + char const *ifname; char *forkunsafe; int Ret = -1; @@ -2072,14 +2208,7 @@ static int RdmaGetPriority(CP_Services Svcs, void *CP_Stream, hints->domain_attr->data_progress = FI_PROGRESS_AUTO; } - if (Params->DataInterface) - { - ifname = Params->DataInterface; - } - else - { - ifname = getenv("FABRIC_IFACE"); - } + ifname = get_preferred_domain(Params); forkunsafe = getenv("FI_FORK_UNSAFE"); if (!forkunsafe)