diff --git a/docs/user/filesystem.md b/docs/user/filesystem.md index cf9ca813dc2..937864a1bf9 100644 --- a/docs/user/filesystem.md +++ b/docs/user/filesystem.md @@ -228,7 +228,6 @@ Additionally, there are several optional command-line options: | --container= | container label or uuid to open | | --sys-name= | DAOS system name | | --foreground | run in foreground | -| --singlethreaded | run single threaded | | --thread-count= | Number of threads to use | | --multi-user | Run in multi user mode | | --read-only | Mount in read-only mode |
diff --git a/src/client/dfuse/SConscript b/src/client/dfuse/SConscript index fdd60eac11d..12ef33841d4 100644 --- a/src/client/dfuse/SConscript +++ b/src/client/dfuse/SConscript @@ -7,6 +7,7 @@ DFUSE_SRC = ['dfuse_core.c', 'dfuse_main.c', 'dfuse_fuseops.c', 'inval.c', + 'file.c', 'dfuse_cont.c', 'dfuse_thread.c', 'dfuse_pool.c']
diff --git a/src/client/dfuse/dfuse.h b/src/client/dfuse/dfuse.h index 58847287391..223914f8442 100644 --- a/src/client/dfuse/dfuse.h +++ b/src/client/dfuse/dfuse.h @@ -29,7 +29,6 @@ struct dfuse_info { char *di_mountpoint; int32_t di_thread_count; uint32_t di_eq_count; - bool di_threaded; bool di_foreground; bool di_caching; bool di_multi_user; @@ -96,6 +95,9 @@ struct dfuse_eq { * memory consumption */ #define DFUSE_MAX_PRE_READ (1024 * 1024 * 4) +/* Maximum file-size for pre-read in all cases */ +#define DFUSE_MAX_PRE_READ_ONCE (1024 * 1024 * 1) + /* Launch fuse, and do not return until complete */ int dfuse_launch_fuse(struct dfuse_info *dfuse_info, struct fuse_args *args); @@ -137,9 +139,10 @@ struct dfuse_inode_entry; * when EOF is returned to the kernel. If it's still present on release then it's freed then. */ struct dfuse_pre_read { - pthread_mutex_t dra_lock; + d_list_t req_list; struct dfuse_event *dra_ev; int dra_rc; + bool complete; }; /** what is returned as the handle for fuse fuse_file_info on create/open/opendir */ @@ -149,8 +152,6 @@ struct dfuse_obj_hdl { /** the DFS object handle. Not created for directories. */ dfs_obj_t *doh_obj; - struct dfuse_pre_read *doh_readahead; - /** the inode entry for the file */ struct dfuse_inode_entry *doh_ie; @@ -169,17 +170,24 @@ struct dfuse_obj_hdl { /* Pointer to the last returned drc entry */ struct dfuse_readdir_c *doh_rd_nextc; - /* Linear read function, if a file is read from start to end then this normally requires - * a final read request at the end of the file that returns zero bytes. Detect this case - * and when the final read is detected then just return without a round trip. - * Store a flag for this being enabled (starts as true, but many I/O patterns will set it - * to false), the expected position of the next read and a boolean for if EOF has been - * detected. + /* Linear read tracking. If a file is opened and read from start to finish then this is + * called a linear read; linear reads, however, may or may not read EOF at the end of the + * file, as the reader may be checking the file size. + * + * Detect this case and track it at the file-handle level. This is then used in two places: + * for a read of EOF it means the round trip can be avoided, and on release we can use this + * flag to apply a setting to the directory inode. + * + * This flag starts enabled and many I/O patterns will disable it. We also store the next + * expected read position and whether EOF has been reached. */ + off_t doh_linear_read_pos; bool doh_linear_read; bool doh_linear_read_eof; + bool doh_set_linear_read;
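To make the tracking rule above concrete, here is a minimal sketch (not part of the patch) of how these three fields are maintained on each read completion, following the logic in cb_read_helper() in ops/read.c later in this diff:

```c
/* Sketch only: per-read update of the linear-read fields. `returned` is
 * the number of bytes the read actually produced, `requested` is what
 * the kernel asked for.
 */
static void
linear_read_update(struct dfuse_obj_hdl *oh, off_t position, size_t requested, size_t returned)
{
	if (!oh->doh_linear_read)
		return;

	if (oh->doh_linear_read_pos != position) {
		/* Out-of-order read: this handle is not reading linearly */
		oh->doh_linear_read = false;
		return;
	}

	oh->doh_linear_read_pos = position + returned;
	if (returned < requested)
		/* Short read: EOF has been reached */
		oh->doh_linear_read_eof = true;
}
```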
+ /** True if caching is enabled for this file. */ bool doh_caching; @@ -197,6 +205,10 @@ struct dfuse_obj_hdl { bool doh_kreaddir_finished; bool doh_evict_on_close; + /* the handle is currently doing readahead */ + bool doh_readahead_inflight; + + int doh_flags; }; /* Readdir support. @@ -401,11 +413,20 @@ struct dfuse_event { d_iov_t de_iov; d_sg_list_t de_sgl; d_list_t de_list; + + /* Position in a list of events; this will be on either active->open_reads or + * de->de_read_slaves. + */ + d_list_t de_read_list; + /* List of slave events */ + d_list_t de_read_slaves; struct dfuse_eq *de_eqt; union { struct dfuse_obj_hdl *de_oh; struct dfuse_inode_entry *de_ie; + struct read_chunk_data *de_cd; }; + struct dfuse_info *de_di; off_t de_req_position; /**< The file position requested by fuse */ union { size_t de_req_len; @@ -1009,10 +1030,35 @@ struct dfuse_inode_entry { */ ATOMIC bool ie_linear_read; + struct active_inode *ie_active; + /* Entry on the evict list */ d_list_t ie_evict_entry; }; +struct active_inode { + d_list_t chunks; + d_list_t open_reads; + pthread_spinlock_t lock; + ATOMIC uint64_t read_count; + struct dfuse_pre_read *readahead; +}; + +/* Increase active count on inode. This takes a reference and allocates ie->active as required */ +int +active_ie_init(struct dfuse_inode_entry *ie); + +int +active_ie_readahead_init(struct dfuse_inode_entry *ie); + +/* Mark an oh as closing and drop the ref on the inode active data */ +void +active_oh_decref(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh); + +/* Decrease active count on inode, called on error paths where there is no oh */ +void +active_ie_decref(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie); + /* Flush write-back cache writes to an inode. It does this by waiting for and then releasing an * exclusive lock on the inode. Writes take a shared lock so this will block until all pending * writes are complete. */ @@ -1108,6 +1154,13 @@ dfuse_compute_inode(struct dfuse_cont *dfs, void dfuse_cache_evict_dir(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie); +/* Free any read chunk data for an inode. + * + * Returns true if the feature was used. + */ +bool +read_chunk_close(struct dfuse_inode_entry *ie); + /* Metadata caching functions.
*/ /* Mark the cache as up-to-date from now */ @@ -1171,7 +1224,15 @@ bool dfuse_dcache_get_valid(struct dfuse_inode_entry *ie, double max_age); void -dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh); +dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh, struct dfuse_event *ev); + +int +dfuse_pre_read_init(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie, + struct dfuse_event **evp); + +void +dfuse_pre_read_abort(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh, + struct dfuse_event *ev, int rc); int check_for_uns_ep(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie, char *attr,
diff --git a/src/client/dfuse/dfuse_core.c b/src/client/dfuse/dfuse_core.c index 0895451f9f8..e0bc4ea3c3f 100644 --- a/src/client/dfuse/dfuse_core.c +++ b/src/client/dfuse/dfuse_core.c @@ -1274,6 +1274,7 @@ dfuse_ie_close(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie) atomic_load_relaxed(&ie->ie_il_count)); D_ASSERTF(atomic_load_relaxed(&ie->ie_open_count) == 0, "open_count is %d", atomic_load_relaxed(&ie->ie_open_count)); + D_ASSERT(!ie->ie_active); if (ie->ie_obj) { rc = dfs_release(ie->ie_obj); @@ -1317,6 +1318,8 @@ dfuse_read_event_size(void *arg, size_t size) ev->de_sgl.sg_nr = 1; } + D_INIT_LIST_HEAD(&ev->de_read_slaves); + rc = daos_event_init(&ev->de_ev, ev->de_eqt->de_eq, NULL); if (rc != -DER_SUCCESS) { return false;
diff --git a/src/client/dfuse/dfuse_fuseops.c b/src/client/dfuse/dfuse_fuseops.c index 980e71ac9af..38f98576740 100644 --- a/src/client/dfuse/dfuse_fuseops.c +++ b/src/client/dfuse/dfuse_fuseops.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2016-2023 Intel Corporation. + * (C) Copyright 2016-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -88,7 +88,12 @@ dfuse_fuse_init(void *arg, struct fuse_conn_info *conn) DFUSE_TRA_INFO(dfuse_info, "kernel readdir cache support compiled in"); conn->want |= FUSE_CAP_READDIRPLUS; - conn->want |= FUSE_CAP_READDIRPLUS_AUTO; + /* Temporarily force readdir plus in all cases for now, which can + * help save some lookup RPCs in some cases. This can be removed + * once we use object enumeration to replace the normal key + * enumeration for readdir. XXX + */ + conn->want &= ~FUSE_CAP_READDIRPLUS_AUTO; #ifdef FUSE_CAP_CACHE_SYMLINKS conn->want |= FUSE_CAP_CACHE_SYMLINKS;
diff --git a/src/client/dfuse/dfuse_main.c b/src/client/dfuse/dfuse_main.c index d75656121a5..02db62cc4e9 100644 --- a/src/client/dfuse/dfuse_main.c +++ b/src/client/dfuse/dfuse_main.c @@ -166,6 +166,7 @@ dfuse_bg(struct dfuse_info *dfuse_info) * * Should be called from the post_start plugin callback and creates * a filesystem. - * Returns true on success, false on failure. + * Returns a DAOS error code.
*/ int @@ -204,18 +205,17 @@ dfuse_launch_fuse(struct dfuse_info *dfuse_info, struct fuse_args *args) DFUSE_TRA_ERROR(dfuse_info, "Error sending signal to fg: "DF_RC, DP_RC(rc)); /* Blocking */ - if (dfuse_info->di_threaded) - rc = dfuse_loop(dfuse_info); - else - rc = fuse_session_loop(dfuse_info->di_session); - if (rc != 0) + rc = dfuse_loop(dfuse_info); + if (rc != 0) { DHS_ERROR(dfuse_info, rc, "Fuse loop exited"); + rc = daos_errno2der(rc); + } umount: fuse_session_unmount(dfuse_info->di_session); - return daos_errno2der(rc); + return rc; } #define DF_POOL_PREFIX "pool=" @@ -279,7 +279,6 @@ show_help(char *name) " --path= Path to load UNS pool/container data\n" " --sys-name=STR DAOS system name context for servers\n" "\n" - " -S --singlethread Single threaded (deprecated)\n" " -t --thread-count=count Total number of threads to use\n" " -e --eq-count=count Number of event queues to use\n" " -f --foreground Run in foreground\n" @@ -423,7 +422,6 @@ main(int argc, char **argv) {"pool", required_argument, 0, 'p'}, {"container", required_argument, 0, 'c'}, {"sys-name", required_argument, 0, 'G'}, - {"singlethread", no_argument, 0, 'S'}, {"thread-count", required_argument, 0, 't'}, {"eq-count", required_argument, 0, 'e'}, {"foreground", no_argument, 0, 'f'}, @@ -447,13 +445,12 @@ main(int argc, char **argv) if (dfuse_info == NULL) D_GOTO(out_debug, rc = -DER_NOMEM); - dfuse_info->di_threaded = true; dfuse_info->di_caching = true; dfuse_info->di_wb_cache = true; dfuse_info->di_eq_count = 1; while (1) { - c = getopt_long(argc, argv, "Mm:St:o:fhe:v", long_options, NULL); + c = getopt_long(argc, argv, "Mm:t:o:fhe:v", long_options, NULL); if (c == -1) break; @@ -491,13 +488,6 @@ main(int argc, char **argv) case 'P': path = optarg; break; - case 'S': - /* Set it to be single threaded, but allow an extra one - * for the event queue processing - */ - dfuse_info->di_threaded = false; - dfuse_info->di_thread_count = 2; - break; case 'e': dfuse_info->di_eq_count = atoi(optarg); break; @@ -564,7 +554,7 @@ main(int argc, char **argv) * check CPU binding. If bound to a number of cores then launch that number of threads, * if not bound then limit to 16. */ - if (dfuse_info->di_threaded && !have_thread_count) { + if (!have_thread_count) { struct hwloc_topology *hwt; hwloc_const_cpuset_t hw; int total;
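For readers unfamiliar with the binding check above, a rough sketch of how such a default could be derived with hwloc follows. This is an assumed usage pattern, not the patch's exact code; error handling and the bound-versus-unbound distinction are simplified:

```c
#include <hwloc.h>

/* Sketch: derive a default thread count from the allowed CPU set,
 * capping at 16 when the process is not bound to a small core set.
 */
static int
default_thread_count(void)
{
	hwloc_topology_t topo;
	int              n;

	if (hwloc_topology_init(&topo) != 0)
		return 16;
	if (hwloc_topology_load(topo) != 0) {
		hwloc_topology_destroy(topo);
		return 16;
	}
	/* Count the cores this process is allowed to run on */
	n = hwloc_bitmap_weight(hwloc_topology_get_allowed_cpuset(topo));
	hwloc_topology_destroy(topo);

	if (n <= 0 || n > 16)
		return 16;
	return n;
}
```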
diff --git a/src/client/dfuse/dfuse_thread.c b/src/client/dfuse/dfuse_thread.c index 4e06e4335ce..d8de0ab3fe9 100644 --- a/src/client/dfuse/dfuse_thread.c +++ b/src/client/dfuse/dfuse_thread.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2020-2023 Intel Corporation. + * (C) Copyright 2020-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -34,10 +34,11 @@ dfuse_do_work(void *arg) struct dfuse_thread *dt = arg; struct dfuse_tm *dtm = dt->dt_tm; int rc; + struct fuse_chan *chan = fuse_clone_chan(dtm->tm_se); while (!fuse_session_exited(dtm->tm_se)) { pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - rc = fuse_session_receive_buf(dtm->tm_se, &dt->dt_fbuf); + rc = fuse_session_receive_buf_chan(dtm->tm_se, &dt->dt_fbuf, chan); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); if (rc == -EINTR) continue; @@ -52,9 +53,11 @@ dfuse_do_work(void *arg) if (atomic_load_relaxed(&dtm->tm_exit)) return NULL; - fuse_session_process_buf(dtm->tm_se, &dt->dt_fbuf); + fuse_session_process_buf_chan(dtm->tm_se, &dt->dt_fbuf, chan); } + fuse_chan_put(chan); + sem_post(&dtm->tm_finish); return NULL;
diff --git a/src/client/dfuse/file.c b/src/client/dfuse/file.c new file mode 100644 index 00000000000..439359a422c --- /dev/null +++ b/src/client/dfuse/file.c @@ -0,0 +1,148 @@ +/** + * (C) Copyright 2024 Intel Corporation. + * + * SPDX-License-Identifier: BSD-2-Clause-Patent + */ + +#include "dfuse_common.h" +#include "dfuse.h" + +/* A lock is needed here, not for ie_open_count, which is updated atomically here and elsewhere, + * but to ensure that ie_active is updated atomically together with the reference count. + */ +static pthread_mutex_t alock = PTHREAD_MUTEX_INITIALIZER; + +/* Perhaps combine with dfuse_open_handle_init? */ +int +active_ie_init(struct dfuse_inode_entry *ie) +{ + uint32_t oc; + int rc = -DER_SUCCESS; + + D_MUTEX_LOCK(&alock); + + oc = atomic_fetch_add_relaxed(&ie->ie_open_count, 1); + + DFUSE_TRA_DEBUG(ie, "Addref to %d", oc + 1); + + if (oc != 0) + goto out; + + D_ALLOC_PTR(ie->ie_active); + if (!ie->ie_active) + D_GOTO(out, rc = -DER_NOMEM); + + rc = D_SPIN_INIT(&ie->ie_active->lock, 0); + if (rc != -DER_SUCCESS) { + D_FREE(ie->ie_active); + goto out; + } + D_INIT_LIST_HEAD(&ie->ie_active->chunks); + D_INIT_LIST_HEAD(&ie->ie_active->open_reads); + atomic_init(&ie->ie_active->read_count, 0); + /* Take a reference on the inode to prevent it being released */ + atomic_fetch_add_relaxed(&ie->ie_ref, 1); +out: + D_MUTEX_UNLOCK(&alock); + return rc; +} + +int +active_ie_readahead_init(struct dfuse_inode_entry *ie) +{ + struct active_inode *ie_active = ie->ie_active; + + D_ASSERT(ie_active != NULL); + if (ie_active->readahead != NULL) + return 0; + + D_ALLOC_PTR(ie_active->readahead); + if (ie_active->readahead == NULL) + return -DER_NOMEM; + + D_INIT_LIST_HEAD(&ie_active->readahead->req_list); + atomic_fetch_add_relaxed(&ie->ie_open_count, 1); + + return 0; +} + +static void +ah_free(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie) +{ + struct active_inode *active = ie->ie_active; + + if (active->readahead) { + struct dfuse_event *ev; + + D_ASSERT(active->readahead->complete); + D_ASSERT(d_list_empty(&active->readahead->req_list)); + + ev = active->readahead->dra_ev; + + if (ev) { + daos_event_fini(&ev->de_ev); + d_slab_release(ev->de_eqt->de_pre_read_slab, ev); + } + D_FREE(active->readahead); + } + + D_SPIN_DESTROY(&active->lock); + D_FREE(ie->ie_active); + dfuse_inode_decref(dfuse_info, ie); +}
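As a usage note, a hypothetical caller of this API would pair the calls as follows; do_open_work() is an invented placeholder and this sketch is not part of the patch:

```c
/* Hypothetical caller: every successful active_ie_init() must be matched
 * by exactly one active_oh_decref() (normal release path, below) or
 * active_ie_decref() (error paths where no open handle exists yet).
 */
static int
open_example(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie)
{
	int rc;

	rc = active_ie_init(ie);
	if (rc != -DER_SUCCESS)
		return rc;

	rc = do_open_work(ie); /* invented placeholder */
	if (rc != 0) {
		/* No open handle exists, so drop the active ref directly */
		active_ie_decref(dfuse_info, ie);
		return rc;
	}

	/* ... later, on release: active_oh_decref(dfuse_info, oh); */
	return 0;
}
```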
+void +active_oh_decref(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh) +{ + uint32_t oc; + + D_MUTEX_LOCK(&alock); + + oc = atomic_fetch_sub_relaxed(&oh->doh_ie->ie_open_count, 1); + D_ASSERTF(oc >= 1, "Invalid decref from %d on %p %p", oc, oh, oh->doh_ie); + + DFUSE_TRA_DEBUG(oh->doh_ie, "Decref to %d", oc - 1); + + /* Leave set_linear_read as false in this case */ + if (oc != 1) + goto out; + + if (read_chunk_close(oh->doh_ie)) + oh->doh_linear_read = true; + + /* Do not set linear read in the case where there are no reads or writes; this could be + * simple open/close calls, but it could also be cache use, so leave the setting unchanged + * in this case. + */ + if (oh->doh_linear_read) { + if (oh->doh_ie->ie_active->read_count != 0) + oh->doh_set_linear_read = true; + } else { + oh->doh_set_linear_read = true; + } + + ah_free(dfuse_info, oh->doh_ie); +out: + D_MUTEX_UNLOCK(&alock); +} + +void +active_ie_decref(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie) +{ + uint32_t oc; + D_MUTEX_LOCK(&alock); + + oc = atomic_fetch_sub_relaxed(&ie->ie_open_count, 1); + D_ASSERTF(oc >= 1, "Invalid decref from %d on %p", oc, ie); + + DFUSE_TRA_DEBUG(ie, "Decref to %d", oc - 1); + + if (oc != 1) + goto out; + + read_chunk_close(ie); + + ah_free(dfuse_info, ie); +out: + D_MUTEX_UNLOCK(&alock); +}
diff --git a/src/client/dfuse/ops/create.c b/src/client/dfuse/ops/create.c index a389c253795..5549d17113b 100644 --- a/src/client/dfuse/ops/create.c +++ b/src/client/dfuse/ops/create.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2016-2023 Intel Corporation. + * (C) Copyright 2016-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -190,7 +190,7 @@ dfuse_cb_create(fuse_req_t req, struct dfuse_inode_entry *parent, const char *na /** duplicate the file handle for the fuse handle */ rc = dfs_dup(dfs->dfs_ns, oh->doh_obj, O_RDWR, &ie->ie_obj); if (rc) - D_GOTO(release, rc); + D_GOTO(drop_ie, rc); oh->doh_writeable = true; @@ -217,14 +217,18 @@ dfuse_cb_create(fuse_req_t req, struct dfuse_inode_entry *parent, const char *na dfuse_compute_inode(dfs, &ie->ie_oid, &ie->ie_stat.st_ino); - atomic_fetch_add_relaxed(&ie->ie_open_count, 1); + rc = active_ie_init(ie); + if (rc != -DER_SUCCESS) + goto drop_oh; /* Return the new inode data, and keep the parent ref */ dfuse_reply_entry(dfuse_info, ie, &fi_out, true, req); return; -release: +drop_oh: dfs_release(oh->doh_obj); +drop_ie: + dfs_release(ie->ie_obj); err: DFUSE_REPLY_ERR_RAW(parent, req, rc); dfuse_oh_free(dfuse_info, oh);
diff --git a/src/client/dfuse/ops/lookup.c b/src/client/dfuse/ops/lookup.c index 8e168102cb4..ac095eed4bc 100644 --- a/src/client/dfuse/ops/lookup.c +++ b/src/client/dfuse/ops/lookup.c @@ -88,6 +88,18 @@ dfuse_reply_entry(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie, D_GOTO(out_err, rc = EIO); } + /* Make the inode active for the create case */ + if (ie->ie_active) { + D_ASSERT(atomic_load_relaxed(&ie->ie_open_count) == 1); + active_ie_decref(dfuse_info, ie); + rc = active_ie_init(inode); + if (rc != -DER_SUCCESS) { + atomic_fetch_sub_relaxed(&ie->ie_ref, 1); + dfuse_ie_close(dfuse_info, ie); + D_GOTO(out_err, rc); + } + } + DFUSE_TRA_DEBUG(inode, "Maybe updating parent inode %#lx dfs_ino %#lx", entry.ino, ie->ie_dfs->dfs_ino);
diff --git a/src/client/dfuse/ops/open.c b/src/client/dfuse/ops/open.c index 46a49cf0767..4c0ca184fb3 100644 --- a/src/client/dfuse/ops/open.c +++ b/src/client/dfuse/ops/open.c @@ -14,9 +14,9 @@ dfuse_cb_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) struct dfuse_inode_entry *ie; struct dfuse_obj_hdl *oh; struct fuse_file_info fi_out = {0}; + struct dfuse_event *ev; + bool preread = false; int rc; - bool prefetch = false; - bool preread = false; int flags; ie = dfuse_inode_lookup_nf(dfuse_info, ino); @@ -45,18 +45,18 @@ dfuse_cb_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) LOG_FLAGS(ie, fi->flags);
flags = fi->flags; + oh->doh_flags = flags; if (flags & O_APPEND) flags &= ~O_APPEND; - /** duplicate the file handle for the fuse handle */ - rc = dfs_dup(ie->ie_dfs->dfs_ns, ie->ie_obj, flags, &oh->doh_obj); - if (rc) - D_GOTO(err, rc); - if ((fi->flags & O_ACCMODE) != O_RDONLY) oh->doh_writeable = true; + rc = active_ie_init(ie); + if (rc) + goto err; + if (ie->ie_dfs->dfc_data_timeout != 0) { if (fi->flags & O_DIRECT) fi_out.direct_io = 1; @@ -67,12 +67,36 @@ * which pre-existed in the container. */ - if (atomic_load_relaxed(&ie->ie_open_count) > 0 || + /* TODO: This probably wants reflowing to not reference ie_open_count */ + if (atomic_load_relaxed(&ie->ie_open_count) > 1 || ((ie->ie_dcache_last_update.tv_sec != 0) && dfuse_dcache_get_valid(ie, ie->ie_dfs->dfc_data_timeout))) { fi_out.keep_cache = 1; } else { - prefetch = true; + D_SPIN_LOCK(&ie->ie_active->lock); + /** + * size > 4M no pre-read + * 1M <= size <= 4M depends on other files under the directory. + * size <= 1M pre-read in any case. + */ + if ((oh->doh_parent_dir && + atomic_load_relaxed(&oh->doh_parent_dir->ie_linear_read) && + ie->ie_stat.st_size > 0 && + ie->ie_stat.st_size <= DFUSE_MAX_PRE_READ) || + (ie->ie_stat.st_size > 0 && + ie->ie_stat.st_size <= DFUSE_MAX_PRE_READ_ONCE)) { + preread = true; + /* Add the read extent to the list to make sure the following read + * will check the readahead list first. + */ + rc = dfuse_pre_read_init(dfuse_info, ie, &ev); + if (rc != 0) { + D_SPIN_UNLOCK(&ie->ie_active->lock); + D_GOTO(decref, rc); + } + oh->doh_readahead_inflight = 1; + } + D_SPIN_UNLOCK(&ie->ie_active->lock); } } else if (ie->ie_dfs->dfc_data_otoc) { /* Open to close caching, this allows the use of shared mmap */ @@ -92,7 +116,6 @@ dfuse_cb_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) oh->doh_caching = true; fi_out.fh = (uint64_t)oh; - /* * dfs_dup() just locally duplicates the file handle. If we have * O_TRUNC flag, we need to truncate the file manually. */ if (fi->flags & O_TRUNC) { rc = dfs_punch(ie->ie_dfs->dfs_ns, ie->ie_obj, 0, DFS_MAX_FSIZE); if (rc) - D_GOTO(err, rc); + D_GOTO(ie_decref, rc); dfuse_dcache_evict(oh->doh_ie); } - atomic_fetch_add_relaxed(&ie->ie_open_count, 1); - - /* Enable this for files up to the max read size. */ - if (prefetch && oh->doh_parent_dir && - atomic_load_relaxed(&oh->doh_parent_dir->ie_linear_read) && ie->ie_stat.st_size > 0 && - ie->ie_stat.st_size <= DFUSE_MAX_PRE_READ) { - D_ALLOC_PTR(oh->doh_readahead); - if (oh->doh_readahead) { - D_MUTEX_INIT(&oh->doh_readahead->dra_lock, 0); - D_MUTEX_LOCK(&oh->doh_readahead->dra_lock); - preread = true; - } - } - DFUSE_REPLY_OPEN(oh, req, &fi_out); /* No reference is held on oh here but if preread is true then a lock is held which prevents * release from completing which also holds open the inode. */ if (preread) - dfuse_pre_read(dfuse_info, oh); + dfuse_pre_read(dfuse_info, oh, ev); return; +ie_decref: + dfuse_pre_read_abort(dfuse_info, oh, ev, rc); +decref: + active_ie_decref(dfuse_info, ie); err: dfuse_oh_free(dfuse_info, oh); DFUSE_REPLY_ERR_RAW(ie, req, rc); }
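The size policy in the comment above can be read as a single predicate. A sketch of it as a hypothetical helper (not in the patch) is:

```c
/* Sketch of the pre-read eligibility rule from dfuse_cb_open() above:
 * files up to DFUSE_MAX_PRE_READ_ONCE (1MB) are always eligible, files
 * up to DFUSE_MAX_PRE_READ (4MB) only when the parent directory has
 * been seeing linear reads.
 */
static bool
want_preread(struct dfuse_inode_entry *ie, struct dfuse_inode_entry *parent)
{
	off_t size = ie->ie_stat.st_size;

	if (size <= 0)
		return false;
	if (size <= DFUSE_MAX_PRE_READ_ONCE)
		return true;
	if (size <= DFUSE_MAX_PRE_READ && parent != NULL &&
	    atomic_load_relaxed(&parent->ie_linear_read))
		return true;
	return false;
}
```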
+/* Release a file handle, called after close() by an application. + * + * Can be invoked concurrently on the same inode. + */ void dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) { struct dfuse_info *dfuse_info = fuse_req_userdata(req); struct dfuse_obj_hdl *oh = (struct dfuse_obj_hdl *)fi->fh; struct dfuse_inode_entry *ie = NULL; - int rc; + int rc = 0; uint32_t il_calls; /* Perform the opposite of what the ioctl call does, always change the open handle count @@ -149,26 +166,6 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) DFUSE_IE_WFLUSH(oh->doh_ie); - if (oh->doh_readahead) { - struct dfuse_event *ev; - - /* Grab this lock first to ensure that the read cb has been completed. The - * callback might register an error and release ev so do not read it's value - * until after this has completed. - */ - D_MUTEX_LOCK(&oh->doh_readahead->dra_lock); - D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock); - - ev = oh->doh_readahead->dra_ev; - - D_MUTEX_DESTROY(&oh->doh_readahead->dra_lock); - if (ev) { - daos_event_fini(&ev->de_ev); - d_slab_release(ev->de_eqt->de_pre_read_slab, ev); - } - D_FREE(oh->doh_readahead); - } - /* If the file was read from then set the data cache time for future use, however if the * file was written to then evict the metadata cache. * The problem here is that if the file was written to then the contents will be in the @@ -206,14 +203,28 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) if (il_calls != 0) { atomic_fetch_sub_relaxed(&oh->doh_ie->ie_il_count, 1); } - atomic_fetch_sub_relaxed(&oh->doh_ie->ie_open_count, 1); + + /* Wait for any inflight readahead RPC to finish before releasing */ + if (oh->doh_ie->ie_active != NULL) { +wait_readahead: + D_SPIN_LOCK(&oh->doh_ie->ie_active->lock); + if (oh->doh_readahead_inflight) { + D_SPIN_UNLOCK(&oh->doh_ie->ie_active->lock); + goto wait_readahead; + } + D_SPIN_UNLOCK(&oh->doh_ie->ie_active->lock); + } if (oh->doh_evict_on_close) { ie = oh->doh_ie; atomic_fetch_add_relaxed(&ie->ie_ref, 1); } - rc = dfs_release(oh->doh_obj); + active_oh_decref(dfuse_info, oh); + + if (oh->doh_obj != NULL) + rc = dfs_release(oh->doh_obj); + if (rc == 0) { DFUSE_REPLY_ZERO_OH(oh, req); } else { @@ -221,25 +232,13 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) oh->doh_ie = NULL; } if (oh->doh_parent_dir) { - bool use_linear_read = false; - bool set_linear_read = true; - - if (oh->doh_linear_read) { - /* If the file was not read from then this could indicate a cached read - * so do not disable pre-read for the directory. - */ - if (!oh->doh_linear_read_eof) - set_linear_read = false; - use_linear_read = true; - } - - if (set_linear_read) { + if (oh->doh_set_linear_read) { DFUSE_TRA_DEBUG(oh->doh_parent_dir, "Setting linear_read to %d", - use_linear_read); + oh->doh_linear_read); - atomic_store_relaxed(&oh->doh_parent_dir->ie_linear_read, use_linear_read); + atomic_store_relaxed(&oh->doh_parent_dir->ie_linear_read, + oh->doh_linear_read); } - dfuse_inode_decref(dfuse_info, oh->doh_parent_dir); } if (ie) {
diff --git a/src/client/dfuse/ops/opendir.c b/src/client/dfuse/ops/opendir.c index 091d4102c1c..1a5f488f326 100644 --- a/src/client/dfuse/ops/opendir.c +++ b/src/client/dfuse/ops/opendir.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2016-2023 Intel Corporation. + * (C) Copyright 2016-2024 Intel Corporation.
 * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -19,6 +19,10 @@ dfuse_cb_opendir(fuse_req_t req, struct dfuse_inode_entry *ie, struct fuse_file_ if (!oh) D_GOTO(err, rc = ENOMEM); + rc = active_ie_init(ie); + if (rc != -DER_SUCCESS) + D_GOTO(free, rc = daos_der2errno(rc)); + DFUSE_TRA_UP(oh, ie, "open handle"); dfuse_open_handle_init(dfuse_info, oh, ie); @@ -31,16 +35,21 @@ dfuse_cb_opendir(fuse_req_t req, struct dfuse_inode_entry *ie, struct fuse_file_ if (ie->ie_dfs->dfc_dentry_timeout > 0) { fi_out.cache_readdir = 1; - if (dfuse_dcache_get_valid(ie, ie->ie_dfs->dfc_dentry_timeout)) + /** + * Set keep_cache to 1 to avoid the dir cache being invalidated during a + * concurrent opendir. + */ + if ((ie->ie_dcache_last_update.tv_sec == 0 && + atomic_load_relaxed(&dfuse_info->di_fh_count) > 1) || + dfuse_dcache_get_valid(ie, ie->ie_dfs->dfc_dentry_timeout)) fi_out.keep_cache = 1; } - atomic_fetch_add_relaxed(&ie->ie_open_count, 1); - DFUSE_REPLY_OPEN_DIR(oh, req, &fi_out); return; -err: +free: D_FREE(oh); +err: DFUSE_REPLY_ERR_RAW(ie, req, rc); } @@ -57,7 +66,8 @@ dfuse_cb_releasedir(fuse_req_t req, struct dfuse_inode_entry *ino, struct fuse_f if (atomic_load_relaxed(&oh->doh_il_calls) != 0) atomic_fetch_sub_relaxed(&oh->doh_ie->ie_il_count, 1); - atomic_fetch_sub_relaxed(&oh->doh_ie->ie_open_count, 1); + + active_oh_decref(dfuse_info, oh); DFUSE_TRA_DEBUG(oh, "Kernel cache flags invalid %d started %d finished %d", oh->doh_kreaddir_invalid, oh->doh_kreaddir_started,
diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c index 1edf0a959ee..1deb4ee60eb 100644 --- a/src/client/dfuse/ops/read.c +++ b/src/client/dfuse/ops/read.c @@ -8,7 +8,7 @@ #include "dfuse.h" static void -dfuse_cb_read_complete(struct dfuse_event *ev) +cb_read_helper(struct dfuse_event *ev, void *buff) { struct dfuse_obj_hdl *oh = ev->de_oh; @@ -22,143 +22,494 @@ dfuse_cb_read_complete(struct dfuse_event *ev) oh->doh_linear_read = false; } else { oh->doh_linear_read_pos = ev->de_req_position + ev->de_len; - if (ev->de_len < ev->de_req_len) { + if (ev->de_len < ev->de_req_len) oh->doh_linear_read_eof = true; - } } } - if (ev->de_len == 0) { - DFUSE_TRA_DEBUG(oh, "%#zx-%#zx requested (EOF)", ev->de_req_position, - ev->de_req_position + ev->de_req_len - 1); - - DFUSE_REPLY_BUFQ(oh, ev->de_req, ev->de_iov.iov_buf, ev->de_len); - D_GOTO(release, 0); - } - - if (ev->de_len == ev->de_req_len) + if (ev->de_len == ev->de_req_len) { DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", ev->de_req_position, ev->de_req_position + ev->de_req_len - 1); - else - DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read %#zx-%#zx not read (truncated)", - ev->de_req_position, ev->de_req_position + ev->de_len - 1, - ev->de_req_position + ev->de_len, - ev->de_req_position + ev->de_req_len - 1); + } else { + if (ev->de_len == 0) + DFUSE_TRA_DEBUG(oh, "%#zx-%#zx requested (EOF)", ev->de_req_position, + ev->de_req_position + ev->de_req_len - 1); + else + DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read %#zx-%#zx not read (truncated)", + ev->de_req_position, ev->de_req_position + ev->de_len - 1, + ev->de_req_position + ev->de_len, + ev->de_req_position + ev->de_req_len - 1); + } - DFUSE_REPLY_BUFQ(oh, ev->de_req, ev->de_iov.iov_buf, ev->de_len); + DFUSE_REPLY_BUFQ(oh, ev->de_req, buff, ev->de_len); release: daos_event_fini(&ev->de_ev); +} + +/* read slave complete */ +static void +dfuse_cb_slave_read_complete(struct dfuse_event *ev) +{ + struct dfuse_event *evs, *evn; + d_list_t cblist; + char *buf = ev->de_iov.iov_buf; + + D_INIT_LIST_HEAD(&cblist); +
D_SPIN_LOCK(&ev->de_oh->doh_ie->ie_active->lock); + d_list_del(&ev->de_read_list); + d_list_for_each_entry_safe(evs, evn, &ev->de_read_slaves, de_read_list) + d_list_move(&evs->de_read_list, &cblist); + D_SPIN_UNLOCK(&ev->de_oh->doh_ie->ie_active->lock); + + d_list_for_each_entry(evs, &cblist, de_read_list) { + DFUSE_TRA_DEBUG(ev->de_oh, "concurrent network read %p", evs->de_oh); + d_list_del(&evs->de_read_list); + evs->de_len = min(ev->de_len, evs->de_req_len); + evs->de_ev.ev_error = ev->de_ev.ev_error; + D_ASSERT(evs->de_req_position >= ev->de_req_position); + cb_read_helper(evs, buf + (evs->de_req_position - ev->de_req_position)); + d_slab_restock(evs->de_eqt->de_read_slab); + d_slab_release(evs->de_eqt->de_read_slab, evs); + } +} + +static void +dfuse_cb_read_complete(struct dfuse_event *ev) +{ + char *buf = ev->de_iov.iov_buf; + + dfuse_cb_slave_read_complete(ev); + cb_read_helper(ev, buf); d_slab_restock(ev->de_eqt->de_read_slab); d_slab_release(ev->de_eqt->de_read_slab, ev); } #define K128 (1024 * 128) +struct read_req { + d_list_t list; + fuse_req_t req; + size_t len; + off_t position; + struct dfuse_obj_hdl *oh; +}; + +static void +readahead_actual_reply(struct active_inode *active, struct read_req *rr) +{ + size_t reply_len; + + if (rr->position + rr->len >= active->readahead->dra_ev->de_readahead_len) { + rr->oh->doh_linear_read_eof = true; + } + + /* At this point there is a buffer of known length that contains the data, and a read + * request. + * If the attempted read is bigger than the data then it will be truncated. + * If the attempted read is smaller than the buffer it will be met in full. + */ + + if (rr->position + rr->len < active->readahead->dra_ev->de_readahead_len) { + reply_len = rr->len; + DFUSE_TRA_DEBUG(rr->oh, "%#zx-%#zx read", rr->position, + rr->position + reply_len - 1); + } else { + /* The read will be truncated */ + reply_len = active->readahead->dra_ev->de_readahead_len - rr->position; + DFUSE_TRA_DEBUG(rr->oh, "%#zx-%#zx read %#zx-%#zx not read (truncated)", + rr->position, rr->position + reply_len - 1, + rr->position + reply_len, rr->position + rr->len - 1); + } + + DFUSE_IE_STAT_ADD(rr->oh->doh_ie, DS_PRE_READ); + DFUSE_REPLY_BUFQ(rr->oh, rr->req, active->readahead->dra_ev->de_iov.iov_buf + rr->position, + reply_len); +}
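As a worked example of the truncation rule just above (a sketch, not patch code; the values match the 1MB-1 test file used later in node_local_test.py):

```c
/* Worked example: a 128k read at offset 0xE0000 against a readahead
 * buffer of 1MB-1 bytes overruns the buffer and is truncated.
 */
static size_t
reply_len_example(void)
{
	size_t de_readahead_len = (1024 * 1024) - 1; /* 0xFFFFF bytes buffered */
	off_t  position         = 0xE0000;           /* 896k: the final 128k slot */
	size_t len              = 128 * 1024;        /* 0x20000 requested */

	if (position + len < de_readahead_len)
		return len;                          /* read met in full */
	return de_readahead_len - position;          /* 0x1FFFF bytes: truncated */
}
```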
static bool dfuse_readahead_reply(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) { - size_t reply_len; + struct active_inode *active = oh->doh_ie->ie_active; - if (oh->doh_readahead->dra_rc) { - DFUSE_REPLY_ERR_RAW(oh, req, oh->doh_readahead->dra_rc); + if (active->readahead->dra_rc) { + DFUSE_REPLY_ERR_RAW(oh, req, active->readahead->dra_rc); return true; } - if (!oh->doh_linear_read || oh->doh_readahead->dra_ev == NULL) { - DFUSE_TRA_DEBUG(oh, "Pre read disabled"); + if (!oh->doh_linear_read || active->readahead->dra_ev == NULL || + !active->readahead->complete) { + DFUSE_TRA_DEBUG(oh, "Pre read disabled or not completed"); return false; } if (((position % K128) == 0) && ((len % K128) == 0)) { - DFUSE_TRA_INFO(oh, "allowing out-of-order pre read"); + DFUSE_TRA_DEBUG(oh, "allowing out-of-order pre read"); /* Do not closely track the read position in this case, just the maximum, * later checks will determine if the file is read to the end. */ oh->doh_linear_read_pos = max(oh->doh_linear_read_pos, position + len); - } else if (oh->doh_linear_read_pos != position) { - DFUSE_TRA_DEBUG(oh, "disabling pre read"); - daos_event_fini(&oh->doh_readahead->dra_ev->de_ev); - d_slab_release(oh->doh_readahead->dra_ev->de_eqt->de_pre_read_slab, - oh->doh_readahead->dra_ev); - oh->doh_readahead->dra_ev = NULL; + } else if ((position + len) <= active->readahead->dra_ev->de_readahead_len) { + DFUSE_TRA_DEBUG(oh, "disabling pre read %llu pos %zu != %zu", + (unsigned long long)oh->doh_ie->ie_stat.st_ino, + oh->doh_linear_read_pos, position); return false; } else { oh->doh_linear_read_pos = position + len; } - if (position + len >= oh->doh_readahead->dra_ev->de_readahead_len) { - oh->doh_linear_read_eof = true; + struct read_req rr; + + rr.req = req; + rr.len = len; + rr.position = position; + rr.oh = oh; + + readahead_actual_reply(active, &rr); + return true; +} + +static struct dfuse_eq * +pick_eqt(struct dfuse_info *dfuse_info) +{ + uint64_t eqt_idx; + + eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1); + return &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count]; +} + +/* Chunk read and coalescing * * This code attempts to predict application and kernel I/O patterns and preemptively read file * data ahead of when it's requested. * * For some kernels read I/O size is limited to 128k when using the page cache or 1MB when using * direct I/O. To get around the performance impact of this, detect when well-aligned 128k reads * are received and read an entire buffer's worth; for future requests the data should then already * be in cache. * * This code is entered when caching is enabled and reads are correctly sized/aligned and not in the * last CHUNK_SIZE of a file. While the file is open the inode's active data contains a list of * read_chunk_data entries, one for each bucket. Buckets where all slots have been requested are * removed from the list and closed when the last request is completed. * * TODO: Currently there is no code to remove partially read buckets from the list so reading * one slot every chunk would leave the entire file contents in memory until close and mean long * list traversal times. */ + +#define CHUNK_SIZE (1024 * 1024)
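To make the bucket and slot arithmetic used by chunk_read() below concrete, a worked example (a sketch only, reusing the macros from this file):

```c
/* Worked example: a well-aligned 128k read at offset 0x260000 (2.375MB)
 * falls in bucket 2 (bytes 2MB-3MB of the file) and slot 3 (the fourth
 * 128k slot within that bucket).
 */
static void
bucket_example(void)
{
	off_t    position = 0x260000;
	size_t   len      = K128;   /* 128k, as defined earlier in this file */
	uint64_t bucket;
	int      slot;

	bucket = (D_ALIGNUP(position + len, CHUNK_SIZE) / CHUNK_SIZE) - 1;
	/* bucket == 2 */

	slot = (position / K128) % 8;
	/* slot == 3 */
}
```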
+struct read_chunk_data { + struct dfuse_event *ev; + struct active_inode *ia; + fuse_req_t reqs[8]; + struct dfuse_obj_hdl *ohs[8]; + d_list_t list; + uint64_t bucket; + struct dfuse_eq *eqt; + int rc; + int entered; + ATOMIC int exited; + bool exiting; + bool complete; +}; + +static void +chunk_free(struct read_chunk_data *cd) +{ + d_list_del(&cd->list); + d_slab_release(cd->eqt->de_read_slab, cd->ev); + D_FREE(cd); +} + +/* Called when the last open file handle on an inode is closed. This needs to free everything + * which is complete, and flag anything that isn't for deletion in the callback. No locking is + * needed here as this is only called as active is being released. + * + * Returns true if the feature was used. + */ +bool +read_chunk_close(struct dfuse_inode_entry *ie) +{ + struct read_chunk_data *cd, *cdn; + bool rcb = false; + + D_SPIN_LOCK(&ie->ie_active->lock); + if (d_list_empty(&ie->ie_active->chunks)) + goto out; + + rcb = true; + + d_list_for_each_entry_safe(cd, cdn, &ie->ie_active->chunks, list) { + if (cd->complete) { + chunk_free(cd); + } else { + cd->exiting = true; + } } +out: + D_SPIN_UNLOCK(&ie->ie_active->lock); + return rcb; +} - static void +chunk_cb(struct dfuse_event *ev) +{ + struct read_chunk_data *cd = ev->de_cd; + struct active_inode *ia = cd->ia; + fuse_req_t req; + bool done = false; - if (position + len < oh->doh_readahead->dra_ev->de_readahead_len) { - reply_len = len; - DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", position, position + reply_len - 1); - } else { - /* The read will be truncated */ - reply_len = oh->doh_readahead->dra_ev->de_readahead_len - position; - DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read %#zx-%#zx not read (truncated)", position, - position + reply_len - 1, position + reply_len, position + len - 1); + cd->rc = ev->de_ev.ev_error; + + if (cd->rc == 0 && (ev->de_len != CHUNK_SIZE)) { + cd->rc = EIO; + DS_WARN(cd->rc, "Unexpected short read bucket %ld (%#zx) expected %i got %zi", + cd->bucket, cd->bucket * CHUNK_SIZE, CHUNK_SIZE, ev->de_len); } - DFUSE_IE_STAT_ADD(oh->doh_ie, DS_PRE_READ); - DFUSE_REPLY_BUFQ(oh, req, oh->doh_readahead->dra_ev->de_iov.iov_buf + position, reply_len); + daos_event_fini(&ev->de_ev); + + do { + int i; + req = 0; + + D_SPIN_LOCK(&ia->lock); + + if (cd->exiting) { + chunk_free(cd); + D_SPIN_UNLOCK(&ia->lock); + return; + } + + cd->complete = true; + for (i = 0; i < 8; i++) { + if (cd->reqs[i]) { + req = cd->reqs[i]; + cd->reqs[i] = 0; + break; + } + } + + D_SPIN_UNLOCK(&ia->lock); + + if (req) { + size_t position = (cd->bucket * CHUNK_SIZE) + (i * K128); + + if (cd->rc != 0) { + DFUSE_REPLY_ERR_RAW(cd->ohs[i], req, cd->rc); + } else { + DFUSE_TRA_DEBUG(cd->ohs[i], "%#zx-%#zx read", position, + position + K128 - 1); + DFUSE_REPLY_BUFQ(cd->ohs[i], req, ev->de_iov.iov_buf + (i * K128), + K128); + } + + if (atomic_fetch_add_relaxed(&cd->exited, 1) == 7) + done = true; + } + } while (req && !done); + + if (done) { + d_slab_release(cd->eqt->de_read_slab, cd->ev); + D_FREE(cd); + } +} + +/* Submit a read to dfs. + * + * Returns true on success.
+ */ +static bool +chunk_fetch(fuse_req_t req, struct dfuse_obj_hdl *oh, struct read_chunk_data *cd, int slot) +{ + struct dfuse_info *dfuse_info = fuse_req_userdata(req); + struct dfuse_inode_entry *ie = oh->doh_ie; + struct dfuse_event *ev; + struct dfuse_eq *eqt; + int rc; + daos_off_t position = cd->bucket * CHUNK_SIZE; + + eqt = pick_eqt(dfuse_info); + + ev = d_slab_acquire(eqt->de_read_slab); + if (ev == NULL) { + cd->rc = ENOMEM; + return false; + } + + ev->de_iov.iov_len = CHUNK_SIZE; + ev->de_req = req; + ev->de_cd = cd; + ev->de_sgl.sg_nr = 1; + ev->de_len = 0; + ev->de_complete_cb = chunk_cb; + + cd->ev = ev; + cd->eqt = eqt; + cd->reqs[slot] = req; + cd->ohs[slot] = oh; + + rc = dfs_read(ie->ie_dfs->dfs_ns, ie->ie_obj, &ev->de_sgl, position, &ev->de_len, + &ev->de_ev); + if (rc != 0) + goto err; + + /* Send a message to the async thread to wake it up and poll for events */ + sem_post(&eqt->de_sem); + + /* Now ensure there are more descriptors for the next request */ + d_slab_restock(eqt->de_read_slab); + return true; + +err: + daos_event_fini(&ev->de_ev); + d_slab_release(eqt->de_read_slab, ev); + cd->rc = rc; + return false; +} + +/* Try and do a bulk read. + * + * Returns true if it was able to handle the read. + */ +static bool +chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) +{ + struct dfuse_inode_entry *ie = oh->doh_ie; + struct read_chunk_data *cd; + off_t last; + uint64_t bucket; + int slot; + bool submit = false; + bool rcb; + + if (len != K128) + return false; + + if ((position % K128) != 0) + return false; + + last = D_ALIGNUP(position + len - 1, CHUNK_SIZE); + + if (last > oh->doh_ie->ie_stat.st_size) + return false; + + bucket = D_ALIGNUP(position + len, CHUNK_SIZE); + bucket = (bucket / CHUNK_SIZE) - 1; + + slot = (position / K128) % 8; + + DFUSE_TRA_DEBUG(oh, "read bucket %#zx-%#zx last %#zx size %#zx bucket %ld slot %d", + position, position + len - 1, last, ie->ie_stat.st_size, bucket, slot); + + D_SPIN_LOCK(&ie->ie_active->lock); + + d_list_for_each_entry(cd, &ie->ie_active->chunks, list) + if (cd->bucket == bucket) { + /* Remove from list to re-add again later. */ + d_list_del(&cd->list); + goto found; + } + + D_ALLOC_PTR(cd); + if (cd == NULL) + goto err; + + cd->ia = ie->ie_active; + cd->bucket = bucket; + submit = true; + +found: + + if (++cd->entered < 8) { + /* Put on front of list for efficient searching */ + d_list_add(&cd->list, &ie->ie_active->chunks); + } + + D_SPIN_UNLOCK(&ie->ie_active->lock); + + if (submit) { + DFUSE_TRA_DEBUG(oh, "submit for bucket %ld[%d]", bucket, slot); + rcb = chunk_fetch(req, oh, cd, slot); + } else { + struct dfuse_event *ev = NULL; + + /* Now check whether this read request is complete; if it isn't then just + * save req in the right slot, but if it is then reply here. After the call to + * DFUSE_REPLY_* no reference is held on either the open file or the inode so + * at that point they could be closed. + */ + rcb = true; + + D_SPIN_LOCK(&ie->ie_active->lock); + if (cd->complete) { + ev = cd->ev; + } else { + cd->reqs[slot] = req; + cd->ohs[slot] = oh; + } + D_SPIN_UNLOCK(&ie->ie_active->lock); + + if (ev) { + if (cd->rc != 0) { + /* Don't pass fuse an error here, rather return false and the read + * will be tried over the network.
+ */ + rcb = false; + } else { + DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", position, + position + K128 - 1); + DFUSE_REPLY_BUFQ(oh, req, ev->de_iov.iov_buf + (slot * K128), K128); + } + if (atomic_fetch_add_relaxed(&cd->exited, 1) == 7) { + d_slab_release(cd->eqt->de_read_slab, cd->ev); + D_FREE(cd); + } + } + } + + return rcb; + +err: + D_SPIN_UNLOCK(&ie->ie_active->lock); + return false; } void dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct fuse_file_info *fi) { struct dfuse_obj_hdl *oh = (struct dfuse_obj_hdl *)fi->fh; + struct active_inode *active = oh->doh_ie->ie_active; struct dfuse_info *dfuse_info = fuse_req_userdata(req); bool mock_read = false; - struct dfuse_eq *eqt; + struct dfuse_eq *eqt = NULL; int rc; struct dfuse_event *ev; - uint64_t eqt_idx; DFUSE_IE_STAT_ADD(oh->doh_ie, DS_READ); + atomic_fetch_add_relaxed(&oh->doh_ie->ie_active->read_count, 1); + if (oh->doh_linear_read_eof && position == oh->doh_linear_read_pos) { DFUSE_TRA_DEBUG(oh, "Returning EOF early without round trip %#zx", position); oh->doh_linear_read_eof = false; - oh->doh_linear_read = false; - if (oh->doh_readahead) { - D_MUTEX_LOCK(&oh->doh_readahead->dra_lock); - ev = oh->doh_readahead->dra_ev; - - oh->doh_readahead->dra_ev = NULL; - D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock); - - if (ev) { - daos_event_fini(&ev->de_ev); - d_slab_release(ev->de_eqt->de_pre_read_slab, ev); - DFUSE_IE_STAT_ADD(oh->doh_ie, DS_PRE_READ); - } - } + if (active->readahead) + DFUSE_IE_STAT_ADD(oh->doh_ie, DS_PRE_READ); DFUSE_REPLY_BUFQ(oh, req, NULL, 0); return; } - if (oh->doh_readahead) { - bool replied; - - D_MUTEX_LOCK(&oh->doh_readahead->dra_lock); - replied = dfuse_readahead_reply(req, len, position, oh); - D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock); + /* Then check if the request can be filled by readahead */ + if (active->readahead && dfuse_readahead_reply(req, len, position, oh)) + return; - if (replied) - return; - } + if (chunk_read(req, len, position, oh)) + return; - eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1); - eqt = &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count]; + eqt = pick_eqt(dfuse_info); ev = d_slab_acquire(eqt->de_read_slab); if (ev == NULL) @@ -182,6 +533,8 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct ev->de_iov.iov_buf_len); } + if (position + len > oh->doh_ie->ie_stat.st_size) + len = oh->doh_ie->ie_stat.st_size - position; ev->de_iov.iov_len = len; ev->de_req = req; ev->de_sgl.sg_nr = 1; @@ -199,12 +552,42 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct DFUSE_IE_WFLUSH(oh->doh_ie); + /* Check for matching open reads; if there are multiple readers of the same file offset + * then chain future requests off the first one to avoid extra network round-trips. This + * can and does happen even with caching enabled if there are multiple client processes. + */ + D_SPIN_LOCK(&active->lock); + { + struct dfuse_event *evc; + + d_list_for_each_entry(evc, &active->open_reads, de_read_list) { + if (ev->de_req_position >= evc->de_req_position && + ev->de_req_len <= evc->de_req_len) { + d_list_add(&ev->de_read_list, &evc->de_read_slaves); + D_SPIN_UNLOCK(&active->lock); + goto out; + } + } + d_list_add_tail(&ev->de_read_list, &active->open_reads); + } + D_SPIN_UNLOCK(&active->lock);
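The matching test in the loop above, extracted as a standalone predicate for clarity (a hypothetical helper, not in the patch):

```c
/* Sketch: a new read can be chained as a "slave" of an in-flight one when
 * it starts no earlier and asks for no more bytes, so the in-flight
 * buffer can be offset into to serve it (see the slave completion path
 * in dfuse_cb_slave_read_complete() above).
 */
static bool
can_chain(struct dfuse_event *ev, struct dfuse_event *evc)
{
	return ev->de_req_position >= evc->de_req_position &&
	       ev->de_req_len <= evc->de_req_len;
}
```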
+ + if (oh->doh_obj == NULL) { + /** duplicate the file handle for the fuse handle */ + rc = dfs_dup(oh->doh_dfs, oh->doh_ie->ie_obj, oh->doh_flags, &oh->doh_obj); + if (rc) { + ev->de_ev.ev_error = rc; + dfuse_cb_read_complete(ev); + return; + } + } + rc = dfs_read(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_len, &ev->de_ev); if (rc != 0) { D_GOTO(err, rc); return; } - +out: /* Send a message to the async thread to wake it up and poll for events */ sem_post(&eqt->de_sem); @@ -221,17 +604,26 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct } static void -dfuse_cb_pre_read_complete(struct dfuse_event *ev) +pre_read_mark_done(struct active_inode *active) { - struct dfuse_obj_hdl *oh = ev->de_oh; + D_SPIN_LOCK(&active->lock); + active->readahead->complete = true; + D_SPIN_UNLOCK(&active->lock); +} - oh->doh_readahead->dra_rc = ev->de_ev.ev_error; +static void +dfuse_cb_pre_read_complete(struct dfuse_event *ev) +{ + struct dfuse_info *dfuse_info = ev->de_di; + struct dfuse_inode_entry *ie = ev->de_oh->doh_ie; + struct active_inode *active = ie->ie_active; + active->readahead->dra_rc = ev->de_ev.ev_error; if (ev->de_ev.ev_error != 0) { - oh->doh_readahead->dra_rc = ev->de_ev.ev_error; + active->readahead->dra_rc = ev->de_ev.ev_error; daos_event_fini(&ev->de_ev); d_slab_release(ev->de_eqt->de_pre_read_slab, ev); - oh->doh_readahead->dra_ev = NULL; + active->readahead->dra_ev = NULL; } /* If the length is not as expected then the file has been modified since the last stat so @@ -241,44 +633,94 @@ dfuse_cb_pre_read_complete(struct dfuse_event *ev) if (ev->de_len != ev->de_readahead_len) { daos_event_fini(&ev->de_ev); d_slab_release(ev->de_eqt->de_pre_read_slab, ev); - oh->doh_readahead->dra_ev = NULL; + active->readahead->dra_ev = NULL; } + pre_read_mark_done(active); + ev->de_oh->doh_readahead_inflight = 0; - D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock); + dfuse_cb_slave_read_complete(ev); + /* Drop the extra ref on active; the file could be closed before this read completes */ + active_ie_decref(dfuse_info, ie); } -void -dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh) +int +dfuse_pre_read_init(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie, + struct dfuse_event **evp) { - struct dfuse_eq *eqt; - int rc; + struct active_inode *active = ie->ie_active; + struct dfuse_eq *eqt; struct dfuse_event *ev; - uint64_t eqt_idx; - size_t len = oh->doh_ie->ie_stat.st_size; - - eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1); - eqt = &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count]; + size_t len = ie->ie_stat.st_size; + eqt = pick_eqt(dfuse_info); ev = d_slab_acquire(eqt->de_pre_read_slab); if (ev == NULL) - D_GOTO(err, rc = ENOMEM); + return ENOMEM; ev->de_iov.iov_len = len; ev->de_req = 0; ev->de_sgl.sg_nr = 1; - ev->de_oh = oh; ev->de_readahead_len = len; ev->de_req_position = 0; ev->de_complete_cb = dfuse_cb_pre_read_complete; - oh->doh_readahead->dra_ev = ev; - rc = dfs_read(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, 0, &ev->de_len, &ev->de_ev); - if (rc != 0) { - D_GOTO(err, rc); -
return; + if (active->readahead == NULL) { + int rc; + + rc = active_ie_readahead_init(ie); + if (rc != 0) + return rc; + } + active->readahead->dra_ev = ev; + + /* NB: the caller holds ie->ie_active->lock */ + d_list_add_tail(&ev->de_read_list, &active->open_reads); + + *evp = ev; + return 0; +} + +void +dfuse_pre_read_abort(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh, + struct dfuse_event *ev, int rc) +{ + struct dfuse_eq *eqt = pick_eqt(dfuse_info); + struct active_inode *active = oh->doh_ie->ie_active; + + oh->doh_readahead_inflight = 0; + active->readahead->dra_rc = rc; + if (ev) { + daos_event_fini(&ev->de_ev); + d_slab_release(eqt->de_pre_read_slab, ev); + active->readahead->dra_ev = NULL; + } + active_ie_decref(dfuse_info, oh->doh_ie); + pre_read_mark_done(active); +} + +void +dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh, struct dfuse_event *ev) +{ + struct dfuse_eq *eqt; + int rc; + + eqt = pick_eqt(dfuse_info); + + if (oh->doh_obj == NULL) { + /** duplicate the file handle for the fuse handle */ + rc = dfs_dup(oh->doh_dfs, oh->doh_ie->ie_obj, oh->doh_flags, &oh->doh_obj); + if (rc) + D_GOTO(err, rc); } + ev->de_oh = oh; + ev->de_di = dfuse_info; + + rc = dfs_read(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, 0, &ev->de_len, &ev->de_ev); + if (rc != 0) + goto err; + /* Send a message to the async thread to wake it up and poll for events */ sem_post(&eqt->de_sem); @@ -287,11 +729,5 @@ dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh) return; err: - oh->doh_readahead->dra_rc = rc; - if (ev) { - daos_event_fini(&ev->de_ev); - d_slab_release(eqt->de_pre_read_slab, ev); - oh->doh_readahead->dra_ev = NULL; - } - D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock); + dfuse_pre_read_abort(dfuse_info, oh, ev, rc); }
diff --git a/src/client/dfuse/ops/readdir.c b/src/client/dfuse/ops/readdir.c index 10ea454fd42..c38007c84ae 100644 --- a/src/client/dfuse/ops/readdir.c +++ b/src/client/dfuse/ops/readdir.c @@ -502,6 +502,7 @@ dfuse_do_readdir(struct dfuse_info *dfuse_info, fuse_req_t req, struct dfuse_obj } set_entry_params(&entry, ie); + dfuse_mcache_set_time(ie); written = FADP(req, &reply_buff[buff_offset], size - buff_offset, drc->drc_name, &entry, drc->drc_next_offset); @@ -733,6 +734,7 @@ dfuse_do_readdir(struct dfuse_info *dfuse_info, fuse_req_t req, struct dfuse_obj } set_entry_params(&entry, ie); + dfuse_mcache_set_time(ie); written = FADP(req, &reply_buff[buff_offset], size - buff_offset, dre->dre_name, &entry, dre->dre_next_offset);
diff --git a/src/client/dfuse/ops/write.c b/src/client/dfuse/ops/write.c index 943f9b75e78..f0e89bf9ecf 100644 --- a/src/client/dfuse/ops/write.c +++ b/src/client/dfuse/ops/write.c @@ -101,6 +101,13 @@ dfuse_cb_write(fuse_req_t req, fuse_ino_t ino, struct fuse_bufvec *bufv, off_t p if (len + position > oh->doh_ie->ie_stat.st_size) oh->doh_ie->ie_stat.st_size = len + position; + if (oh->doh_obj == NULL) { + /** duplicate the file handle for the fuse handle */ + rc = dfs_dup(oh->doh_dfs, oh->doh_ie->ie_obj, oh->doh_flags, &oh->doh_obj); + if (rc) + D_GOTO(err, rc); + } + rc = dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev); if (rc != 0) D_GOTO(err, rc);
diff --git a/src/tests/ftest/util/dfuse_utils.py b/src/tests/ftest/util/dfuse_utils.py index a26f372e76d..900da63ebf1 100644 --- a/src/tests/ftest/util/dfuse_utils.py +++ b/src/tests/ftest/util/dfuse_utils.py @@ -30,7 +30,6 @@ def __init__(self, namespace, command, path=""): self.sys_name = FormattedParameter("--sys-name {}")
self.thread_count = FormattedParameter("--thread-count {}") self.eq_count = FormattedParameter("--eq-count {}") - self.singlethreaded = FormattedParameter("--singlethread", False) self.foreground = FormattedParameter("--foreground", False) self.enable_caching = FormattedParameter("--enable-caching", False) self.enable_wb_cache = FormattedParameter("--enable-wb-cache", False)
diff --git a/utils/build.config b/utils/build.config index a14ad039c15..6313639fc29 100644 --- a/utils/build.config +++ b/utils/build.config @@ -3,7 +3,7 @@ component=daos [commit_versions] argobots=v1.1 -fuse=fuse-3.16.2 +fuse=a6a219f5344a5c09cec34416818342ac220a0df2 pmdk=2.1.0 isal=v2.30.0 isal_crypto=v2.23.0 @@ -27,6 +27,6 @@ ucx=https://github.com/openucx/ucx.git [patch_versions] spdk=https://github.com/spdk/spdk/commit/b0aba3fcd5aceceea530a702922153bc75664978.diff,https://github.com/spdk/spdk/commit/445a4c808badbad3942696ecf16fa60e8129a747.diff -fuse=https://github.com/libfuse/libfuse/commit/c9905341ea34ff9acbc11b3c53ba8bcea35eeed8.diff +fuse=https://patch-diff.githubusercontent.com/raw/ashleypittman/fused/pull/1.patch mercury=https://raw.githubusercontent.com/daos-stack/mercury/f3dc286fb40ec1a3a38a2e17c45497bc2aa6290d/na_ucx.patch pmdk=https://github.com/pmem/pmdk/commit/2abe15ac0b4eed894b6768cd82a3b0a7c4336284.diff
diff --git a/utils/cq/words.dict b/utils/cq/words.dict index 7a2784b61f8..102ca21a882 100644 --- a/utils/cq/words.dict +++ b/utils/cq/words.dict @@ -140,6 +140,7 @@ debian debuginfo defusedxml del +dentry deps dereference dereferencing
diff --git a/utils/node_local_test.py b/utils/node_local_test.py index b3f7d9a457c..950c1a6dcf5 100755 --- a/utils/node_local_test.py +++ b/utils/node_local_test.py @@ -1337,7 +1337,7 @@ def __str__(self): return f'DFuse instance at {self.dir} ({running})' - def start(self, v_hint=None, single_threaded=False, use_oopt=False): + def start(self, v_hint=None, use_oopt=False): """Start a dfuse instance""" # pylint: disable=too-many-branches dfuse_bin = join(self.conf['PREFIX'], 'bin', 'dfuse') @@ -1385,9 +1385,7 @@ def start(self, v_hint=None, use_oopt=False): if self.multi_user: cmd.append('--multi-user') - if single_threaded: - cmd.append('--singlethread') - elif not self.cores: + if not self.cores: # Use a lower default thread-count for NLT due to running tests in parallel. cmd.extend(['--thread-count', '4']) @@ -1562,17 +1560,17 @@ def run_query(self, use_json=False, quiet=False): return rc def check_usage(self, ino=None, inodes=None, open_files=None, pools=None, containers=None, - qpath=None): + qpath=None, old=None): """Query and verify the dfuse statistics. + Optionally verify numbers are as expected, or return a delta against a previous sample. Returns the raw numbers in a dict. """ cmd = ['filesystem', 'query', qpath or self.dir] if ino is not None: cmd.extend(['--inode', str(ino)]) - rc = run_daos_cmd(self.conf, cmd, use_json=True) - print(rc) + rc = run_daos_cmd(self.conf, cmd, use_json=True, valgrind=False, log_check=False) assert rc.returncode == 0, rc if inodes: @@ -1583,6 +1581,16 @@ def check_usage(self, ino=None, inodes=None, open_files=None, pools=None, contai assert rc.json['response']['pools'] == pools, rc if containers: assert rc.json['response']['containers'] == containers, rc + + # If a prior version of the statistics was supplied then take a delta, using the prior + # version as it came from daos, not as it was reported.
+ if 'statistics' in rc.json['response']: + rc.json['response']['raw'] = copy.deepcopy(rc.json['response']['statistics']) + if old: + for key in rc.json['response']['statistics']: + rc.json['response']['statistics'][key] -= old['raw'].get(key, 0) + + print(f"Usage: {rc.json['response']}") return rc.json['response'] def _evict_path(self, path): @@ -1606,7 +1614,7 @@ def evict_and_wait(self, paths, qpath=None): for inode in inodes: found = True while found: - rc = self.check_usage(inode, qpath=qpath) + rc = self.check_usage(ino=inode, qpath=qpath) print(rc) found = rc['resident'] if not found: @@ -1980,11 +1988,9 @@ class needs_dfuse_with_opt(): wrapping_lock = threading.Lock() # pylint: disable=too-few-public-methods - def __init__(self, caching_variants=None, wbcache=True, single_threaded=False, - dfuse_inval=True, ro=False): + def __init__(self, caching_variants=None, wbcache=True, dfuse_inval=True, ro=False): self.caching_variants = caching_variants if caching_variants else [False, True] self.wbcache = wbcache - self.single_threaded = single_threaded self.dfuse_inval = dfuse_inval self.ro = ro @@ -2011,6 +2017,12 @@ def _helper(obj): 'dfuse-dentry-dir-time': '5m', 'dfuse-ndentry-time': '5m'} obj.container.set_attrs(cont_attrs) + elif caching: + cont_attrs = {'dfuse-attr-time': '1m', + 'dfuse-dentry-time': '1m', + 'dfuse-dentry-dir-time': '1m', + 'dfuse-ndentry-time': '1m'} + obj.container.set_attrs(cont_attrs) if self.ro: args["ro"] = True @@ -2020,7 +2032,7 @@ def _helper(obj): caching=caching, wbcache=self.wbcache, **args) - obj.dfuse.start(v_hint=method.__name__, single_threaded=self.single_threaded) + obj.dfuse.start(v_hint=method.__name__) try: rc = method(obj) finally: @@ -2310,7 +2322,7 @@ def test_read(self): def test_pre_read(self): """Test the pre-read code. - Test reading a file which is previously unknown to fuse with caching on. This should go + Test reading files which are previously unknown to fuse with caching on. This should go into the pre_read code and load the file contents automatically after the open call. """ dfuse = DFuse(self.server, self.conf, container=self.container) @@ -2339,32 +2351,57 @@ def test_pre_read(self): dfuse = DFuse(self.server, self.conf, caching=True, container=self.container) dfuse.start(v_hint='pre_read_1') + # Open a file and read in one go. with open(join(dfuse.dir, 'file0'), 'r') as fd: data0 = fd.read() + res = dfuse.check_usage() + assert res['statistics']['pre_read'] == 1, res + # Open a file and read in one go. with open(join(dfuse.dir, 'file1'), 'r') as fd: data1 = fd.read(16) + res = dfuse.check_usage(old=res) + assert res['statistics']['pre_read'] == 1, res + + # Open a file and read two bytes at a time. Despite disabling buffering, Python will try + # to read a whole page the first time. + fd = os.open(join(dfuse.dir, 'file2'), os.O_RDONLY) + data2 = os.read(fd, 2) + res = dfuse.check_usage(old=res) + assert res['statistics']['pre_read'] == 1, res + _ = os.read(fd, 2) + res = dfuse.check_usage(old=res) + assert res['statistics']['pre_read'] == 0, res + os.close(fd) - with open(join(dfuse.dir, 'file2'), 'r') as fd: - data2 = fd.read(2) - + # Open a 1MB file. This reads 8 128k chunks and 1 EOF. with open(join(dfuse.dir, 'file3'), 'r') as fd: data3 = fd.read() + res = dfuse.check_usage(old=res) + assert res['statistics']['pre_read'] == 9, res + # Open a (1MB-1) file. This reads 8 128k chunks, the last of which is truncated.
There is no EOF + # returned by dfuse here, just a truncated read, but I assume Python interprets a + # truncated read at the expected file size as an EOF. with open(join(dfuse.dir, 'file4'), 'r') as fd: data4 = fd.read() data5 = fd.read() - # This should not use the pre-read feature, to be validated via the logs. + res = dfuse.check_usage(old=res) + assert res['statistics']['pre_read'] == 8, res + + # This should now be read from cache. with open(join(dfuse.dir, 'file4'), 'r') as fd: data6 = fd.read() + res = dfuse.check_usage(old=res) + assert res['statistics']['read'] == 0, res if dfuse.stop(): self.fatal_errors = True print(data0) assert data0 == 'test' assert data1 == 'test' - assert data2 == 'te' + assert data2 == b'te', data2 assert raw_data0 == data3 assert raw_data1 == data4 assert len(data5) == 0 @@ -2678,11 +2715,6 @@ def test_readdir_unlink(self): assert len(post_files) == len(files) - 1 assert post_files == files[:-2] + [files[-1]] - @needs_dfuse_with_opt(single_threaded=True, caching_variants=[True]) - def test_single_threaded(self): - """Test single-threaded mode""" - self.readdir_test(10) - @needs_dfuse def test_open_replaced(self): """Test that fstat works on file clobbered by rename""" @@ -3378,7 +3410,7 @@ def test_complex_unlink(self): print(os.listdir(self.dfuse.dir)) print(os.listdir(dfuse.dir)) - # Rename file 0 to file 0 in the background, this will remove file 1 + # Rename file 0 to file 1 in the background; this will remove the old file 1 os.rename(join(dfuse.dir, 'file.0'), join(dfuse.dir, 'file.1')) # Perform the unlink, this will unlink the other file. @@ -3395,6 +3427,45 @@ def test_complex_unlink(self): for fd in fds: fd.close() + @needs_dfuse_with_opt(caching_variants=[False]) + def test_create_exists(self): + """Test creating a file. + + This tests for create where the dentry being created already exists and is a file that's + known to dfuse. + + To do this make a file in dfuse, use a back channel to rename it and then create a file + using the new name.""" + + filename = join(self.dfuse.dir, 'myfile') + + with open(filename, 'w') as fd: + fd.write('hello') + + filename = join(self.dfuse.dir, 'newfile') + try: + os.stat(filename) + raise NLTestFail("File exists") + except FileNotFoundError: + pass + + # Start another dfuse instance to move the files around without the kernel knowing. + dfuse = DFuse(self.server, + self.conf, + container=self.container, + caching=False) + dfuse.start(v_hint='create_exists_1') + + os.rename(join(dfuse.dir, 'myfile'), join(dfuse.dir, 'newfile')) + + filename = join(self.dfuse.dir, 'newfile') + + with open(filename, 'w') as fd: + fd.write('hello') + + if dfuse.stop(): + self.fatal_errors = True + def test_cont_rw(self): """Test write access to another user's container""" dfuse = DFuse(self.server, @@ -5987,7 +6058,7 @@ def test_dfuse_start(server, conf, wf): cmd = [join(conf['PREFIX'], 'bin', 'dfuse'), '--mountpoint', mount_point, - '--pool', pool.id(), '--cont', container.id(), '--foreground', '--singlethread'] + '--pool', pool.id(), '--cont', container.id(), '--foreground', '--thread-count=2'] test_cmd = AllocFailTest(conf, 'dfuse', cmd) test_cmd.wf = wf