diff --git a/src/client/dfs/SConscript b/src/client/dfs/SConscript index 5b98d6d7e8a..906b1bfd7d4 100644 --- a/src/client/dfs/SConscript +++ b/src/client/dfs/SConscript @@ -44,7 +44,7 @@ def scons(): libraries = ['daos_common', 'daos', 'uuid', 'gurt'] dfs_src = ['common.c', 'cont.c', 'dir.c', 'file.c', 'io.c', 'lookup.c', 'mnt.c', 'obj.c', - 'pipeline.c', 'readdir.c', 'rename.c', 'xattr.c', 'dfs_sys.c'] + 'pipeline.c', 'readdir.c', 'rename.c', 'xattr.c', 'dfs_sys.c', 'metrics.c'] dfs = denv.d_library('dfs', dfs_src, LIBS=libraries) denv.Install('$PREFIX/lib64/', dfs) diff --git a/src/client/dfs/common.c b/src/client/dfs/common.c index 92e7470d54a..acbc7eb11f7 100644 --- a/src/client/dfs/common.c +++ b/src/client/dfs/common.c @@ -625,6 +625,8 @@ entry_stat(dfs_t *dfs, daos_handle_t th, daos_handle_t oh, const char *name, siz stbuf->st_atim.tv_sec = stbuf->st_mtim.tv_sec; stbuf->st_atim.tv_nsec = stbuf->st_mtim.tv_nsec; } + + DFS_OP_STAT_INCR(dfs, DOS_STAT); return 0; } @@ -710,6 +712,7 @@ open_dir(dfs_t *dfs, dfs_obj_t *parent, int flags, daos_oclass_id_t cid, struct D_ASSERT(rc == 0); dir->d.chunk_size = entry->chunk_size; dir->d.oclass = entry->oclass; + DFS_OP_STAT_INCR(dfs, DOS_MKDIR); return 0; } } @@ -742,6 +745,7 @@ open_dir(dfs_t *dfs, dfs_obj_t *parent, int flags, daos_oclass_id_t cid, struct oid_cp(&dir->oid, entry->oid); dir->d.chunk_size = entry->chunk_size; dir->d.oclass = entry->oclass; + DFS_OP_STAT_INCR(dfs, DOS_OPENDIR); return 0; } diff --git a/src/client/dfs/dfs_internal.h b/src/client/dfs/dfs_internal.h index 7425fc2f00d..41be576c349 100644 --- a/src/client/dfs/dfs_internal.h +++ b/src/client/dfs/dfs_internal.h @@ -15,6 +15,8 @@ #include #include +#include "metrics.h" + /** D-key name of SB metadata */ #define SB_DKEY "DFS_SB_METADATA" @@ -190,6 +192,8 @@ struct dfs { struct dfs_mnt_hdls *cont_hdl; /** the root dir stat buf */ struct stat root_stbuf; + /** DFS top-level metrics */ + struct dfs_metrics *metrics; }; struct dfs_entry { diff --git a/src/client/dfs/dir.c b/src/client/dfs/dir.c index 000c0625f58..188b79c917d 100644 --- a/src/client/dfs/dir.c +++ b/src/client/dfs/dir.c @@ -65,6 +65,7 @@ dfs_mkdir(dfs_t *dfs, dfs_obj_t *parent, const char *name, mode_t mode, daos_ocl if (rc != 0) return daos_der2errno(rc); + DFS_OP_STAT_INCR(dfs, DOS_MKDIR); return rc; } @@ -220,6 +221,7 @@ dfs_remove(dfs_t *dfs, dfs_obj_t *parent, const char *name, bool force, daos_obj if (oid) oid_cp(oid, entry.oid); + DFS_OP_STAT_INCR(dfs, DOS_UNLINK); out: rc = check_tx(th, rc); if (rc == ERESTART) diff --git a/src/client/dfs/io.c b/src/client/dfs/io.c index 3919d8cfe19..db8f79ab105 100644 --- a/src/client/dfs/io.c +++ b/src/client/dfs/io.c @@ -15,7 +15,20 @@ #include "dfs_internal.h" +static void +dfs_update_file_metrics(dfs_t *dfs, daos_size_t read_bytes, daos_size_t write_bytes) +{ + if (dfs == NULL || dfs->metrics == NULL) + return; + + if (read_bytes > 0) + d_tm_inc_gauge(dfs->metrics->dm_read_bytes, read_bytes); + if (write_bytes > 0) + d_tm_inc_gauge(dfs->metrics->dm_write_bytes, write_bytes); +} + struct dfs_read_params { + dfs_t *dfs; daos_size_t *read_size; daos_array_iod_t arr_iod; daos_range_t rg; @@ -35,6 +48,8 @@ read_cb(tse_task_t *task, void *data) D_GOTO(out, rc); } + DFS_OP_STAT_INCR(params->dfs, DOS_READ); + dfs_update_file_metrics(params->dfs, params->arr_iod.arr_nr_read, 0); *params->read_size = params->arr_iod.arr_nr_read; out: D_FREE(params); @@ -61,6 +76,7 @@ dfs_read_int(dfs_t *dfs, dfs_obj_t *obj, daos_off_t off, dfs_iod_t *iod, d_sg_li if (params == NULL) D_GOTO(err_task, rc = -DER_NOMEM); + params->dfs = dfs; params->read_size = read_size; /** set array location */ @@ -90,6 +106,7 @@ dfs_read_int(dfs_t *dfs, dfs_obj_t *obj, daos_off_t off, dfs_iod_t *iod, d_sg_li * completion cb that frees params in this case, so we can just ignore the rc here. */ dc_task_schedule(task, true); + return 0; err_params: @@ -125,6 +142,7 @@ dfs_read(dfs_t *dfs, dfs_obj_t *obj, d_sg_list_t *sgl, daos_off_t off, daos_size daos_event_launch(ev); daos_event_complete(ev, 0); } + DFS_OP_STAT_INCR(dfs, DOS_READ); return 0; } @@ -146,7 +164,9 @@ dfs_read(dfs_t *dfs, dfs_obj_t *obj, d_sg_list_t *sgl, daos_off_t off, daos_size return daos_der2errno(rc); } + DFS_OP_STAT_INCR(dfs, DOS_READ); *read_size = iod.arr_nr_read; + dfs_update_file_metrics(dfs, iod.arr_nr_read, 0); return 0; } @@ -173,6 +193,7 @@ dfs_readx(dfs_t *dfs, dfs_obj_t *obj, dfs_iod_t *iod, d_sg_list_t *sgl, daos_siz daos_event_launch(ev); daos_event_complete(ev, 0); } + DFS_OP_STAT_INCR(dfs, DOS_READ); return 0; } @@ -189,7 +210,9 @@ dfs_readx(dfs_t *dfs, dfs_obj_t *obj, dfs_iod_t *iod, d_sg_list_t *sgl, daos_siz return daos_der2errno(rc); } + DFS_OP_STAT_INCR(dfs, DOS_READ); *read_size = arr_iod.arr_nr_read; + dfs_update_file_metrics(dfs, arr_iod.arr_nr_read, 0); return 0; } @@ -223,6 +246,7 @@ dfs_write(dfs_t *dfs, dfs_obj_t *obj, d_sg_list_t *sgl, daos_off_t off, daos_eve daos_event_launch(ev); daos_event_complete(ev, 0); } + DFS_OP_STAT_INCR(dfs, DOS_WRITE); return 0; } @@ -238,8 +262,12 @@ dfs_write(dfs_t *dfs, dfs_obj_t *obj, d_sg_list_t *sgl, daos_off_t off, daos_eve daos_event_errno_rc(ev); rc = daos_array_write(obj->oh, DAOS_TX_NONE, &iod, sgl, ev); - if (rc) + if (rc == 0) { + DFS_OP_STAT_INCR(dfs, DOS_WRITE); + dfs_update_file_metrics(dfs, 0, buf_size); + } else { D_ERROR("daos_array_write() failed, " DF_RC "\n", DP_RC(rc)); + } return daos_der2errno(rc); } @@ -248,6 +276,8 @@ int dfs_writex(dfs_t *dfs, dfs_obj_t *obj, dfs_iod_t *iod, d_sg_list_t *sgl, daos_event_t *ev) { daos_array_iod_t arr_iod; + daos_size_t buf_size; + int i; int rc; if (dfs == NULL || !dfs->mounted) @@ -266,6 +296,7 @@ dfs_writex(dfs_t *dfs, dfs_obj_t *obj, dfs_iod_t *iod, d_sg_list_t *sgl, daos_ev daos_event_launch(ev); daos_event_complete(ev, 0); } + DFS_OP_STAT_INCR(dfs, DOS_WRITE); return 0; } @@ -276,9 +307,18 @@ dfs_writex(dfs_t *dfs, dfs_obj_t *obj, dfs_iod_t *iod, d_sg_list_t *sgl, daos_ev if (ev) daos_event_errno_rc(ev); + buf_size = 0; + if (dfs->metrics != NULL && sgl != NULL) + for (i = 0; i < sgl->sg_nr; i++) + buf_size += sgl->sg_iovs[i].iov_len; + rc = daos_array_write(obj->oh, DAOS_TX_NONE, &arr_iod, sgl, ev); - if (rc) + if (rc == 0) { + DFS_OP_STAT_INCR(dfs, DOS_WRITE); + dfs_update_file_metrics(dfs, 0, buf_size); + } else { D_ERROR("daos_array_write() failed (%d)\n", rc); + } return daos_der2errno(rc); } diff --git a/src/client/dfs/metrics.c b/src/client/dfs/metrics.c new file mode 100644 index 00000000000..40aefc90e73 --- /dev/null +++ b/src/client/dfs/metrics.c @@ -0,0 +1,174 @@ +/** + * (C) Copyright 2024 Intel Corporation. + * + * SPDX-License-Identifier: BSD-2-Clause-Patent + */ +#define D_LOGFAC DD_FAC(dfs) + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "metrics.h" +#include "dfs_internal.h" + +#define DFS_METRICS_ROOT "dfs" + +#define STAT_METRICS_SIZE (D_TM_METRIC_SIZE * DOS_LIMIT) +#define FILE_METRICS_SIZE (((D_TM_METRIC_SIZE * NR_SIZE_BUCKETS) * 2) + D_TM_METRIC_SIZE * 2) +#define DFS_METRICS_SIZE (STAT_METRICS_SIZE + FILE_METRICS_SIZE) + +#define SPRINTF_TM_PATH(buf, pool_uuid, cont_uuid, path) \ + snprintf(buf, sizeof(buf), "pool/" DF_UUIDF "/container/" DF_UUIDF "/%s", \ + DP_UUID(pool_uuid), DP_UUID(cont_uuid), path); + +#define ADD_STAT_METRIC(name, ...) \ + SPRINTF_TM_PATH(tmp_path, pool_uuid, cont_uuid, DFS_METRICS_ROOT "/ops/" #name); \ + rc = d_tm_add_metric(&metrics->dm_op_stats[i], D_TM_COUNTER, "Count of " #name " calls", \ + "calls", tmp_path); \ + if (rc != 0) { \ + DL_ERROR(rc, "failed to create " #name " counter"); \ + return; \ + } \ + i++; + +static void +op_stats_init(struct dfs_metrics *metrics, uuid_t pool_uuid, uuid_t cont_uuid) +{ + char tmp_path[D_TM_MAX_NAME_LEN] = {0}; + int i = 0; + int rc; + + if (metrics == NULL) + return; + + D_FOREACH_DFS_OP_STAT(ADD_STAT_METRIC); +} + +static void +cont_stats_init(struct dfs_metrics *metrics, uuid_t pool_uuid, uuid_t cont_uuid) +{ + char tmp_path[D_TM_MAX_NAME_LEN] = {0}; + int rc = 0; + + if (metrics == NULL) + return; + + SPRINTF_TM_PATH(tmp_path, pool_uuid, cont_uuid, "mount_time"); + rc = d_tm_add_metric(&metrics->dm_mount_time, D_TM_TIMESTAMP, "container mount time", NULL, + tmp_path); + if (rc != 0) + DL_ERROR(rc, "failed to create mount_time timestamp"); +} + +static void +file_stats_init(struct dfs_metrics *metrics, uuid_t pool_uuid, uuid_t cont_uuid) +{ + char tmp_path[D_TM_MAX_NAME_LEN] = {0}; + int rc = 0; + + if (metrics == NULL) + return; + + SPRINTF_TM_PATH(tmp_path, pool_uuid, cont_uuid, DFS_METRICS_ROOT "/read_bytes"); + rc = d_tm_add_metric(&metrics->dm_read_bytes, D_TM_STATS_GAUGE, "dfs read bytes", "bytes", + tmp_path); + if (rc != 0) + DL_ERROR(rc, "failed to create dfs read_bytes counter"); + rc = + d_tm_init_histogram(metrics->dm_read_bytes, tmp_path, NR_SIZE_BUCKETS, 256, 2, "bytes"); + if (rc) + DL_ERROR(rc, "Failed to init dfs read size histogram"); + + SPRINTF_TM_PATH(tmp_path, pool_uuid, cont_uuid, DFS_METRICS_ROOT "/write_bytes"); + rc = d_tm_add_metric(&metrics->dm_write_bytes, D_TM_STATS_GAUGE, "dfs write bytes", "bytes", + tmp_path); + if (rc != 0) + DL_ERROR(rc, "failed to create dfs write_bytes counter"); + rc = d_tm_init_histogram(metrics->dm_write_bytes, tmp_path, NR_SIZE_BUCKETS, 256, 2, + "bytes"); + if (rc) + DL_ERROR(rc, "Failed to init dfs write size histogram"); +} + +bool +dfs_metrics_enabled() +{ + /* set in client/api/metrics.c */ + return daos_client_metric; +} + +void +dfs_metrics_init(dfs_t *dfs) +{ + uuid_t pool_uuid; + uuid_t cont_uuid; + char root_name[D_TM_MAX_NAME_LEN]; + pid_t pid = getpid(); + size_t root_size = DFS_METRICS_SIZE + (D_TM_METRIC_SIZE * 3); + int rc; + + if (dfs == NULL) + return; + + rc = dc_pool_hdl2uuid(dfs->poh, NULL, &pool_uuid); + if (rc != 0) { + DL_ERROR(rc, "failed to get pool UUID"); + goto error; + } + + rc = dc_cont_hdl2uuid(dfs->coh, NULL, &cont_uuid); + if (rc != 0) { + DL_ERROR(rc, "failed to get container UUID"); + goto error; + } + + snprintf(root_name, sizeof(root_name), "%d", pid); + /* if only container-level metrics are enabled; this will init a root for them */ + rc = d_tm_init_with_name(d_tm_cli_pid_key(pid), root_size, D_TM_OPEN_OR_CREATE, root_name); + if (rc != 0 && rc != -DER_ALREADY) { + DL_ERROR(rc, "failed to init DFS metrics"); + goto error; + } + + D_ALLOC_PTR(dfs->metrics); + if (dfs->metrics == NULL) { + D_ERROR("failed to alloc DFS metrics"); + goto error; + } + + SPRINTF_TM_PATH(root_name, pool_uuid, cont_uuid, DFS_METRICS_ROOT); + rc = d_tm_add_ephemeral_dir(NULL, DFS_METRICS_SIZE, root_name); + if (rc != 0) { + DL_ERROR(rc, "failed to add DFS metrics dir"); + goto error; + } + + cont_stats_init(dfs->metrics, pool_uuid, cont_uuid); + op_stats_init(dfs->metrics, pool_uuid, cont_uuid); + file_stats_init(dfs->metrics, pool_uuid, cont_uuid); + + d_tm_record_timestamp(dfs->metrics->dm_mount_time); + return; + +error: + if (dfs->metrics != NULL) + D_FREE(dfs->metrics); +} + +void +dfs_metrics_fini(dfs_t *dfs) +{ + D_FREE(dfs->metrics); +} \ No newline at end of file diff --git a/src/client/dfs/metrics.h b/src/client/dfs/metrics.h new file mode 100644 index 00000000000..722a57590d7 --- /dev/null +++ b/src/client/dfs/metrics.h @@ -0,0 +1,79 @@ +/** + * (C) Copyright 2024 Intel Corporation. + * + * SPDX-License-Identifier: BSD-2-Clause-Patent + */ +#ifndef __DFS_METRICS_H__ +#define __DFS_METRICS_H__ + +#if defined(__cplusplus) +extern "C" { +#endif + +#include +#include +#include +#include + +/* + * Report read/write counts on a per-I/O size. + * Buckets starts at [0; 256B[ and are increased by power of 2 + * (i.e. [256B; 512B[, [512B; 1KB[) up to [4MB; infinity[ + * Since 4MB = 2^22 and 256B = 2^8, this means + * (22 - 8 + 1) = 15 buckets plus the 4MB+ bucket, so + * 16 buckets in total. + */ +#define NR_SIZE_BUCKETS 16 + +/* define a set of ops that we'll count if metrics are enabled */ +#define D_FOREACH_DFS_OP_STAT(ACTION) \ + ACTION(CHMOD) \ + ACTION(CHOWN) \ + ACTION(CREATE) \ + ACTION(GETSIZE) \ + ACTION(GETXATTR) \ + ACTION(LSXATTR) \ + ACTION(MKDIR) \ + ACTION(OPEN) \ + ACTION(OPENDIR) \ + ACTION(READ) \ + ACTION(READDIR) \ + ACTION(READLINK) \ + ACTION(RENAME) \ + ACTION(RMXATTR) \ + ACTION(SETATTR) \ + ACTION(SETXATTR) \ + ACTION(STAT) \ + ACTION(SYMLINK) \ + ACTION(SYNC) \ + ACTION(TRUNCATE) \ + ACTION(UNLINK) \ + ACTION(WRITE) + +#define DFS_OP_STAT_DEFINE(name, ...) DOS_##name, + +enum dfs_op_stat { + D_FOREACH_DFS_OP_STAT(DFS_OP_STAT_DEFINE) DOS_LIMIT, +}; + +#define DFS_OP_STAT_INCR(_dfs, _name) \ + if (_dfs->metrics != NULL) \ + d_tm_inc_counter(_dfs->metrics->dm_op_stats[(_name)], 1); + +struct dfs_metrics { + struct d_tm_node_t *dm_op_stats[DOS_LIMIT]; + struct d_tm_node_t *dm_read_bytes; + struct d_tm_node_t *dm_write_bytes; + struct d_tm_node_t *dm_mount_time; +}; + +bool +dfs_metrics_enabled(); + +void +dfs_metrics_init(dfs_t *dfs); + +void +dfs_metrics_fini(dfs_t *dfs); + +#endif /* __DFS_METRICS_H__ */ \ No newline at end of file diff --git a/src/client/dfs/mnt.c b/src/client/dfs/mnt.c index d270be1e414..d6b988da580 100644 --- a/src/client/dfs/mnt.c +++ b/src/client/dfs/mnt.c @@ -701,6 +701,9 @@ dfs_mount(daos_handle_t poh, daos_handle_t coh, int flags, dfs_t **_dfs) daos_obj_oid_cycle(&dfs->oid); } + if (dfs_metrics_enabled()) + dfs_metrics_init(dfs); + dfs->mounted = DFS_MOUNT; *_dfs = dfs; daos_prop_free(prop); @@ -746,6 +749,8 @@ dfs_umount(dfs_t *dfs) daos_obj_close(dfs->root.oh, NULL); daos_obj_close(dfs->super_oh, NULL); + dfs_metrics_fini(dfs); + D_FREE(dfs->prefix); D_MUTEX_DESTROY(&dfs->lock); D_FREE(dfs); diff --git a/src/client/dfs/obj.c b/src/client/dfs/obj.c index 309439bc807..21c09027bef 100644 --- a/src/client/dfs/obj.c +++ b/src/client/dfs/obj.c @@ -208,6 +208,7 @@ open_file(dfs_t *dfs, dfs_obj_t *parent, int flags, daos_oclass_id_t cid, daos_s return rc; } else { D_ASSERT(rc == 0); + DFS_OP_STAT_INCR(dfs, DOS_CREATE); return 0; } } @@ -261,6 +262,7 @@ open_file(dfs_t *dfs, dfs_obj_t *parent, int flags, daos_oclass_id_t cid, daos_s } } oid_cp(&file->oid, entry->oid); + DFS_OP_STAT_INCR(dfs, DOS_OPEN); return 0; } @@ -320,6 +322,8 @@ open_symlink(dfs_t *dfs, dfs_obj_t *parent, int flags, daos_oclass_id_t cid, con D_FREE(sym->value); D_ERROR("Inserting entry '%s' failed: %d (%s)\n", sym->name, rc, strerror(rc)); + } else if (rc == 0) { + DFS_OP_STAT_INCR(dfs, DOS_SYMLINK); } return rc; } @@ -882,6 +886,8 @@ ostatx_cb(tse_task_t *task, void *data) rc2 = daos_obj_close(args->parent_oh, NULL); if (rc == 0) rc = rc2; + if (rc == 0) + DFS_OP_STAT_INCR(args->dfs, DOS_STAT); return rc; } @@ -1013,6 +1019,7 @@ statx_task(tse_task_t *task) err1_out: D_FREE(op_args); daos_obj_close(args->parent_oh, NULL); + return rc; } @@ -1243,6 +1250,7 @@ dfs_chmod(dfs_t *dfs, dfs_obj_t *parent, const char *name, mode_t mode) D_GOTO(out, rc = daos_der2errno(rc)); } + DFS_OP_STAT_INCR(dfs, DOS_CHMOD); out: if (S_ISLNK(entry.mode)) { dfs_release(sym); @@ -1378,6 +1386,7 @@ dfs_chown(dfs_t *dfs, dfs_obj_t *parent, const char *name, uid_t uid, gid_t gid, D_GOTO(out, rc = daos_der2errno(rc)); } + DFS_OP_STAT_INCR(dfs, DOS_CHOWN); out: if (!(flags & O_NOFOLLOW) && S_ISLNK(entry.mode)) { dfs_release(sym); @@ -1598,6 +1607,7 @@ dfs_osetattr(dfs_t *dfs, dfs_obj_t *obj, struct stat *stbuf, int flags) D_GOTO(out_obj, rc = daos_der2errno(rc)); } + DFS_OP_STAT_INCR(dfs, DOS_SETATTR); out_stat: *stbuf = rstat; out_obj: @@ -1662,6 +1672,7 @@ dfs_punch(dfs_t *dfs, dfs_obj_t *obj, daos_off_t offset, daos_size_t len) return daos_der2errno(rc); } + DFS_OP_STAT_INCR(dfs, DOS_TRUNCATE); return rc; } @@ -1697,6 +1708,7 @@ dfs_get_symlink_value(dfs_obj_t *obj, char *buf, daos_size_t *size) strcpy(buf, obj->value); *size = val_size; + DFS_OP_STAT_INCR(obj->dfs, DOS_READLINK); return 0; } @@ -1709,6 +1721,7 @@ dfs_sync(dfs_t *dfs) return EPERM; /** Take a snapshot here and allow rollover to that when supported. */ + /** Uncomment this when supported. DFS_OP_STAT_INCR(dfs, DOS_SYNC); */ return 0; } diff --git a/src/client/dfs/readdir.c b/src/client/dfs/readdir.c index a284b92e4c2..ba748a8d4e9 100644 --- a/src/client/dfs/readdir.c +++ b/src/client/dfs/readdir.c @@ -81,6 +81,7 @@ readdir_int(dfs_t *dfs, dfs_obj_t *obj, daos_anchor_t *anchor, uint32_t *nr, str break; } *nr = key_nr; + DFS_OP_STAT_INCR(dfs, DOS_READDIR); out: D_FREE(enum_buf); @@ -180,6 +181,7 @@ dfs_iterate(dfs_t *dfs, dfs_obj_t *obj, daos_anchor_t *anchor, uint32_t *nr, siz } *nr = keys_nr; + DFS_OP_STAT_INCR(dfs, DOS_READDIR); out: D_FREE(kds); D_FREE(enum_buf); diff --git a/src/client/dfs/rename.c b/src/client/dfs/rename.c index a7431f03536..cb6991e0e0b 100644 --- a/src/client/dfs/rename.c +++ b/src/client/dfs/rename.c @@ -299,6 +299,8 @@ dfs_move_internal(dfs_t *dfs, unsigned int flags, dfs_obj_t *parent, const char rc = check_tx(th, rc); if (rc == ERESTART) goto restart; + if (rc == 0) + DFS_OP_STAT_INCR(dfs, DOS_RENAME); if (entry.value) { D_ASSERT(S_ISLNK(entry.mode)); diff --git a/src/client/dfs/xattr.c b/src/client/dfs/xattr.c index 49b700e3def..b3a13a31a8d 100644 --- a/src/client/dfs/xattr.c +++ b/src/client/dfs/xattr.c @@ -122,6 +122,7 @@ dfs_setxattr(dfs_t *dfs, dfs_obj_t *obj, const char *name, const void *value, da } } + DFS_OP_STAT_INCR(dfs, DOS_SETXATTR); out: daos_obj_close(oh, NULL); free: @@ -194,6 +195,7 @@ dfs_getxattr(dfs_t *dfs, dfs_obj_t *obj, const char *name, void *value, daos_siz } *size = iod.iod_size; + DFS_OP_STAT_INCR(dfs, DOS_GETXATTR); close: daos_obj_close(oh, NULL); @@ -277,6 +279,7 @@ dfs_removexattr(dfs_t *dfs, dfs_obj_t *obj, const char *name) D_GOTO(out, rc = daos_der2errno(rc)); } + DFS_OP_STAT_INCR(dfs, DOS_RMXATTR); out: daos_obj_close(oh, NULL); free: @@ -354,6 +357,7 @@ dfs_listxattr(dfs_t *dfs, dfs_obj_t *obj, char *list, daos_size_t *size) } *size = ret_size; + DFS_OP_STAT_INCR(dfs, DOS_LSXATTR); out: daos_obj_close(oh, NULL); return rc; diff --git a/src/control/lib/telemetry/promexp/client_test.go b/src/control/lib/telemetry/promexp/client_test.go index d533b9c7931..5c5a0541adb 100644 --- a/src/control/lib/telemetry/promexp/client_test.go +++ b/src/control/lib/telemetry/promexp/client_test.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/daos-stack/daos/src/control/common/test" + "github.com/daos-stack/daos/src/control/lib/telemetry" "github.com/daos-stack/daos/src/control/logging" ) @@ -25,6 +26,8 @@ func TestPromExp_extractClientLabels(t *testing.T) { jobID := "testJob" pid := "12345" tid := "67890" + poolUUID := test.MockPoolUUID(1) + contUUID := test.MockPoolUUID(2) testPath := func(suffix string) string { return fmt.Sprintf("ID: %d/%s/%s/%s/%s", shmID, jobID, pid, tid, suffix) @@ -74,12 +77,32 @@ func TestPromExp_extractClientLabels(t *testing.T) { }, }, "pool ops": { - input: fmt.Sprintf("ID: %d/%s/%s/pool/%s/ops/foo", shmID, jobID, pid, test.MockPoolUUID(1)), + input: fmt.Sprintf("ID: %d/%s/%s/pool/%s/ops/foo", shmID, jobID, pid, poolUUID), expName: "pool_ops_foo", expLabels: labelMap{ "jobid": jobID, "pid": pid, - "pool": test.MockPoolUUID(1).String(), + "pool": poolUUID.String(), + }, + }, + "dfs ops": { + input: fmt.Sprintf("ID: %d/%s/%s/pool/%s/container/%s/dfs/ops/CHMOD", shmID, jobID, pid, poolUUID, contUUID), + expName: "dfs_ops_chmod", + expLabels: labelMap{ + "jobid": jobID, + "pid": pid, + "pool": poolUUID.String(), + "container": contUUID.String(), + }, + }, + "dfs read bytes": { + input: fmt.Sprintf("ID: %d/%s/%s/pool/%s/container/%s/dfs/read_bytes", shmID, jobID, pid, poolUUID, contUUID), + expName: "dfs_read_bytes", + expLabels: labelMap{ + "jobid": jobID, + "pid": pid, + "pool": poolUUID.String(), + "container": contUUID.String(), }, }, } { @@ -136,6 +159,7 @@ func TestPromExp_NewClientCollector(t *testing.T) { if err != nil { t.Fatal(err) } + defer telemetry.Fini() result, err := NewClientCollector(ctx, log, cs, tc.opts) test.CmpErr(t, tc.expErr, err) diff --git a/src/control/lib/telemetry/promexp/engine.go b/src/control/lib/telemetry/promexp/engine.go index bb0481f12a9..8c61528381c 100644 --- a/src/control/lib/telemetry/promexp/engine.go +++ b/src/control/lib/telemetry/promexp/engine.go @@ -148,6 +148,13 @@ func extractLabels(log logging.Logger, in string) (labels labelMap, name string) compsIdx++ name += "_ops_" + comps[compsIdx] compsIdx++ + case "container": + compsIdx++ + labels["container"] = comps[compsIdx] + compsIdx++ + if comps[compsIdx] == "dfs" { + name = "" // dfs metrics shouldn't go under pool + } } case "io": name = "io" @@ -204,7 +211,7 @@ func extractLabels(log logging.Logger, in string) (labels labelMap, name string) } } - name = sanitizeMetricName(name) + name = strings.ToLower(sanitizeMetricName(name)) return } diff --git a/src/control/lib/telemetry/telemetry.go b/src/control/lib/telemetry/telemetry.go index 479c41e2aab..2e5f763c9cb 100644 --- a/src/control/lib/telemetry/telemetry.go +++ b/src/control/lib/telemetry/telemetry.go @@ -612,6 +612,29 @@ func CollectMetrics(ctx context.Context, s *Schema, out chan<- Metric) error { return nil } +type pruneMap map[string]struct{} + +func (pm pruneMap) add(path string) { + pm[path] = struct{}{} +} + +func (pm pruneMap) removeParents(path string) { + for parent := range pm { + if strings.HasPrefix(path, parent) { + delete(pm, parent) + } + } +} + +func (pm pruneMap) toPrune() []string { + var paths []string + for path := range pm { + paths = append(paths, path) + } + sort.Sort(sort.Reverse(sort.StringSlice(paths))) + return paths +} + // PruneUnusedSegments removes shared memory segments associated with // unused ephemeral subdirectories. func PruneUnusedSegments(ctx context.Context, maxSegAge time.Duration) error { @@ -628,7 +651,7 @@ func PruneUnusedSegments(ctx context.Context, maxSegAge time.Duration) error { return errors.New("invalid handle") } - var toPrune []string + pruneCandidates := make(pruneMap) procNode := func(hdl *handle, id string, node *C.struct_d_tm_node_t) { if node == nil || node.dtn_type != C.D_TM_DIRECTORY { return @@ -652,21 +675,22 @@ func PruneUnusedSegments(ctx context.Context, maxSegAge time.Duration) error { // If the creator process was someone other than us, and it's still // around, don't mess with the segment. if _, err := common.GetProcName(st.Cpid()); err == nil && st.Cpid() != unix.Getpid() { + pruneCandidates.removeParents(path) return } if time.Since(st.Ctime()) <= maxSegAge { + pruneCandidates.removeParents(path) return } - log.Tracef("adding %s to prune list", path) - toPrune = append(toPrune, path) + log.Tracef("adding %s to prune candidates list", path) + pruneCandidates.add(path) } visit(hdl, hdl.root, "", true, procNode) - sort.Sort(sort.Reverse(sort.StringSlice(toPrune))) - for _, path := range toPrune { + for _, path := range pruneCandidates.toPrune() { log.Tracef("pruning %s", path) if err := removeLink(hdl, path); err != nil { log.Errorf("failed to prune %s: %s", path, err) diff --git a/src/control/lib/telemetry/telemetry_test.go b/src/control/lib/telemetry/telemetry_test.go index bc63cc81399..84abed5daf7 100644 --- a/src/control/lib/telemetry/telemetry_test.go +++ b/src/control/lib/telemetry/telemetry_test.go @@ -16,6 +16,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/pkg/errors" "github.com/daos-stack/daos/src/control/common/test" @@ -461,3 +462,56 @@ func TestTelemetry_garbageCollection(t *testing.T) { }) } } + +func TestTelemetry_pruneMap(t *testing.T) { + type inputPath struct { + path string + shouldPrune bool + } + for name, tc := range map[string]struct { + inputPaths []inputPath + expToPrune []string + }{ + "empty": {}, + "no shared parents": { + inputPaths: []inputPath{ + {"/a", true}, + {"/b", true}, + {"/c", true}, + }, + expToPrune: []string{"/c", "/b", "/a"}, + }, + "deeply nested should not be pruned": { + inputPaths: []inputPath{ + {"/a", true}, + {"/a/b", true}, + {"/a/b/c", false}, + }, + expToPrune: nil, + }, + "deeply nested should be pruned": { + inputPaths: []inputPath{ + {"/a", true}, + {"/a/b", true}, + {"/a/b/c", true}, + }, + expToPrune: []string{"/a/b/c", "/a/b", "/a"}, + }, + } { + t.Run(name, func(t *testing.T) { + pm := make(pruneMap) + + for _, ip := range tc.inputPaths { + if ip.shouldPrune { + pm.add(ip.path) + } else { + pm.removeParents(ip.path) + } + } + + if diff := cmp.Diff(tc.expToPrune, pm.toPrune()); diff != "" { + t.Fatalf("unexpected toPrune list (-want, +got): %s", diff) + } + }) + } +} diff --git a/src/gurt/telemetry.c b/src/gurt/telemetry.c index 33b5e410dca..3e285c8dcdc 100644 --- a/src/gurt/telemetry.c +++ b/src/gurt/telemetry.c @@ -804,6 +804,12 @@ destroy_shmem_with_key(key_t key) return 0; } +static bool +is_initialized(void) +{ + return tm_shmem.ctx != NULL && tm_shmem.ctx->shmem_root != NULL; +} + /** * Initialize an instance of the telemetry and metrics API for the producer * process with the root set to the provided name. @@ -833,6 +839,9 @@ d_tm_init_with_name(int id, uint64_t mem_size, int flags, const char *root_name) int shmid = 0; int rc = DER_SUCCESS; + if (is_initialized()) + return -DER_ALREADY; + if (root_name == NULL || strnlen(root_name, D_TM_MAX_NAME_LEN) == 0) { D_ERROR("root name cannot be empty\n"); return -DER_INVAL; @@ -2253,13 +2262,6 @@ d_tm_find_metric(struct d_tm_context *ctx, char *path) return node; } -static bool -is_initialized(void) -{ - return tm_shmem.ctx != NULL && - tm_shmem.ctx->shmem_root != NULL; -} - /* * Get a pointer to the last token in the path without modifying the original * string. diff --git a/src/include/daos/pool.h b/src/include/daos/pool.h index e51758e16b1..6e722746227 100644 --- a/src/include/daos/pool.h +++ b/src/include/daos/pool.h @@ -157,6 +157,8 @@ void dc_pool_put(struct dc_pool *pool); int dc_pool_local2global(daos_handle_t poh, d_iov_t *glob); int dc_pool_global2local(d_iov_t glob, daos_handle_t *poh); +int + dc_pool_hdl2uuid(daos_handle_t poh, uuid_t *hdl_uuid, uuid_t *pool_uuid); int dc_pool_connect(tse_task_t *task); int dc_pool_disconnect(tse_task_t *task); int dc_pool_query(tse_task_t *task); diff --git a/src/pool/cli.c b/src/pool/cli.c index 6825f57568f..d758fe3d67d 100644 --- a/src/pool/cli.c +++ b/src/pool/cli.c @@ -1574,6 +1574,23 @@ dc_pool_global2local(d_iov_t glob, daos_handle_t *poh) return rc; } +int +dc_pool_hdl2uuid(daos_handle_t poh, uuid_t *hdl_uuid, uuid_t *uuid) +{ + struct dc_pool *dp; + + dp = dc_hdl2pool(poh); + if (dp == NULL) + return -DER_NO_HDL; + + if (hdl_uuid != NULL) + uuid_copy(*hdl_uuid, dp->dp_pool_hdl); + if (uuid != NULL) + uuid_copy(*uuid, dp->dp_pool); + dc_pool_put(dp); + return 0; +} + struct pool_update_state { struct rsvc_client client; struct dc_mgmt_sys *sys; diff --git a/src/tests/ftest/telemetry/basic_client_telemetry.py b/src/tests/ftest/telemetry/basic_client_telemetry.py index 1d115b4c95e..71b976abe30 100644 --- a/src/tests/ftest/telemetry/basic_client_telemetry.py +++ b/src/tests/ftest/telemetry/basic_client_telemetry.py @@ -46,9 +46,7 @@ def test_client_metrics_exist(self): self.log_step('Reading client telemetry (reads & writes should be > 0)') after_metrics = self.telemetry.collect_client_data(metric_names) for metric in metric_names: - msum = 0 - for value in after_metrics[metric].values(): - msum += value - self.assertGreater(msum, 0) + msum = sum(after_metrics[metric].values()) + self.assertGreater(msum, 0, f'{metric} value not greater than zero after I/O') self.log_step('Test passed') diff --git a/src/tests/ftest/telemetry/dfs_client_telemetry.py b/src/tests/ftest/telemetry/dfs_client_telemetry.py new file mode 100644 index 00000000000..2c234bb9341 --- /dev/null +++ b/src/tests/ftest/telemetry/dfs_client_telemetry.py @@ -0,0 +1,62 @@ +""" + (C) Copyright 2024 Intel Corporation. + + SPDX-License-Identifier: BSD-2-Clause-Patent +""" +from ior_utils import read_data, write_data +from telemetry_test_base import TestWithClientTelemetry + + +class DFSClientTelemetry(TestWithClientTelemetry): + """Tests to verify DFS telemetry. + + :avocado: recursive + """ + + def test_dfs_metrics(self): + """JIRA ID: DAOS-16837. + + Verify that the DFS metrics are incrementing as expected. + After performing some I/O, the DFS-level metrics should look reasonable. + + Test steps: + 1) Create a pool and container + 2) Perform some I/O with IOR + 3) Verify that the DFS metrics are sane + + :avocado: tags=all,daily_regression + :avocado: tags=vm + :avocado: tags=dfs,telemetry + :avocado: tags=DFSClientTelemetry,test_dfs_metrics + """ + # create pool and container + pool = self.get_pool(connect=True) + container = self.get_container(pool=pool) + + self.log_step('Writing data to the pool (ior)') + ior = write_data(self, container) + self.log_step('Reading data from the pool (ior)') + read_data(self, ior, container) + + # after an IOR run, we'd expect this set of metrics to have values > 0 + val_metric_names = [ + 'client_dfs_ops_create', + 'client_dfs_ops_open', + 'client_dfs_ops_read', + 'client_dfs_ops_write' + ] + bkt_metric_names = [ + 'client_dfs_read_bytes', + 'client_dfs_write_bytes' + ] + + self.log_step('Reading dfs telemetry') + after_metrics = self.telemetry.collect_client_data(val_metric_names + bkt_metric_names) + for metric in val_metric_names: + msum = sum(after_metrics[metric].values()) + self.assertGreater(msum, 0, f'{metric} value not greater than zero after I/O') + for metric in bkt_metric_names: + msum = sum(hist['sample_sum'] for hist in after_metrics[metric].values()) + self.assertGreater(msum, 0, f'{metric} sample_sum not greater than zero after I/O') + + self.log_step('Test passed') diff --git a/src/tests/ftest/telemetry/dfs_client_telemetry.yaml b/src/tests/ftest/telemetry/dfs_client_telemetry.yaml new file mode 100644 index 00000000000..e0dd33d1f87 --- /dev/null +++ b/src/tests/ftest/telemetry/dfs_client_telemetry.yaml @@ -0,0 +1,45 @@ +hosts: + test_servers: 1 + test_clients: 1 + +timeout: 180 + +server_config: + name: daos_server + engines_per_host: 1 + engines: + 0: + targets: 4 + nr_xs_helpers: 0 + storage: + 0: + class: ram + scm_mount: /mnt/daos + system_ram_reserved: 1 + +agent_config: + telemetry_port: 9191 + telemetry_retain: 30s + telemetry_enabled: true + +pool: + scm_size: 2G + +container: + type: POSIX + dfs_oclass: SX + +ior: &ior_base + ppn: 4 + api: DFS + transfer_size: 512K + block_size: 1M + dfs_oclass: SX + +ior_write: + <<: *ior_base + flags: "-k -v -w -W -G 1" + +ior_read: + <<: *ior_base + flags: "-v -r -R -G 1" diff --git a/src/tests/ftest/util/telemetry_utils.py b/src/tests/ftest/util/telemetry_utils.py index 70d2352881d..c04c9e0a560 100644 --- a/src/tests/ftest/util/telemetry_utils.py +++ b/src/tests/ftest/util/telemetry_utils.py @@ -1010,6 +1010,34 @@ class ClientTelemetryUtils(TelemetryUtils): CLIENT_IO_OPS_TGT_PUNCH_LATENCY_METRICS +\ CLIENT_IO_OPS_TGT_UPDATE_ACTIVE_METRICS +\ CLIENT_IO_OPS_UPDATE_ACTIVE_METRICS + CLIENT_DFS_OPS_METRICS = [ + "client_dfs_ops_chmod", + "client_dfs_ops_chown", + "client_dfs_ops_create", + "client_dfs_ops_getsize", + "client_dfs_ops_getxattr", + "client_dfs_ops_lsxattr", + "client_dfs_ops_mkdir", + "client_dfs_ops_open", + "client_dfs_ops_opendir", + "client_dfs_ops_read", + "client_dfs_ops_readdir", + "client_dfs_ops_readlink", + "client_dfs_ops_rename", + "client_dfs_ops_rmxattr", + "client_dfs_ops_setattr", + "client_dfs_ops_setxattr", + "client_dfs_ops_stat", + "client_dfs_ops_symlink", + "client_dfs_ops_sync", + "client_dfs_ops_truncate", + "client_dfs_ops_unlink", + "client_dfs_ops_write"] + CLIENT_DFS_IO_METRICS = [ + "client_dfs_read_bytes", + "client_dfs_write_bytes"] + CLIENT_DFS_METRICS = CLIENT_DFS_OPS_METRICS +\ + CLIENT_DFS_IO_METRICS def __init__(self, dmg, servers, clients): """Create a ClientTelemetryUtils object. @@ -1023,11 +1051,12 @@ def __init__(self, dmg, servers, clients): super().__init__(dmg, servers) self.clients = NodeSet.fromlist(clients) - def get_all_client_metrics_names(self, with_pools=False): + def get_all_client_metrics_names(self, with_pools=False, with_dfs=False): """Get all the telemetry metrics names for this client. Args: with_pools (bool): if True, include pool metrics in the results + with_dfs (bool): if True, include DFS metrics in the results Returns: list: all of the telemetry metrics names for this client @@ -1037,6 +1066,8 @@ def get_all_client_metrics_names(self, with_pools=False): all_metrics_names.extend(self.CLIENT_IO_METRICS) if with_pools: all_metrics_names.extend(self.CLIENT_POOL_METRICS) + if with_dfs: + all_metrics_names.extend(self.CLIENT_DFS_METRICS) return all_metrics_names @@ -1217,13 +1248,21 @@ def _get_data(self, names, info): if name not in data: data[name] = {} for metric in metrics[name]['metrics']: - if 'labels' not in metric or 'value' not in metric: + if 'labels' not in metric or \ + ('value' not in metric and 'buckets' not in metric): continue labels = [f'host:{host}'] for key, value in metric['labels'].items(): labels.append(":".join([str(key), str(value)])) label_key = ",".join(sorted(labels)) - data[name][label_key] = metric['value'] + if 'value' in metric: + data[name][label_key] = metric['value'] + else: + data[name][label_key] = { + 'sample_count': metric['sample_count'], + 'sample_sum': metric['sample_sum'], + 'buckets': metric['buckets'], + } return data def _set_display(self, compare=None):