Skip to content

Commit

Permalink
Update bulki usage with nested bulki/bulki entity structure
Browse files Browse the repository at this point in the history
  • Loading branch information
houjun committed Nov 5, 2024
1 parent 66af357 commit 17414ee
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 225 deletions.
4 changes: 2 additions & 2 deletions scripts/kvtag_query_scale/gen_script.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ NUM_SERVER_PROC_PER_NODE=1
NUM_CLIENT_PROC_PER_NODE=1


MAX_NODE=512
MAX_NODE=64
MAX_ATTR=1024
MAX_ATTRLEN=1000

PROG_BASENAME=kvtag_query_scale
PROG_BASENAME=kvtag_query

for (( i = 1; i <= $MAX_NODE; i*=2 )); do
for (( j = 0; j <= 1; j+=1 )); do
Expand Down
6 changes: 3 additions & 3 deletions scripts/kvtag_query_scale/template.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ rm -rf $PDC_TMPDIR/*
export PDC_TMPDIR=${PDC_TMPDIR}/$N_NODE/$USE_DART
mkdir -p $PDC_TMPDIR

EXECPATH=/global/cfs/cdirs/m2621/wzhang5/perlmutter/install/pdc/share/test/bin
TOOLPATH=/global/cfs/cdirs/m2621/wzhang5/perlmutter/install/pdc/share/test/bin
EXECPATH=/global/cfs/cdirs/m2621/perlmutter/pdc/build/bin
TOOLPATH=/global/cfs/cdirs/m2621/perlmutter/pdc/build/bin
SERVER=$EXECPATH/pdc_server.exe
CLIENT=$TOOLPATH/kvtag_query_scale
CLIENT=$TOOLPATH/kvtag_query
CLOSE=$EXECPATH/close_server

chmod +x $EXECPATH/*
Expand Down
54 changes: 30 additions & 24 deletions src/api/pdc_client_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -9451,32 +9451,38 @@ PDC_Client_query_kvtag_mpi(const pdc_kvtag_t *kvtag, int *n_res, uint64_t **pdc_
}

BULKI_KV_Pair_Iterator *bulki_iter = BULKI_KV_Pair_iterator_init(deserializedBulki_g);
BULKI_KV_Pair * bulki_kv;
BULKI_KV_Pair *obj_kv, *tag_kv;
BULKI *tag_bulki;
// Iterate and get query result
while (NULL != (bulki_kv = BULKI_KV_Pair_iterator_next(bulki_iter))) {
/* printf("key: [%s]\n", (char*)bulki_kv->key.data); */
if (strcmp("_pdc_id", (char *)bulki_kv->key.data) == 0) {
pdc_id = *((uint64_t *)bulki_kv->value.data);
/* printf("value: %llu\n", pdc_id); */
}
else {
query_tag.type = bulki_iter->bulki->data->values[bulki_iter->current_idx - 1].pdc_type;
if (query_tag.type == kvtag->type) {
query_tag.name = (char *)bulki_kv->key.data;
query_tag.value = (void *)bulki_kv->value.data;
query_tag.size = bulki_iter->bulki->data->values[bulki_iter->current_idx - 1].size;
/* printf("value: %d\n", *((int*)bulki_kv->value.data)); */
if (PDC_is_matching_kvtag(kvtag, &query_tag) == TRUE) {
if (iter >= alloc_size) {
alloc_size *= 2;
*pdc_ids = (void *)realloc(*pdc_ids, alloc_size * sizeof(uint64_t));
while (NULL != (obj_kv = BULKI_KV_Pair_iterator_next(bulki_iter))) {

pdc_id = *((uint64_t *)obj_kv->key.data);
/* printf("id: [%llu]\n", pdc_id); */

BULKI_Entity_Iterator *bent_iter = Bent_iterator_init(&obj_kv->value, NULL, PDC_BULKI);
while(NULL != (tag_bulki = Bent_iterator_next_BULKI(bent_iter))) {

BULKI_KV_Pair_Iterator *tag_iter = BULKI_KV_Pair_iterator_init(tag_bulki);
while (NULL != (tag_kv = BULKI_KV_Pair_iterator_next(tag_iter))) {

query_tag.type = tag_kv->value.pdc_type;
if (query_tag.type == kvtag->type) {
query_tag.name = (char *)tag_kv->key.data;
query_tag.value = (void *)tag_kv->value.data;
query_tag.size = tag_kv->value.size;
/* printf("value: %d\n", *((int*)tag_kv->value.data)); */
if (PDC_is_matching_kvtag(kvtag, &query_tag) == TRUE) {
if (iter >= alloc_size) {
alloc_size *= 2;
*pdc_ids = (void *)realloc(*pdc_ids, alloc_size * sizeof(uint64_t));
}
(*pdc_ids)[iter++] = pdc_id;
/* printf("Found match %s:%d\n", query_tag.name, *(int*)query_tag.value); */
}
(*pdc_ids)[iter++] = pdc_id;
/* printf("Found match %s:%d\n", query_tag.name, *(int*)query_tag.value); */
}
} // End if same type
} // End else
} // End while
} // End if same type
} // End tag while
}
} // End obj while
*n_res = iter;
}
else {
Expand Down
25 changes: 0 additions & 25 deletions src/commons/serde/bulki/bulki.c
Original file line number Diff line number Diff line change
Expand Up @@ -392,31 +392,6 @@ BULKI_put(BULKI *bulki, BULKI_Entity *key, BULKI_Entity *value)
get_BULKI_size(bulki);
}

