From 17414eeec91b1f45a3809be429c76d1aa43f9284 Mon Sep 17 00:00:00 2001 From: Houjun Tang Date: Tue, 5 Nov 2024 14:52:26 -0800 Subject: [PATCH] Update bulki usage with nested bulki/bulki entity structure --- scripts/kvtag_query_scale/gen_script.sh | 4 +- scripts/kvtag_query_scale/template.sh | 6 +- src/api/pdc_client_connect.c | 54 +++--- src/commons/serde/bulki/bulki.c | 25 --- src/commons/serde/bulki/include/bulki.h | 8 - src/server/pdc_server_metadata.c | 222 +++++++----------------- src/tests/kvtag_query.c | 4 +- 7 files changed, 98 insertions(+), 225 deletions(-) diff --git a/scripts/kvtag_query_scale/gen_script.sh b/scripts/kvtag_query_scale/gen_script.sh index 9daad526c..090b0d9ec 100755 --- a/scripts/kvtag_query_scale/gen_script.sh +++ b/scripts/kvtag_query_scale/gen_script.sh @@ -18,11 +18,11 @@ NUM_SERVER_PROC_PER_NODE=1 NUM_CLIENT_PROC_PER_NODE=1 -MAX_NODE=512 +MAX_NODE=64 MAX_ATTR=1024 MAX_ATTRLEN=1000 -PROG_BASENAME=kvtag_query_scale +PROG_BASENAME=kvtag_query for (( i = 1; i <= $MAX_NODE; i*=2 )); do for (( j = 0; j <= 1; j+=1 )); do diff --git a/scripts/kvtag_query_scale/template.sh b/scripts/kvtag_query_scale/template.sh index b9bad3bdf..202135cbb 100755 --- a/scripts/kvtag_query_scale/template.sh +++ b/scripts/kvtag_query_scale/template.sh @@ -59,10 +59,10 @@ rm -rf $PDC_TMPDIR/* export PDC_TMPDIR=${PDC_TMPDIR}/$N_NODE/$USE_DART mkdir -p $PDC_TMPDIR -EXECPATH=/global/cfs/cdirs/m2621/wzhang5/perlmutter/install/pdc/share/test/bin -TOOLPATH=/global/cfs/cdirs/m2621/wzhang5/perlmutter/install/pdc/share/test/bin +EXECPATH=/global/cfs/cdirs/m2621/perlmutter/pdc/build/bin +TOOLPATH=/global/cfs/cdirs/m2621/perlmutter/pdc/build/bin SERVER=$EXECPATH/pdc_server.exe -CLIENT=$TOOLPATH/kvtag_query_scale +CLIENT=$TOOLPATH/kvtag_query CLOSE=$EXECPATH/close_server chmod +x $EXECPATH/* diff --git a/src/api/pdc_client_connect.c b/src/api/pdc_client_connect.c index f9ef2e83e..52cb20a16 100644 --- a/src/api/pdc_client_connect.c +++ b/src/api/pdc_client_connect.c @@ -9451,32 +9451,38 @@ PDC_Client_query_kvtag_mpi(const pdc_kvtag_t *kvtag, int *n_res, uint64_t **pdc_ } BULKI_KV_Pair_Iterator *bulki_iter = BULKI_KV_Pair_iterator_init(deserializedBulki_g); - BULKI_KV_Pair * bulki_kv; + BULKI_KV_Pair *obj_kv, *tag_kv; + BULKI *tag_bulki; // Iterate and get query result - while (NULL != (bulki_kv = BULKI_KV_Pair_iterator_next(bulki_iter))) { - /* printf("key: [%s]\n", (char*)bulki_kv->key.data); */ - if (strcmp("_pdc_id", (char *)bulki_kv->key.data) == 0) { - pdc_id = *((uint64_t *)bulki_kv->value.data); - /* printf("value: %llu\n", pdc_id); */ - } - else { - query_tag.type = bulki_iter->bulki->data->values[bulki_iter->current_idx - 1].pdc_type; - if (query_tag.type == kvtag->type) { - query_tag.name = (char *)bulki_kv->key.data; - query_tag.value = (void *)bulki_kv->value.data; - query_tag.size = bulki_iter->bulki->data->values[bulki_iter->current_idx - 1].size; - /* printf("value: %d\n", *((int*)bulki_kv->value.data)); */ - if (PDC_is_matching_kvtag(kvtag, &query_tag) == TRUE) { - if (iter >= alloc_size) { - alloc_size *= 2; - *pdc_ids = (void *)realloc(*pdc_ids, alloc_size * sizeof(uint64_t)); + while (NULL != (obj_kv = BULKI_KV_Pair_iterator_next(bulki_iter))) { + + pdc_id = *((uint64_t *)obj_kv->key.data); + /* printf("id: [%llu]\n", pdc_id); */ + + BULKI_Entity_Iterator *bent_iter = Bent_iterator_init(&obj_kv->value, NULL, PDC_BULKI); + while(NULL != (tag_bulki = Bent_iterator_next_BULKI(bent_iter))) { + + BULKI_KV_Pair_Iterator *tag_iter = BULKI_KV_Pair_iterator_init(tag_bulki); + while (NULL != (tag_kv = BULKI_KV_Pair_iterator_next(tag_iter))) { + + query_tag.type = tag_kv->value.pdc_type; + if (query_tag.type == kvtag->type) { + query_tag.name = (char *)tag_kv->key.data; + query_tag.value = (void *)tag_kv->value.data; + query_tag.size = tag_kv->value.size; + /* printf("value: %d\n", *((int*)tag_kv->value.data)); */ + if (PDC_is_matching_kvtag(kvtag, &query_tag) == TRUE) { + if (iter >= alloc_size) { + alloc_size *= 2; + *pdc_ids = (void *)realloc(*pdc_ids, alloc_size * sizeof(uint64_t)); + } + (*pdc_ids)[iter++] = pdc_id; + /* printf("Found match %s:%d\n", query_tag.name, *(int*)query_tag.value); */ } - (*pdc_ids)[iter++] = pdc_id; - /* printf("Found match %s:%d\n", query_tag.name, *(int*)query_tag.value); */ - } - } // End if same type - } // End else - } // End while + } // End if same type + } // End tag while + } + } // End obj while *n_res = iter; } else { diff --git a/src/commons/serde/bulki/bulki.c b/src/commons/serde/bulki/bulki.c index fcdbcdf8b..c9578287f 100644 --- a/src/commons/serde/bulki/bulki.c +++ b/src/commons/serde/bulki/bulki.c @@ -392,31 +392,6 @@ BULKI_put(BULKI *bulki, BULKI_Entity *key, BULKI_Entity *value) get_BULKI_size(bulki); } -void -BULKI_append(BULKI *bulki, BULKI_Entity *key, BULKI_Entity *value) -{ - if (bulki == NULL || key == NULL || value == NULL) { - printf("Error: bulki, key, or value is NULL\n"); - return; - } - - if (bulki->numKeys >= bulki->capacity) { - bulki->capacity *= 2; - bulki->header->keys = realloc(bulki->header->keys, bulki->capacity * sizeof(BULKI_Entity)); - bulki->data->values = realloc(bulki->data->values, bulki->capacity * sizeof(BULKI_Entity)); - } - memcpy(&bulki->header->keys[bulki->numKeys], key, sizeof(BULKI_Entity)); - // append bytes for type, size, and key - bulki->header->headerSize += key->size; - - memcpy(&bulki->data->values[bulki->numKeys], value, sizeof(BULKI_Entity)); - // append bytes for class, type, size, and data - bulki->data->dataSize += value->size; - - bulki->numKeys++; - get_BULKI_size(bulki); -} - BULKI_Entity * BULKI_delete(BULKI *bulki, BULKI_Entity *key) { diff --git a/src/commons/serde/bulki/include/bulki.h b/src/commons/serde/bulki/include/bulki.h index d52bec39e..6352d145c 100644 --- a/src/commons/serde/bulki/include/bulki.h +++ b/src/commons/serde/bulki/include/bulki.h @@ -266,14 +266,6 @@ BULKI *BULKI_init(int initial_field_count); */ void BULKI_put(BULKI *bulki, BULKI_Entity *key, BULKI_Entity *value); -/** - * @brief Append a key-value pair to the serialized data structure. - * @param data Pointer to the BULKI structure - * @param key Pointer to the BULKI_Entity structure representing the key - * @param value Pointer to the BULKI_Entity structure representing the value - */ -void BULKI_append(BULKI *bulki, BULKI_Entity *key, BULKI_Entity *value); - /** * @brief Delete a key-value pair from the serialized data structure * diff --git a/src/server/pdc_server_metadata.c b/src/server/pdc_server_metadata.c index 2a213591a..40c2f956d 100644 --- a/src/server/pdc_server_metadata.c +++ b/src/server/pdc_server_metadata.c @@ -1878,8 +1878,8 @@ PDC_Server_seralize_kvtag_someta_to_shm(uint32_t *n_meta, uint64_t **obj_ids, ui pdc_metadata_t * elt; pdc_kvtag_list_t * kvtag_list_elt; HashTableIterator hash_table_iter; - int n_entry, nkvtag_in_buf = 0, nkvtag_per_buf = 0, buf_i = 0; - int is_prev_objid = 0, nclient_per_server, i; + int n_entry, nkvtag_in_buf = 0, nkvtag_per_buf = 0, buf_i = 0, count; + int nclient_per_server, i; HashTablePair pair; BULKI_Entity * key, *obj_key, *val, *obj_val; BULKI * bulki; @@ -1921,82 +1921,74 @@ PDC_Server_seralize_kvtag_someta_to_shm(uint32_t *n_meta, uint64_t **obj_ids, ui // Init first BULKI buf bulki = BULKI_init(nkvtag_per_buf); + // iterate over hash table entry while (n_entry != 0 && hash_table_iter_has_more(&hash_table_iter)) { pair = hash_table_iter_next(&hash_table_iter); head = pair.value; + // iterate over each metadata obj, one hash table entry may have multiple obj DL_FOREACH(head->metadata, elt) { if (elt->kvtag_list_head) { - obj_key = BULKI_ENTITY("_pdc_id", 1, PDC_STRING, PDC_CLS_ITEM); - obj_val = BULKI_ENTITY(&elt->obj_id, 1, PDC_UINT64, PDC_CLS_ITEM); - // Add obj_id before all kvtags of an obj - if (is_prev_objid == 0) { - BULKI_append(bulki, obj_key, obj_val); - is_prev_objid = 1; - } - } - else { - // If this obj doesn't have kvtag, need to put obj_id to bulki next - is_prev_objid = 0; - continue; - } - - DL_FOREACH(elt->kvtag_list_head, kvtag_list_elt) - { - if (nkvtag_in_buf >= nkvtag_per_buf) { - - // Create a shm - bulki_size = get_BULKI_size(bulki); - snprintf(shm_name, 64, "meta_shm.%d.%d", pdc_server_rank_g, buf_i); - bufs[buf_i] = PDC_Server_create_shm(shm_name, bulki_size); - // Serialize the data to shm after the current one reached limit - offset = 0; - bufs[buf_i] = BULKI_serialize_to_buffer(bulki, bufs[buf_i], &offset); - buf_sizes[buf_i] = bulki_size; - BULKI_free(bulki, 1); - buf_i++; - nkvtag_in_buf = 0; - } - if (nkvtag_in_buf == 0 && buf_i > 0) { - if (buf_i >= nclient_per_server) { - printf("==PDC_SERVER[%d]: Error with %s, buf ptr overflow!\n", pdc_server_rank_g, - __func__); - ret_value = FAIL; - goto done; + obj_key = BULKI_ENTITY(&elt->obj_id, 1, PDC_UINT64, PDC_CLS_ITEM); + count = 0; + DL_COUNT(elt->kvtag_list_head, kvtag_list_elt, count); + BULKI *kvtag_bulki = BULKI_init(count); + + // iterate over each kv pair of current obj + // save each kv pair as a bulki + DL_FOREACH(elt->kvtag_list_head, kvtag_list_elt) + { + // Add to a BULKI buffer + key = BULKI_ENTITY(kvtag_list_elt->kvtag->name, 1, PDC_STRING, PDC_CLS_ITEM); + if (kvtag_list_elt->kvtag->type == PDC_STRING) { + val = BULKI_ENTITY(kvtag_list_elt->kvtag->value, 1, kvtag_list_elt->kvtag->type, + PDC_CLS_ITEM); + } + else { + val = BULKI_ENTITY(kvtag_list_elt->kvtag->value, kvtag_list_elt->kvtag->size, + kvtag_list_elt->kvtag->type, PDC_CLS_ITEM); } - // Init BULKI buf - bulki = BULKI_init(nkvtag_per_buf); - // Add obj_id before all kvtags of an obj - BULKI_append(bulki, obj_key, obj_val); - is_prev_objid = 1; - } - // Add to a BULKI buffer - key = BULKI_ENTITY(kvtag_list_elt->kvtag->name, 1, PDC_STRING, PDC_CLS_ITEM); - if (kvtag_list_elt->kvtag->type == PDC_STRING) { - val = BULKI_ENTITY(kvtag_list_elt->kvtag->value, 1, kvtag_list_elt->kvtag->type, - PDC_CLS_ITEM); - } - else { - val = BULKI_ENTITY(kvtag_list_elt->kvtag->value, kvtag_list_elt->kvtag->size, - kvtag_list_elt->kvtag->type, PDC_CLS_ITEM); - } - BULKI_append(bulki, key, val); - is_prev_objid = 0; - nkvtag_in_buf++; - } // End for each kvtag in list - } // End for each metadata from hash table entry - } // End looping metadata hash table + BULKI_append(kvtag_bulki, key, val); + nkvtag_in_buf++; + + } // End for each kvtag in list + + // Use bulki that has all kv pairs of an obj as an bulki entity + BULKI_Entity *obj_value = empty_BULKI_Array_Entity(); + BULKI_ENTITY_append_BULKI(obj_value, kvtag_bulki); + BULKI_put(bulki, obj_key, obj_value); + + } // End if obj has kv tag + } // End for each metadata from hash table entry + + if (nkvtag_in_buf >= nkvtag_per_buf) { + // Create a shm + bulki_size = get_BULKI_size(bulki); + snprintf(shm_name, 64, "meta_shm.%d.%d", pdc_server_rank_g, buf_i); + bufs[buf_i] = PDC_Server_create_shm(shm_name, bulki_size); + // Serialize the data to shm after the current one reached limit + offset = 0; + bufs[buf_i] = BULKI_serialize_to_buffer(bulki, bufs[buf_i], &offset); + buf_sizes[buf_i] = bulki_size; + BULKI_free(bulki, 1); + buf_i++; + nkvtag_in_buf = 0; + } + + } // End looping metadata hash table - // Create a shm - bulki_size = get_BULKI_size(bulki); - snprintf(shm_name, 64, "meta_shm.%d.%d", pdc_server_rank_g, buf_i); - bufs[buf_i] = PDC_Server_create_shm(shm_name, bulki_size); - // Serialize the data to shm after the current one reached limit - offset = 0; - bufs[buf_i] = BULKI_serialize_to_buffer(bulki, bufs[buf_i], &offset); - buf_sizes[buf_i] = bulki_size; - BULKI_free(bulki, 1); + if (nkvtag_in_buf > 0) { + // Create a shm + bulki_size = get_BULKI_size(bulki); + snprintf(shm_name, 64, "meta_shm.%d.%d", pdc_server_rank_g, buf_i); + bufs[buf_i] = PDC_Server_create_shm(shm_name, bulki_size); + // Serialize the data to shm after the current one reached limit + offset = 0; + bufs[buf_i] = BULKI_serialize_to_buffer(bulki, bufs[buf_i], &offset); + buf_sizes[buf_i] = bulki_size; + BULKI_free(bulki, 1); + } /* // Debug */ /* BULKI *deserializedBulki = BULKI_deserialize(bufs[buf_i]); */ @@ -3333,98 +3325,6 @@ PDC_Server_get_kvtag_someta(metadata_get_kvtag_in_t *in, metadata_get_kvtag_out_ return ret_value; } -// Serialize all kvtags in current server to nsplit buffers, each with -// approx the same number of kvtags so they can be sent to (node-local) clients -// for parallel search -static perr_t -PDC_Server_seralize_kvtag_someta(int nbuf, void **bufs, uint64_t *buf_sizes) -{ - perr_t ret_value = SUCCEED; - pdc_hash_table_entry_head *head; - pdc_metadata_t * elt; - pdc_kvtag_list_t * kvtag_list_elt; - HashTableIterator hash_table_iter; - int n_entry, nkvtag_in_buf = 0, nkvtag_per_buf = 0, buf_i = 0; - int is_prev_objid = 0; - HashTablePair pair; - BULKI_Entity * key, *obj_key, *val, *obj_val; - BULKI * bulki; - - nkvtag_per_buf = ceil(metadata_total_count_g / nbuf); - - if (metadata_hash_table_g != NULL) { - - n_entry = hash_table_num_entries(metadata_hash_table_g); - hash_table_iterate(metadata_hash_table_g, &hash_table_iter); - - // Init first BULKI buf - bulki = BULKI_init(nkvtag_per_buf); - - while (n_entry != 0 && hash_table_iter_has_more(&hash_table_iter)) { - pair = hash_table_iter_next(&hash_table_iter); - head = pair.value; - DL_FOREACH(head->metadata, elt) - { - obj_key = BULKI_ENTITY("_pdc_id", 1, PDC_STRING, PDC_CLS_ITEM); - obj_val = BULKI_ENTITY(&elt->obj_id, 1, PDC_UINT64, PDC_CLS_ITEM); - // Add obj_id before all kvtags of an obj - if (is_prev_objid == 0) { - BULKI_put(bulki, obj_key, obj_val); - is_prev_objid = 1; - } - - DL_FOREACH(elt->kvtag_list_head, kvtag_list_elt) - { - if (nkvtag_in_buf >= nkvtag_per_buf) { - // Serialize the data after the current one reached limit - bufs[buf_i] = BULKI_serialize(bulki, &buf_sizes[buf_i]); - BULKI_free(bulki, 1); - buf_i++; - nkvtag_in_buf = 0; - } - if (nkvtag_in_buf == 0 && buf_i > 0) { - if (buf_i >= nbuf) { - printf("==PDC_SERVER[%d]: Error with %s, buf ptr overflow!\n", pdc_server_rank_g, - __func__); - ret_value = FAIL; - goto done; - } - // Init BULKI buf - bulki = BULKI_init(nkvtag_per_buf); - // Add obj_id before all kvtags of an obj - BULKI_put(bulki, obj_key, obj_val); - is_prev_objid = 1; - } - - // Add to a BULKI buffer - key = BULKI_ENTITY(kvtag_list_elt->kvtag->name, 1, PDC_STRING, PDC_CLS_ITEM); - if (kvtag_list_elt->kvtag->type == PDC_STRING) { - val = BULKI_ENTITY(kvtag_list_elt->kvtag->value, 1, kvtag_list_elt->kvtag->type, - PDC_CLS_ITEM); - } - else { - val = BULKI_ENTITY(kvtag_list_elt->kvtag->value, kvtag_list_elt->kvtag->size, - kvtag_list_elt->kvtag->type, PDC_CLS_ITEM); - } - BULKI_put(bulki, key, val); - is_prev_objid = 0; - nkvtag_in_buf++; - } // End for each kvtag in list - } // End for each metadata from hash table entry - } // End looping metadata hash table - - // Serialize last buf - bufs[buf_i] = BULKI_serialize(bulki, &buf_sizes[buf_i]); - } // if (metadata_hash_table_g != NULL) - else { - printf("==PDC_SERVER: metadata_hash_table_g not initialized!\n"); - ret_value = FAIL; - } - -done: - return ret_value; -} - perr_t PDC_Server_get_kvtag(metadata_get_kvtag_in_t *in, metadata_get_kvtag_out_t *out) { diff --git a/src/tests/kvtag_query.c b/src/tests/kvtag_query.c index e7c537254..8be59d3c0 100644 --- a/src/tests/kvtag_query.c +++ b/src/tests/kvtag_query.c @@ -173,8 +173,8 @@ main(int argc, char *argv[]) } } } - if (my_rank == 0) - println("Rank %d: Added %d kvtag to the %d th object\n", my_rank, round, i); + /* if (my_rank == 0) */ + /* println("Rank %d: Added %d kvtag to the %d th object", my_rank, round, i); */ } #ifdef ENABLE_MPI