Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

patches series for concurrent read #15644

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/user/filesystem.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ Additionally, there are several optional command-line options:
| --container=<label\|uuid\> | container label or uuid to open |
| --sys-name=<name\> | DAOS system name |
| --foreground | run in foreground |
| --singlethreaded | run single threaded |
| --thread-count=<count> | Number of threads to use |
| --multi-user | Run in multi user mode |
| --read-only | Mount in read-only mode |
Expand Down
1 change: 1 addition & 0 deletions src/client/dfuse/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ DFUSE_SRC = ['dfuse_core.c',
'dfuse_main.c',
'dfuse_fuseops.c',
'inval.c',
'file.c',
'dfuse_cont.c',
'dfuse_thread.c',
'dfuse_pool.c']
Expand Down
83 changes: 72 additions & 11 deletions src/client/dfuse/dfuse.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ struct dfuse_info {
char *di_mountpoint;
int32_t di_thread_count;
uint32_t di_eq_count;
bool di_threaded;
bool di_foreground;
bool di_caching;
bool di_multi_user;
Expand Down Expand Up @@ -96,6 +95,9 @@ struct dfuse_eq {
* memory consumption */
#define DFUSE_MAX_PRE_READ (1024 * 1024 * 4)

/* Maximum file-size for pre-read in all cases */
#define DFUSE_MAX_PRE_READ_ONCE (1024 * 1024 * 1)

/* Launch fuse, and do not return until complete */
int
dfuse_launch_fuse(struct dfuse_info *dfuse_info, struct fuse_args *args);
Expand Down Expand Up @@ -137,9 +139,10 @@ struct dfuse_inode_entry;
* when EOF is returned to the kernel. If it's still present on release then it's freed then.
*/
struct dfuse_pre_read {
pthread_mutex_t dra_lock;
d_list_t req_list;
struct dfuse_event *dra_ev;
int dra_rc;
bool complete;
};

/** what is returned as the handle for fuse fuse_file_info on create/open/opendir */
Expand All @@ -149,8 +152,6 @@ struct dfuse_obj_hdl {
/** the DFS object handle. Not created for directories. */
dfs_obj_t *doh_obj;

struct dfuse_pre_read *doh_readahead;

/** the inode entry for the file */
struct dfuse_inode_entry *doh_ie;

Expand All @@ -169,17 +170,24 @@ struct dfuse_obj_hdl {
/* Pointer to the last returned drc entry */
struct dfuse_readdir_c *doh_rd_nextc;

/* Linear read function, if a file is read from start to end then this normally requires
* a final read request at the end of the file that returns zero bytes. Detect this case
* and when the final read is detected then just return without a round trip.
* Store a flag for this being enabled (starts as true, but many I/O patterns will set it
* to false), the expected position of the next read and a boolean for if EOF has been
* detected.
/* Linear read tracking. If a file is opened and read from start to finish then this is
* called a linear read, linear reads however may or may not read EOF at the end of a file,
* as the reader may be checking the file size.
*
* Detect this case and track it at the file handle level, this is then used in two places:
* For read of EOF it means the round-trip can be avoided.
* On release we can use this flag to apply a setting to the directory inode.
*
* This flag starts enabled and many I/O patterns will disable it. We also store the next
* expected read position and if EOF has been reached.
*/

off_t doh_linear_read_pos;
bool doh_linear_read;
bool doh_linear_read_eof;

bool doh_set_linear_read;

/** True if caching is enabled for this file. */
bool doh_caching;

Expand All @@ -197,6 +205,10 @@ struct dfuse_obj_hdl {
bool doh_kreaddir_finished;

bool doh_evict_on_close;
/* The handle is doing readahead for the moment */
bool doh_readahead_inflight;

int doh_flags;
};

/* Readdir support.
Expand Down Expand Up @@ -401,11 +413,20 @@ struct dfuse_event {
d_iov_t de_iov;
d_sg_list_t de_sgl;
d_list_t de_list;

/* Position in a list of events, this will either be off active->open_reads or
* de->de_read_slaves.
*/
d_list_t de_read_list;
/* List of slave events */
d_list_t de_read_slaves;
struct dfuse_eq *de_eqt;
union {
struct dfuse_obj_hdl *de_oh;
struct dfuse_inode_entry *de_ie;
struct read_chunk_data *de_cd;
};
struct dfuse_info *de_di;
off_t de_req_position; /**< The file position requested by fuse */
union {
size_t de_req_len;
Expand Down Expand Up @@ -1009,10 +1030,35 @@ struct dfuse_inode_entry {
*/
ATOMIC bool ie_linear_read;

struct active_inode *ie_active;

/* Entry on the evict list */
d_list_t ie_evict_entry;
};

/* State referenced from dfuse_inode_entry through its ie_active pointer.
 * NOTE(review): lifetime appears to be managed by active_ie_init() and the
 * active_oh_decref()/active_ie_decref() helpers - confirm against their definitions.
 */
struct active_inode {
/* NOTE(review): presumably the list of read chunk data for this inode - confirm */
d_list_t chunks;
/* List of read events, linked through dfuse_event.de_read_list */
d_list_t open_reads;
/* NOTE(review): presumably protects the lists above - confirm which members it covers */
pthread_spinlock_t lock;
/* Atomic counter; NOTE(review): presumably the number of reads issued - confirm */
ATOMIC uint64_t read_count;
/* Pre-read state; NOTE(review): presumably set up by active_ie_readahead_init() - confirm */
struct dfuse_pre_read *readahead;
};

/* Increase active count on inode. This takes a reference and allocates ie->ie_active as required */
int
active_ie_init(struct dfuse_inode_entry *ie);

int
active_ie_readahead_init(struct dfuse_inode_entry *ie);

/* Mark an oh as closing and drop the ref on inode active */
void
active_oh_decref(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh);

/* Decrease active count on inode, called on error where there is no oh */
void
active_ie_decref(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie);

/* Flush write-back cache writes to an inode. It does this by waiting for and then releasing an
* exclusive lock on the inode. Writes take a shared lock so this will block until all pending
* writes are complete.
Expand Down Expand Up @@ -1108,6 +1154,13 @@ dfuse_compute_inode(struct dfuse_cont *dfs,
void
dfuse_cache_evict_dir(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie);

/* Free any read chunk data for an inode.
*
* Returns true if feature was used.
*/
bool
read_chunk_close(struct dfuse_inode_entry *ie);

/* Metadata caching functions. */

/* Mark the cache as up-to-date from now */
Expand Down Expand Up @@ -1171,7 +1224,15 @@ bool
dfuse_dcache_get_valid(struct dfuse_inode_entry *ie, double max_age);

void
dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh);
dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh, struct dfuse_event *ev);

int
dfuse_pre_read_init(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie,
struct dfuse_event **evp);

void
dfuse_pre_read_abort(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh,
struct dfuse_event *ev, int rc);

int
check_for_uns_ep(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie, char *attr,
Expand Down
3 changes: 3 additions & 0 deletions src/client/dfuse/dfuse_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,7 @@ dfuse_ie_close(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie)
atomic_load_relaxed(&ie->ie_il_count));
D_ASSERTF(atomic_load_relaxed(&ie->ie_open_count) == 0, "open_count is %d",
atomic_load_relaxed(&ie->ie_open_count));
D_ASSERT(!ie->ie_active);

if (ie->ie_obj) {
rc = dfs_release(ie->ie_obj);
Expand Down Expand Up @@ -1317,6 +1318,8 @@ dfuse_read_event_size(void *arg, size_t size)
ev->de_sgl.sg_nr = 1;
}

D_INIT_LIST_HEAD(&ev->de_read_slaves);

rc = daos_event_init(&ev->de_ev, ev->de_eqt->de_eq, NULL);
if (rc != -DER_SUCCESS) {
return false;
Expand Down
9 changes: 7 additions & 2 deletions src/client/dfuse/dfuse_fuseops.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2016-2023 Intel Corporation.
* (C) Copyright 2016-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -88,7 +88,12 @@ dfuse_fuse_init(void *arg, struct fuse_conn_info *conn)
DFUSE_TRA_INFO(dfuse_info, "kernel readdir cache support compiled in");

conn->want |= FUSE_CAP_READDIRPLUS;
conn->want |= FUSE_CAP_READDIRPLUS_AUTO;
/* Temporarily force readdirplus in all cases, which can save lookup RPCs
* in some cases. This can be removed once object enumeration replaces
* the normal key enumeration for readdir. XXX
*/
conn->want &= ~FUSE_CAP_READDIRPLUS_AUTO;

#ifdef FUSE_CAP_CACHE_SYMLINKS
conn->want |= FUSE_CAP_CACHE_SYMLINKS;
Expand Down
26 changes: 8 additions & 18 deletions src/client/dfuse/dfuse_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ dfuse_bg(struct dfuse_info *dfuse_info)
*
* Should be called from the post_start plugin callback and creates
* a filesystem.
* Returns a DAOS error code.
*/
int
Expand Down Expand Up @@ -204,18 +205,17 @@ dfuse_launch_fuse(struct dfuse_info *dfuse_info, struct fuse_args *args)
DFUSE_TRA_ERROR(dfuse_info, "Error sending signal to fg: "DF_RC, DP_RC(rc));

/* Blocking */
if (dfuse_info->di_threaded)
rc = dfuse_loop(dfuse_info);
else
rc = fuse_session_loop(dfuse_info->di_session);
if (rc != 0)
rc = dfuse_loop(dfuse_info);
if (rc != 0) {
DHS_ERROR(dfuse_info, rc, "Fuse loop exited");
rc = daos_errno2der(rc);
}

umount:

fuse_session_unmount(dfuse_info->di_session);

return daos_errno2der(rc);
return rc;
}

#define DF_POOL_PREFIX "pool="
Expand Down Expand Up @@ -279,7 +279,6 @@ show_help(char *name)
" --path=<path> Path to load UNS pool/container data\n"
" --sys-name=STR DAOS system name context for servers\n"
"\n"
" -S --singlethread Single threaded (deprecated)\n"
" -t --thread-count=count Total number of threads to use\n"
" -e --eq-count=count Number of event queues to use\n"
" -f --foreground Run in foreground\n"
Expand Down Expand Up @@ -423,7 +422,6 @@ main(int argc, char **argv)
{"pool", required_argument, 0, 'p'},
{"container", required_argument, 0, 'c'},
{"sys-name", required_argument, 0, 'G'},
{"singlethread", no_argument, 0, 'S'},
{"thread-count", required_argument, 0, 't'},
{"eq-count", required_argument, 0, 'e'},
{"foreground", no_argument, 0, 'f'},
Expand All @@ -447,13 +445,12 @@ main(int argc, char **argv)
if (dfuse_info == NULL)
D_GOTO(out_debug, rc = -DER_NOMEM);

dfuse_info->di_threaded = true;
dfuse_info->di_caching = true;
dfuse_info->di_wb_cache = true;
dfuse_info->di_eq_count = 1;

while (1) {
c = getopt_long(argc, argv, "Mm:St:o:fhe:v", long_options, NULL);
c = getopt_long(argc, argv, "Mm:t:o:fhe:v", long_options, NULL);

if (c == -1)
break;
Expand Down Expand Up @@ -491,13 +488,6 @@ main(int argc, char **argv)
case 'P':
path = optarg;
break;
case 'S':
/* Set it to be single threaded, but allow an extra one
* for the event queue processing
*/
dfuse_info->di_threaded = false;
dfuse_info->di_thread_count = 2;
break;
case 'e':
dfuse_info->di_eq_count = atoi(optarg);
break;
Expand Down Expand Up @@ -564,7 +554,7 @@ main(int argc, char **argv)
* check CPU binding. If bound to a number of cores then launch that number of threads,
* if not bound then limit to 16.
*/
if (dfuse_info->di_threaded && !have_thread_count) {
if (!have_thread_count) {
struct hwloc_topology *hwt;
hwloc_const_cpuset_t hw;
int total;
Expand Down
9 changes: 6 additions & 3 deletions src/client/dfuse/dfuse_thread.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2020-2023 Intel Corporation.
* (C) Copyright 2020-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -34,10 +34,11 @@ dfuse_do_work(void *arg)
struct dfuse_thread *dt = arg;
struct dfuse_tm *dtm = dt->dt_tm;
int rc;
struct fuse_chan *chan = fuse_clone_chan(dtm->tm_se);

while (!fuse_session_exited(dtm->tm_se)) {
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
rc = fuse_session_receive_buf(dtm->tm_se, &dt->dt_fbuf);
rc = fuse_session_receive_buf_chan(dtm->tm_se, &dt->dt_fbuf, chan);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
if (rc == -EINTR)
continue;
Expand All @@ -52,9 +53,11 @@ dfuse_do_work(void *arg)
if (atomic_load_relaxed(&dtm->tm_exit))
return NULL;

fuse_session_process_buf(dtm->tm_se, &dt->dt_fbuf);
fuse_session_process_buf_chan(dtm->tm_se, &dt->dt_fbuf, chan);
}

fuse_chan_put(chan);

sem_post(&dtm->tm_finish);

return NULL;
Expand Down
Loading
Loading