/**
 * Append a key-value pair at the end of a BULKI.
 *
 * The key and value entities are shallow-copied (memcpy of the BULKI_Entity
 * structs) into slot `numKeys` of the key and value arrays; this function
 * does not look for an existing entry with the same key.
 *
 * When the arrays are full they are grown (doubling the capacity). If growing
 * fails, an error is printed and the BULKI is left with its previous content,
 * key count and capacity.
 *
 * @param bulki Pointer to the BULKI structure to append to
 * @param key   Pointer to the BULKI_Entity representing the key
 * @param value Pointer to the BULKI_Entity representing the value
 */
void
BULKI_append(BULKI *bulki, BULKI_Entity *key, BULKI_Entity *value)
{
    if (bulki == NULL || key == NULL || value == NULL) {
        printf("Error: bulki, key, or value is NULL\n");
        return;
    }

    if (bulki->numKeys >= bulki->capacity) {
        // Doubling a capacity of 0 would stay 0, so start from 1 in that case.
        size_t new_capacity = (bulki->capacity > 0) ? (size_t)bulki->capacity * 2 : 1;
        void * grown;

        // Guarantee room for at least one more pair.
        if (new_capacity <= (size_t)bulki->numKeys)
            new_capacity = (size_t)bulki->numKeys + 1;

        // Guard the byte-count multiplication below against wrap-around.
        if (new_capacity > (size_t)-1 / sizeof(BULKI_Entity)) {
            printf("Error: BULKI capacity overflow\n");
            return;
        }

        // Keep the old pointers until realloc succeeds, so a failure neither
        // leaks the existing arrays nor leaves NULL in the structure.
        grown = realloc(bulki->header->keys, new_capacity * sizeof(BULKI_Entity));
        if (grown == NULL) {
            printf("Error: failed to grow BULKI keys array\n");
            return;
        }
        bulki->header->keys = grown;

        grown = realloc(bulki->data->values, new_capacity * sizeof(BULKI_Entity));
        if (grown == NULL) {
            // The keys array is already larger than `capacity`, which is harmless.
            printf("Error: failed to grow BULKI values array\n");
            return;
        }
        bulki->data->values = grown;

        // Only publish the new capacity once both arrays have been grown.
        bulki->capacity = new_capacity;
    }

    memcpy(&bulki->header->keys[bulki->numKeys], key, sizeof(BULKI_Entity));
    // account for the key entity's size in the header size
    bulki->header->headerSize += key->size;

    memcpy(&bulki->data->values[bulki->numKeys], value, sizeof(BULKI_Entity));
    // account for the value entity's size in the data size
    bulki->data->dataSize += value->size;

    bulki->numKeys++;
    // Return value is unused here; presumably refreshes the BULKI's cached total size -- TODO confirm
    get_BULKI_size(bulki);
}

