Skip to content

Commit

Permalink
DAOS-16877 client: implement utilities for shared memory
Browse files Browse the repository at this point in the history
Features: shm

1. use tlsf as memory allocator
2. shared memory create/destroy
3. robust mutex in shared memory
4. hash table in shared memory

Required-githooks: true
Skipped-githooks: codespell

Signed-off-by: Lei Huang <[email protected]>
  • Loading branch information
wiliamhuang committed Dec 13, 2024
1 parent bd6f926 commit 5cf0088
Show file tree
Hide file tree
Showing 19 changed files with 3,043 additions and 11 deletions.
1 change: 1 addition & 0 deletions debian/daos-client-tests.install
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ usr/bin/daos_perf
usr/bin/daos_racer
usr/bin/daos_test
usr/bin/dfs_test
usr/bin/shm_test
usr/bin/jobtest
usr/bin/crt_launch
usr/bin/daos_gen_io_conf
Expand Down
77 changes: 71 additions & 6 deletions src/client/dfuse/pil4dfs/int_dfs.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
#include <daos/common.h>
#include <daos/dfs_lib_int.h>

#include <gurt/list.h>
#include <gurt/shm_alloc.h>
#include <gurt/shm_dict.h>

#include "hook.h"
#include "pil4dfs_int.h"

Expand Down Expand Up @@ -93,6 +97,12 @@ static int fd_dummy = -1;
/* Default dir cache garbage collector time-out in seconds */
#define DCACHE_GC_PERIOD 120

/* the pointer to global shared memory buffer across processes */
//static struct d_shm_buf_loc *p_shm_buf;

/* the hash table in shared memory for flock() across processes */
static struct d_shm_ht_head *ht_head_flock;

/* the number of low fd reserved */
static uint16_t low_fd_count;
/* the list of low fd reserved */
Expand Down Expand Up @@ -957,6 +967,8 @@ child_hdlr(void)
{
int rc;

shm_inc_ref();

/* daos is not initialized yet */
if (atomic_load_relaxed(&d_daos_inited) == false)
return;
Expand Down Expand Up @@ -4562,6 +4574,8 @@ reset_daos_env_before_exec(void)
{
int rc;

shm_dec_ref();

/* bash does fork(), then close opened files before exec(),
* so the fd for log file probably is invalid now.
*/
Expand Down Expand Up @@ -6608,7 +6622,13 @@ posix_fadvise64(int fd, off_t offset, off_t len, int advice)
int
flock(int fd, int operation)
{
int fd_directed;
int fd_directed;
daos_obj_id_t obj_id;
/* obj_id.hi + obj_id.low + item_name */
char key[DFS_MAX_NAME + sizeof(daos_obj_id_t)*2];
pthread_rwlock_t *p_locks;
int rc;
struct shm_ht_rec *link;

if (next_flock == NULL) {
next_flock = dlsym(RTLD_NEXT, "flock");
Expand All @@ -6623,11 +6643,32 @@ flock(int fd, int operation)
if (d_compatible_mode && fd < FD_FILE_BASE)
return next_flock(fd, operation);

/* We output the message only if env "D_IL_REPORT" is set. */
if (report)
DS_ERROR(ENOTSUP, "flock() is not implemented yet");
errno = ENOTSUP;
return -1;
if (shm_inited() == false)
/* shared memory is not properly setup */
return ENOTSUP;

rc = dfs_obj2id(d_file_list[fd_directed - FD_FILE_BASE]->file, &obj_id);
D_ASSERT(rc == 0);
rc = snprintf(key, DFS_MAX_NAME + sizeof(daos_obj_id_t), "%lx%lx%s", obj_id.hi, obj_id.lo,
d_file_list[fd_directed - FD_FILE_BASE]->item_name);
D_ASSERT((rc < (DFS_MAX_NAME + sizeof(daos_obj_id_t)*2)) && (rc > 0));
p_locks = (pthread_rwlock_t *)shm_ht_rec_find_insert(ht_head_flock, key, rc,
KEY_VALUE_PTHREAD_RWLOCK, sizeof(pthread_rwlock_t), &link);

if (operation == LOCK_SH) {
/* use read lock to mimic shared file lock */
rc = pthread_rwlock_rdlock(p_locks);
} else if (operation == LOCK_EX) {
/* use write lock to mimic exclusive file lock */
rc = pthread_rwlock_wrlock(p_locks);
} else if (operation == LOCK_UN) {
rc = pthread_rwlock_unlock(p_locks);
} else {
errno = EINVAL;
return -1;
}

return rc;
}

int
Expand Down Expand Up @@ -7133,6 +7174,27 @@ init_myhook(void)
else
daos_debug_inited = true;

rc = shm_init();
if (rc == 0) {
rc = shm_ht_create("shm_ht_flock", 8, 16, &ht_head_flock);
if (rc) {
/* decrease shared memory reference */
shm_dec_ref();
DS_ERROR(rc, "failed to create shm_ht_flock in shared memory");
}
} else {
/* shared memory cache will not be used. */
DS_ERROR(rc, "failed to initialize shared memory");
}

rc = d_agetenv_str(&env_log, "D_IL_REPORT");
if (env_log) {
report = true;
if (strncmp(env_log, "0", 2) == 0 || strncasecmp(env_log, "false", 6) == 0)
report = false;
d_freeenv_str(&env_log);
}

d_compatible_mode = false;
d_getenv_bool("D_IL_COMPATIBLE", &d_compatible_mode);

Expand Down Expand Up @@ -7383,6 +7445,9 @@ finalize_myhook(void)
int rc;
d_list_t *rlink;

if (shm_inited())
shm_dec_ref();

if (bypass)
return;

Expand Down
5 changes: 3 additions & 2 deletions src/gurt/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"""Build libgurt"""

SRC = ['debug.c', 'dlog.c', 'hash.c', 'misc.c', 'heap.c', 'errno.c',
'fault_inject.c', 'slab.c', 'telemetry.c', 'hlc.c', 'hlct.c', 'signals.c']
'fault_inject.c', 'slab.c', 'telemetry.c', 'hlc.c', 'hlct.c', 'signals.c',
'shm_alloc.c', 'shm_dict.c', 'shm_tlsf.c', 'shm_utils.c']


def scons():
Expand All @@ -19,7 +20,7 @@ def scons():

denv = env.Clone()

denv.AppendUnique(LIBS=['pthread', 'yaml', 'm', 'dl'])
denv.AppendUnique(LIBS=['pthread', 'yaml', 'm', 'dl', 'rt'])
denv.require('uuid')

gurt_targets = denv.SharedObject(SRC)
Expand Down
Loading

0 comments on commit 5cf0088

Please sign in to comment.