diff --git a/src/client/dfuse/dfuse.h b/src/client/dfuse/dfuse.h index 84def83db13..261161e5f08 100644 --- a/src/client/dfuse/dfuse.h +++ b/src/client/dfuse/dfuse.h @@ -399,6 +399,13 @@ struct dfuse_event { d_iov_t de_iov; d_sg_list_t de_sgl; d_list_t de_list; + + /* Position in a list of events, this will either be off active->open_reads or + * de->de_read_slaves. + */ + d_list_t de_read_list; + /* List of slave events */ + d_list_t de_read_slaves; struct dfuse_eq *de_eqt; union { struct dfuse_obj_hdl *de_oh; @@ -1017,6 +1024,7 @@ struct dfuse_inode_entry { struct active_inode { d_list_t chunks; + d_list_t open_reads; pthread_spinlock_t lock; struct dfuse_pre_read *readahead; }; diff --git a/src/client/dfuse/dfuse_core.c b/src/client/dfuse/dfuse_core.c index de16408ab8a..f380eb79a8a 100644 --- a/src/client/dfuse/dfuse_core.c +++ b/src/client/dfuse/dfuse_core.c @@ -1322,6 +1322,8 @@ dfuse_read_event_size(void *arg, size_t size) ev->de_sgl.sg_nr = 1; } + D_INIT_LIST_HEAD(&ev->de_read_slaves); + rc = daos_event_init(&ev->de_ev, ev->de_eqt->de_eq, NULL); if (rc != -DER_SUCCESS) { return false; diff --git a/src/client/dfuse/file.c b/src/client/dfuse/file.c index f18fbde0c56..028685340d7 100644 --- a/src/client/dfuse/file.c +++ b/src/client/dfuse/file.c @@ -41,6 +41,7 @@ active_ie_init(struct dfuse_inode_entry *ie, bool *preread) goto out; } D_INIT_LIST_HEAD(&ie->ie_active->chunks); + D_INIT_LIST_HEAD(&ie->ie_active->open_reads); if (preread && *preread) { D_ALLOC_PTR(ie->ie_active->readahead); if (ie->ie_active->readahead) { diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c index 7dd0898101c..05f7d9bb9ef 100644 --- a/src/client/dfuse/ops/read.c +++ b/src/client/dfuse/ops/read.c @@ -8,7 +8,7 @@ #include "dfuse.h" static void -dfuse_cb_read_complete(struct dfuse_event *ev) +cb_read_helper(struct dfuse_event *ev, void *buff) { struct dfuse_obj_hdl *oh = ev->de_oh; @@ -22,32 +22,54 @@ dfuse_cb_read_complete(struct dfuse_event *ev) oh->doh_linear_read = false; } else { oh->doh_linear_read_pos = ev->de_req_position + ev->de_len; - if (ev->de_len < ev->de_req_len) { + if (ev->de_len < ev->de_req_len) oh->doh_linear_read_eof = true; - } } } - if (ev->de_len == 0) { - DFUSE_TRA_DEBUG(oh, "%#zx-%#zx requested (EOF)", ev->de_req_position, - ev->de_req_position + ev->de_req_len - 1); - - DFUSE_REPLY_BUFQ(oh, ev->de_req, ev->de_iov.iov_buf, ev->de_len); - D_GOTO(release, 0); - } - - if (ev->de_len == ev->de_req_len) + if (ev->de_len == ev->de_req_len) { DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", ev->de_req_position, ev->de_req_position + ev->de_req_len - 1); - else - DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read %#zx-%#zx not read (truncated)", - ev->de_req_position, ev->de_req_position + ev->de_len - 1, - ev->de_req_position + ev->de_len, - ev->de_req_position + ev->de_req_len - 1); + } else { + if (ev->de_len == 0) + DFUSE_TRA_DEBUG(oh, "%#zx-%#zx requested (EOF)", ev->de_req_position, + ev->de_req_position + ev->de_req_len - 1); + else + DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read %#zx-%#zx not read (truncated)", + ev->de_req_position, ev->de_req_position + ev->de_len - 1, + ev->de_req_position + ev->de_len, + ev->de_req_position + ev->de_req_len - 1); + } - DFUSE_REPLY_BUFQ(oh, ev->de_req, ev->de_iov.iov_buf, ev->de_len); + DFUSE_REPLY_BUFQ(oh, ev->de_req, buff, ev->de_len); release: daos_event_fini(&ev->de_ev); +} + +static void +dfuse_cb_read_complete(struct dfuse_event *ev) +{ + struct dfuse_event *evs, *evn; + + D_SPIN_LOCK(&ev->de_oh->doh_ie->ie_active->lock); + d_list_del(&ev->de_read_list); + D_SPIN_UNLOCK(&ev->de_oh->doh_ie->ie_active->lock); + + d_list_for_each_entry(evs, &ev->de_read_slaves, de_read_list) { + DFUSE_TRA_DEBUG(ev->de_oh, "concurrent network read %p", evs->de_oh); + evs->de_len = min(ev->de_len, evs->de_req_len); + evs->de_ev.ev_error = ev->de_ev.ev_error; + cb_read_helper(evs, ev->de_iov.iov_buf); + } + + cb_read_helper(ev, ev->de_iov.iov_buf); + + d_list_for_each_entry_safe(evs, evn, &ev->de_read_slaves, de_read_list) { + d_list_del(&evs->de_read_list); + d_slab_restock(evs->de_eqt->de_read_slab); + d_slab_release(evs->de_eqt->de_read_slab, evs); + } + d_slab_restock(ev->de_eqt->de_read_slab); d_slab_release(ev->de_eqt->de_read_slab, ev); } @@ -129,7 +151,7 @@ dfuse_readahead_reply(fuse_req_t req, size_t len, off_t position, struct dfuse_o } if (((position % K128) == 0) && ((len % K128) == 0)) { - DFUSE_TRA_INFO(oh, "allowing out-of-order pre read"); + DFUSE_TRA_DEBUG(oh, "allowing out-of-order pre read"); /* Do not closely track the read position in this case, just the maximum, * later checks will determine if the file is read to the end. */ @@ -493,8 +515,10 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct eqt = pick_eqt(dfuse_info); ev = d_slab_acquire(eqt->de_read_slab); - if (ev == NULL) - D_GOTO(err, rc = ENOMEM); + if (ev == NULL) { + DFUSE_REPLY_ERR_RAW(oh, req, ENOMEM); + return; + } if (oh->doh_ie->ie_truncated && position + len < oh->doh_ie->ie_stat.st_size && ((oh->doh_ie->ie_start_off == 0 && oh->doh_ie->ie_end_off == 0) || @@ -531,9 +555,30 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct DFUSE_IE_WFLUSH(oh->doh_ie); + /* Check for open matching reads, if there are multiple readers of the same file offset + * then chain future requests off the first one to avoid extra network round-trips. This + * can and does happen even with caching enabled if there are multiple client processes. + */ + D_SPIN_LOCK(&active->lock); + { + struct dfuse_event *evc; + + d_list_for_each_entry(evc, &active->open_reads, de_read_list) { + if (ev->de_req_position == evc->de_req_position && + ev->de_req_len <= evc->de_req_len) { + d_list_add(&ev->de_read_list, &evc->de_read_slaves); + D_SPIN_UNLOCK(&active->lock); + return; + } + } + d_list_add_tail(&ev->de_read_list, &active->open_reads); + } + D_SPIN_UNLOCK(&active->lock); + rc = dfs_read(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_len, &ev->de_ev); if (rc != 0) { - D_GOTO(err, rc); + ev->de_ev.ev_error = rc; + dfuse_cb_read_complete(ev); return; } @@ -544,12 +589,6 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct d_slab_restock(eqt->de_read_slab); return; -err: - DFUSE_REPLY_ERR_RAW(oh, req, rc); - if (ev) { - daos_event_fini(&ev->de_ev); - d_slab_release(eqt->de_read_slab, ev); - } } static void