BULKI_Entity *
BULKI_delete(BULKI *bulki, BULKI_Entity *key)
{
Expand Down
8 changes: 0 additions & 8 deletions src/commons/serde/bulki/include/bulki.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,6 @@ BULKI *BULKI_init(int initial_field_count);
*/
void BULKI_put(BULKI *bulki, BULKI_Entity *key, BULKI_Entity *value);

/**
* @brief Append a key-value pair to the serialized data structure.
* @param bulki Pointer to the BULKI structure
* @param key Pointer to the BULKI_Entity structure representing the key
* @param value Pointer to the BULKI_Entity structure representing the value
*/
void BULKI_append(BULKI *bulki, BULKI_Entity *key, BULKI_Entity *value);

/**
* @brief Delete a key-value pair from the serialized data structure
*
Expand Down
222 changes: 61 additions & 161 deletions src/server/pdc_server_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -1878,8 +1878,8 @@ PDC_Server_seralize_kvtag_someta_to_shm(uint32_t *n_meta, uint64_t **obj_ids, ui
pdc_metadata_t * elt;
pdc_kvtag_list_t * kvtag_list_elt;
HashTableIterator hash_table_iter;
int n_entry, nkvtag_in_buf = 0, nkvtag_per_buf = 0, buf_i = 0;
int is_prev_objid = 0, nclient_per_server, i;
int n_entry, nkvtag_in_buf = 0, nkvtag_per_buf = 0, buf_i = 0, count;
int nclient_per_server, i;
HashTablePair pair;
BULKI_Entity * key, *obj_key, *val, *obj_val;
BULKI * bulki;
Expand Down Expand Up @@ -1921,82 +1921,74 @@ PDC_Server_seralize_kvtag_someta_to_shm(uint32_t *n_meta, uint64_t **obj_ids, ui
// Init first BULKI buf
bulki = BULKI_init(nkvtag_per_buf);

// iterate over hash table entry
while (n_entry != 0 && hash_table_iter_has_more(&hash_table_iter)) {
pair = hash_table_iter_next(&hash_table_iter);
head = pair.value;
// iterate over each metadata obj, one hash table entry may have multiple obj
DL_FOREACH(head->metadata, elt)
{
if (elt->kvtag_list_head) {
obj_key = BULKI_ENTITY("_pdc_id", 1, PDC_STRING, PDC_CLS_ITEM);
obj_val = BULKI_ENTITY(&elt->obj_id, 1, PDC_UINT64, PDC_CLS_ITEM);
// Add obj_id before all kvtags of an obj
if (is_prev_objid == 0) {
BULKI_append(bulki, obj_key, obj_val);
is_prev_objid = 1;
}
}
else {
// If this obj doesn't have kvtag, need to put obj_id to bulki next
is_prev_objid = 0;
continue;
}

DL_FOREACH(elt->kvtag_list_head, kvtag_list_elt)
{
if (nkvtag_in_buf >= nkvtag_per_buf) {

// Create a shm
bulki_size = get_BULKI_size(bulki);
snprintf(shm_name, 64, "meta_shm.%d.%d", pdc_server_rank_g, buf_i);
bufs[buf_i] = PDC_Server_create_shm(shm_name, bulki_size);
// Serialize the data to shm after the current one reached limit
offset = 0;
bufs[buf_i] = BULKI_serialize_to_buffer(bulki, bufs[buf_i], &offset);
buf_sizes[buf_i] = bulki_size;
BULKI_free(bulki, 1);
buf_i++;
nkvtag_in_buf = 0;
}
if (nkvtag_in_buf == 0 && buf_i > 0) {
if (buf_i >= nclient_per_server) {
printf("==PDC_SERVER[%d]: Error with %s, buf ptr overflow!\n", pdc_server_rank_g,
__func__);
ret_value = FAIL;
goto done;
obj_key = BULKI_ENTITY(&elt->obj_id, 1, PDC_UINT64, PDC_CLS_ITEM);
count = 0;
DL_COUNT(elt->kvtag_list_head, kvtag_list_elt, count);
BULKI *kvtag_bulki = BULKI_init(count);

// iterate over each kv pair of current obj
// save each kv pair as a bulki
DL_FOREACH(elt->kvtag_list_head, kvtag_list_elt)
{
// Add to a BULKI buffer
key = BULKI_ENTITY(kvtag_list_elt->kvtag->name, 1, PDC_STRING, PDC_CLS_ITEM);
if (kvtag_list_elt->kvtag->type == PDC_STRING) {
val = BULKI_ENTITY(kvtag_list_elt->kvtag->value, 1, kvtag_list_elt->kvtag->type,
PDC_CLS_ITEM);
}
else {
val = BULKI_ENTITY(kvtag_list_elt->kvtag->value, kvtag_list_elt->kvtag->size,
kvtag_list_elt->kvtag->type, PDC_CLS_ITEM);
}
// Init BULKI buf
bulki = BULKI_init(nkvtag_per_buf);
// Add obj_id before all kvtags of an obj
BULKI_append(bulki, obj_key, obj_val);
is_prev_objid = 1;
}

// Add to a BULKI buffer
key = BULKI_ENTITY(kvtag_list_elt->kvtag->name, 1, PDC_STRING, PDC_CLS_ITEM);
if (kvtag_list_elt->kvtag->type == PDC_STRING) {
val = BULKI_ENTITY(kvtag_list_elt->kvtag->value, 1, kvtag_list_elt->kvtag->type,
PDC_CLS_ITEM);
}
else {
val = BULKI_ENTITY(kvtag_list_elt->kvtag->value, kvtag_list_elt->kvtag->size,
kvtag_list_elt->kvtag->type, PDC_CLS_ITEM);
}
BULKI_append(bulki, key, val);
is_prev_objid = 0;
nkvtag_in_buf++;
} // End for each kvtag in list
} // End for each metadata from hash table entry
} // End looping metadata hash table
BULKI_append(kvtag_bulki, key, val);
nkvtag_in_buf++;

} // End for each kvtag in list

// Use the bulki that holds all kv pairs of an obj as a bulki entity
BULKI_Entity *obj_value = empty_BULKI_Array_Entity();
BULKI_ENTITY_append_BULKI(obj_value, kvtag_bulki);
BULKI_put(bulki, obj_key, obj_value);

} // End if obj has kv tag
} // End for each metadata from hash table entry

if (nkvtag_in_buf >= nkvtag_per_buf) {
// Create a shm
bulki_size = get_BULKI_size(bulki);
snprintf(shm_name, 64, "meta_shm.%d.%d", pdc_server_rank_g, buf_i);
bufs[buf_i] = PDC_Server_create_shm(shm_name, bulki_size);
// Serialize the data to shm after the current one reached limit
offset = 0;
bufs[buf_i] = BULKI_serialize_to_buffer(bulki, bufs[buf_i], &offset);
buf_sizes[buf_i] = bulki_size;
BULKI_free(bulki, 1);
buf_i++;
nkvtag_in_buf = 0;
}

} // End looping metadata hash table

// Create a shm
bulki_size = get_BULKI_size(bulki);
snprintf(shm_name, 64, "meta_shm.%d.%d", pdc_server_rank_g, buf_i);
bufs[buf_i] = PDC_Server_create_shm(shm_name, bulki_size);
// Serialize the data to shm after the current one reached limit
offset = 0;
bufs[buf_i] = BULKI_serialize_to_buffer(bulki, bufs[buf_i], &offset);
buf_sizes[buf_i] = bulki_size;
BULKI_free(bulki, 1);
if (nkvtag_in_buf > 0) {
// Create a shm
bulki_size = get_BULKI_size(bulki);
snprintf(shm_name, 64, "meta_shm.%d.%d", pdc_server_rank_g, buf_i);
bufs[buf_i] = PDC_Server_create_shm(shm_name, bulki_size);
// Serialize the data to shm after the current one reached limit
offset = 0;
bufs[buf_i] = BULKI_serialize_to_buffer(bulki, bufs[buf_i], &offset);
buf_sizes[buf_i] = bulki_size;
BULKI_free(bulki, 1);
}

/* // Debug */
/* BULKI *deserializedBulki = BULKI_deserialize(bufs[buf_i]); */
Expand Down Expand Up @@ -3333,98 +3325,6 @@ PDC_Server_get_kvtag_someta(metadata_get_kvtag_in_t *in, metadata_get_kvtag_out_
return ret_value;
}

// Serialize all kvtags stored in this server's metadata hash table into up to
// nbuf BULKI buffers, each holding approximately the same number of kvtags, so
// they can be sent to (node-local) clients for parallel search.
//
// Each buffer is a serialized BULKI in which a "_pdc_id" -> obj_id pair is
// followed by that object's kvtag name -> value pairs.
//
// \param nbuf      [IN]  Number of slots available in bufs/buf_sizes
// \param bufs      [OUT] bufs[i] receives the i-th buffer returned by BULKI_serialize
// \param buf_sizes [OUT] buf_sizes[i] receives the size reported by BULKI_serialize
//
// \return SUCCEED on success; FAIL if metadata_hash_table_g is NULL or if more
//         than nbuf buffers would be needed
static perr_t
PDC_Server_seralize_kvtag_someta(int nbuf, void **bufs, uint64_t *buf_sizes)
{
perr_t ret_value = SUCCEED;
pdc_hash_table_entry_head *head;
pdc_metadata_t * elt;
pdc_kvtag_list_t * kvtag_list_elt;
HashTableIterator hash_table_iter;
int n_entry, nkvtag_in_buf = 0, nkvtag_per_buf = 0, buf_i = 0;
// 1 while the most recent pair put into the current bulki is an object id
int is_prev_objid = 0;
HashTablePair pair;
BULKI_Entity * key, *obj_key, *val, *obj_val;
BULKI * bulki;

// Target number of kvtags per output buffer.
// NOTE(review): if metadata_total_count_g is an integer type, the division
// truncates before ceil() is applied, and nbuf == 0 divides by zero -- confirm.
nkvtag_per_buf = ceil(metadata_total_count_g / nbuf);

if (metadata_hash_table_g != NULL) {

n_entry = hash_table_num_entries(metadata_hash_table_g);
hash_table_iterate(metadata_hash_table_g, &hash_table_iter);

// Init first BULKI buf
bulki = BULKI_init(nkvtag_per_buf);

// Iterate over every hash table entry
while (n_entry != 0 && hash_table_iter_has_more(&hash_table_iter)) {
pair = hash_table_iter_next(&hash_table_iter);
head = pair.value;
// Iterate over each metadata object in the entry's list
DL_FOREACH(head->metadata, elt)
{
obj_key = BULKI_ENTITY("_pdc_id", 1, PDC_STRING, PDC_CLS_ITEM);
obj_val = BULKI_ENTITY(&elt->obj_id, 1, PDC_UINT64, PDC_CLS_ITEM);
// Add obj_id before all kvtags of an obj
// NOTE(review): is_prev_objid is only cleared after a kvtag is added, so
// an object following one without kvtags does not get its id put and its
// kvtags end up after the previous object's id -- confirm intended.
// NOTE(review): every object uses the same "_pdc_id" key; whether BULKI_put
// keeps multiple pairs with an identical key is not visible here -- confirm.
if (is_prev_objid == 0) {
BULKI_put(bulki, obj_key, obj_val);
is_prev_objid = 1;
}

// Iterate over each kvtag of the current object
DL_FOREACH(elt->kvtag_list_head, kvtag_list_elt)
{
if (nkvtag_in_buf >= nkvtag_per_buf) {
// Serialize the data after the current one reached limit
bufs[buf_i] = BULKI_serialize(bulki, &buf_sizes[buf_i]);
BULKI_free(bulki, 1);
buf_i++;
nkvtag_in_buf = 0;
}
// A buffer was just flushed above: start the next one
if (nkvtag_in_buf == 0 && buf_i > 0) {
// No slot left in bufs for another buffer
if (buf_i >= nbuf) {
printf("==PDC_SERVER[%d]: Error with %s, buf ptr overflow!\n", pdc_server_rank_g,
__func__);
ret_value = FAIL;
goto done;
}
// Init BULKI buf
bulki = BULKI_init(nkvtag_per_buf);
// Add obj_id before all kvtags of an obj
BULKI_put(bulki, obj_key, obj_val);
is_prev_objid = 1;
}

// Add to a BULKI buffer
key = BULKI_ENTITY(kvtag_list_elt->kvtag->name, 1, PDC_STRING, PDC_CLS_ITEM);
// String values are passed with a count of 1; other types pass the kvtag's size as the count
if (kvtag_list_elt->kvtag->type == PDC_STRING) {
val = BULKI_ENTITY(kvtag_list_elt->kvtag->value, 1, kvtag_list_elt->kvtag->type,
PDC_CLS_ITEM);
}
else {
val = BULKI_ENTITY(kvtag_list_elt->kvtag->value, kvtag_list_elt->kvtag->size,
kvtag_list_elt->kvtag->type, PDC_CLS_ITEM);
}
BULKI_put(bulki, key, val);
is_prev_objid = 0;
nkvtag_in_buf++;
} // End for each kvtag in list
} // End for each metadata from hash table entry
} // End looping metadata hash table

// Serialize last buf
// NOTE(review): unlike the flush inside the loop, bulki is not passed to
// BULKI_free after this final serialization -- confirm whether that is intended.
bufs[buf_i] = BULKI_serialize(bulki, &buf_sizes[buf_i]);
} // if (metadata_hash_table_g != NULL)
else {
printf("==PDC_SERVER: metadata_hash_table_g not initialized!\n");
ret_value = FAIL;
}

done:
return ret_value;
}

perr_t
PDC_Server_get_kvtag(metadata_get_kvtag_in_t *in, metadata_get_kvtag_out_t *out)
{
Expand Down
Loading

0 comments on commit 17414ee

Please sign in to comment.