From fd27521220dd41005428d449cbe1cfe04303431d Mon Sep 17 00:00:00 2001
From: Di Wang
Date: Wed, 18 Dec 2024 19:34:43 +0000
Subject: [PATCH] DAOS-16686 dfuse: Detect matching reads to avoid network
 access

From https://github.com/daos-stack/daos/pull/15528

If a read matches a currently outstanding read then simply connect the two,
and when the reply arrives from the network respond to both requests. A
minimal standalone sketch of this coalescing pattern follows the diff.

Ashley Pittman

Required-githooks: true
---
 src/client/dfuse/dfuse.h      |  10 +-
 src/client/dfuse/dfuse_core.c |   2 +
 src/client/dfuse/file.c       |   5 +-
 src/client/dfuse/ops/read.c   | 379 +++++++++++++++++++---------------
 4 files changed, 229 insertions(+), 167 deletions(-)

diff --git a/src/client/dfuse/dfuse.h b/src/client/dfuse/dfuse.h
index 6221002e3e0..80ce2d7f844 100644
--- a/src/client/dfuse/dfuse.h
+++ b/src/client/dfuse/dfuse.h
@@ -399,6 +399,13 @@ struct dfuse_event {
 	d_iov_t          de_iov;
 	d_sg_list_t      de_sgl;
 	d_list_t         de_list;
+
+	/* Position in a list of events; this will either be on active->open_reads or
+	 * de->de_read_slaves.
+	 */
+	d_list_t         de_read_list;
+	/* List of slave events */
+	d_list_t         de_read_slaves;
 	struct dfuse_eq *de_eqt;
 	union {
 		struct dfuse_obj_hdl   *de_oh;
@@ -1017,6 +1024,7 @@ struct dfuse_inode_entry {
 
 struct active_inode {
 	d_list_t               chunks;
+	d_list_t               open_reads;
 	pthread_spinlock_t     lock;
 	struct dfuse_pre_read *readahead;
 };
@@ -1133,7 +1141,7 @@ dfuse_cache_evict_dir(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *i
  * Returns true if feature was used.
  */
 bool
-read_chunk_close(struct active_inode *active);
+read_chunk_close(struct dfuse_inode_entry *ie);
 
 /* Metadata caching functions. */
 
diff --git a/src/client/dfuse/dfuse_core.c b/src/client/dfuse/dfuse_core.c
index e69b598efb7..e0bc4ea3c3f 100644
--- a/src/client/dfuse/dfuse_core.c
+++ b/src/client/dfuse/dfuse_core.c
@@ -1318,6 +1318,8 @@ dfuse_read_event_size(void *arg, size_t size)
 		ev->de_sgl.sg_nr = 1;
 	}
 
+	D_INIT_LIST_HEAD(&ev->de_read_slaves);
+
 	rc = daos_event_init(&ev->de_ev, ev->de_eqt->de_eq, NULL);
 	if (rc != -DER_SUCCESS) {
 		return false;
diff --git a/src/client/dfuse/file.c b/src/client/dfuse/file.c
index 6a86134d628..028685340d7 100644
--- a/src/client/dfuse/file.c
+++ b/src/client/dfuse/file.c
@@ -41,6 +41,7 @@ active_ie_init(struct dfuse_inode_entry *ie, bool *preread)
 		goto out;
 	}
 	D_INIT_LIST_HEAD(&ie->ie_active->chunks);
+	D_INIT_LIST_HEAD(&ie->ie_active->open_reads);
 	if (preread && *preread) {
 		D_ALLOC_PTR(ie->ie_active->readahead);
 		if (ie->ie_active->readahead) {
@@ -96,7 +97,7 @@ active_oh_decref(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh)
 	if (oc != 1)
 		goto out;
 
-	rcb = read_chunk_close(oh->doh_ie->ie_active);
+	rcb = read_chunk_close(oh->doh_ie);
 
 	ah_free(dfuse_info, oh->doh_ie);
 out:
@@ -118,7 +119,7 @@ active_ie_decref(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie)
 	if (oc != 1)
 		goto out;
 
-	read_chunk_close(ie->ie_active);
+	read_chunk_close(ie);
 
 	ah_free(dfuse_info, ie);
 out:
diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c
index 95ba4586e00..05f7d9bb9ef 100644
--- a/src/client/dfuse/ops/read.c
+++ b/src/client/dfuse/ops/read.c
@@ -8,7 +8,7 @@
 #include "dfuse.h"
 
 static void
-dfuse_cb_read_complete(struct dfuse_event *ev)
+cb_read_helper(struct dfuse_event *ev, void *buff)
 {
 	struct dfuse_obj_hdl *oh = ev->de_oh;
 
@@ -22,32 +22,54 @@ dfuse_cb_read_complete(struct dfuse_event *ev)
 		oh->doh_linear_read = false;
 	} else {
 		oh->doh_linear_read_pos = ev->de_req_position + ev->de_len;
-		if (ev->de_len < ev->de_req_len) {
+		if (ev->de_len < ev->de_req_len)
 			oh->doh_linear_read_eof = true;
-		}
 	}
 }
 
-	if (ev->de_len == 0) {
-		DFUSE_TRA_DEBUG(oh, "%#zx-%#zx requested (EOF)", ev->de_req_position,
-				ev->de_req_position + ev->de_req_len - 1);
-
-		DFUSE_REPLY_BUFQ(oh, ev->de_req, ev->de_iov.iov_buf, ev->de_len);
-		D_GOTO(release, 0);
-	}
-
-	if (ev->de_len == ev->de_req_len)
+	if (ev->de_len == ev->de_req_len) {
 		DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", ev->de_req_position,
 				ev->de_req_position + ev->de_req_len - 1);
-	else
-		DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read %#zx-%#zx not read (truncated)",
-				ev->de_req_position, ev->de_req_position + ev->de_len - 1,
-				ev->de_req_position + ev->de_len,
-				ev->de_req_position + ev->de_req_len - 1);
+	} else {
+		if (ev->de_len == 0)
+			DFUSE_TRA_DEBUG(oh, "%#zx-%#zx requested (EOF)", ev->de_req_position,
+					ev->de_req_position + ev->de_req_len - 1);
+		else
+			DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read %#zx-%#zx not read (truncated)",
+					ev->de_req_position, ev->de_req_position + ev->de_len - 1,
+					ev->de_req_position + ev->de_len,
+					ev->de_req_position + ev->de_req_len - 1);
+	}
 
-	DFUSE_REPLY_BUFQ(oh, ev->de_req, ev->de_iov.iov_buf, ev->de_len);
+	DFUSE_REPLY_BUFQ(oh, ev->de_req, buff, ev->de_len);
 release:
 	daos_event_fini(&ev->de_ev);
+}
+
+static void
+dfuse_cb_read_complete(struct dfuse_event *ev)
+{
+	struct dfuse_event *evs, *evn;
+
+	D_SPIN_LOCK(&ev->de_oh->doh_ie->ie_active->lock);
+	d_list_del(&ev->de_read_list);
+	D_SPIN_UNLOCK(&ev->de_oh->doh_ie->ie_active->lock);
+
+	d_list_for_each_entry(evs, &ev->de_read_slaves, de_read_list) {
+		DFUSE_TRA_DEBUG(ev->de_oh, "concurrent network read %p", evs->de_oh);
+		evs->de_len         = min(ev->de_len, evs->de_req_len);
+		evs->de_ev.ev_error = ev->de_ev.ev_error;
+		cb_read_helper(evs, ev->de_iov.iov_buf);
+	}
+
+	cb_read_helper(ev, ev->de_iov.iov_buf);
+
+	d_list_for_each_entry_safe(evs, evn, &ev->de_read_slaves, de_read_list) {
+		d_list_del(&evs->de_read_list);
+		d_slab_restock(evs->de_eqt->de_read_slab);
+		d_slab_release(evs->de_eqt->de_read_slab, evs);
+	}
+
 	d_slab_restock(ev->de_eqt->de_read_slab);
 	d_slab_release(ev->de_eqt->de_read_slab, ev);
 }
@@ -173,33 +195,38 @@ pick_eqt(struct dfuse_info *dfuse_info)
  *
  * This code is entered when caching is enabled and reads are correctly size/aligned and not in the
  * last CHUNK_SIZE of a file. When open then the inode contains a single read_chunk_core pointer
- * and this contains a list of read_chunk_data entries, one for each bucket.
+ * and this contains a list of read_chunk_data entries, one for each bucket. Buckets where all
+ * slots have been requested are removed from the list and closed when the last request is
+ * completed.
  *
- * TODO: Currently there is no code to remove buckets from the list so all buckets will remain in
- * memory until close.
+ * TODO: Currently there is no code to remove partially read buckets from the list, so reading
+ * one slot in every chunk would leave the entire file contents in memory until close and mean
+ * long list traversal times.
  */
 
 #define CHUNK_SIZE (1024 * 1024)
 
 struct read_chunk_data {
 	struct dfuse_event   *ev;
-	bool                  slot_done[8];
 	struct active_inode  *ia;
+	fuse_req_t            reqs[8];
+	struct dfuse_obj_hdl *ohs[8];
 	d_list_t              list;
 	uint64_t              bucket;
 	struct dfuse_eq      *eqt;
 	int                   rc;
 	int                   entered;
+	ATOMIC int            exited;
+	bool                  exiting;
 	bool                  complete;
-	d_list_t              req_list;
 };
 
-struct read_chunk_req {
-	d_list_t              req_list;
-	struct dfuse_obj_hdl *oh;
-	fuse_req_t            req;
-	int                   slot;
-};
+static void
+chunk_free(struct read_chunk_data *cd)
+{
+	d_list_del(&cd->list);
+	d_slab_release(cd->eqt->de_read_slab, cd->ev);
+	D_FREE(cd);
+}
 
 /* Called when the last open file handle on a inode is closed. This needs to free everything which
  * is complete and for anything that isn't flag it for deletion in the callback.
  *
  * Returns true if the feature was used.
  */
 bool
-read_chunk_close(struct active_inode *active)
+read_chunk_close(struct dfuse_inode_entry *ie)
 {
 	struct read_chunk_data *cd, *cdn;
+	bool                    rcb = false;
 
-	if (d_list_empty(&active->chunks))
-		return false;
+	D_SPIN_LOCK(&ie->ie_active->lock);
+	if (d_list_empty(&ie->ie_active->chunks))
+		goto out;
 
-	d_list_for_each_entry_safe(cd, cdn, &active->chunks, list) {
-		D_ASSERT(cd->complete);
-		d_list_del(&cd->list);
-		d_slab_release(cd->eqt->de_read_slab, cd->ev);
-		D_FREE(cd);
+	rcb = true;
+
+	d_list_for_each_entry_safe(cd, cdn, &ie->ie_active->chunks, list) {
+		if (cd->complete) {
+			chunk_free(cd);
+		} else {
+			cd->exiting = true;
+		}
 	}
-	return true;
+out:
+	D_SPIN_UNLOCK(&ie->ie_active->lock);
+	return rcb;
 }
 
 static void
@@ -228,57 +262,119 @@ chunk_cb(struct dfuse_event *ev)
 {
 	struct read_chunk_data *cd = ev->de_cd;
 	struct active_inode    *ia = cd->ia;
-	struct read_chunk_req  *cr;
-	struct read_chunk_req  *crn;
+	fuse_req_t              req;
+	bool                    done = false;
 
 	cd->rc = ev->de_ev.ev_error;
 
 	if (cd->rc == 0 && (ev->de_len != CHUNK_SIZE)) {
+		cd->rc = EIO;
 		DS_WARN(cd->rc, "Unexpected short read bucket %ld (%#zx) expected %i got %zi",
 			cd->bucket, cd->bucket * CHUNK_SIZE, CHUNK_SIZE, ev->de_len);
 	}
 
 	daos_event_fini(&ev->de_ev);
 
-	/* Mark as complete so no more get put on list */
-	D_SPIN_LOCK(&ia->lock);
-	cd->complete = true;
+	do {
+		int i;
 
+		req = 0;
 
-	/* Mark the slot as replied to. There's a race here as the slot hasn't been replied to
-	 * however references are dropped by the DFUSE_REPLY macros below so an extra ref on active
-	 * would be required. The danger is that the bucket gets put on the end of the list rather
-	 * than the start.
-	 */
-	d_list_for_each_entry(cr, &cd->req_list, req_list)
-		cd->slot_done[cr->slot] = true;
+		D_SPIN_LOCK(&ia->lock);
 
-	D_SPIN_UNLOCK(&ia->lock);
+		if (cd->exiting) {
+			chunk_free(cd);
+			D_SPIN_UNLOCK(&ia->lock);
+			return;
+		}
 
-	d_list_for_each_entry_safe(cr, crn, &cd->req_list, req_list) {
-		size_t position = (cd->bucket * CHUNK_SIZE) + (cr->slot * K128);
-		size_t len;
+		cd->complete = true;
+		for (i = 0; i < 8; i++) {
+			if (cd->reqs[i]) {
+				req         = cd->reqs[i];
+				cd->reqs[i] = 0;
+				break;
+			}
+		}
 
-		DFUSE_TRA_DEBUG(cr->oh, "Replying for %ld[%d]", cd->bucket, cr->slot);
+		D_SPIN_UNLOCK(&ia->lock);
 
-		/* Delete from the list before replying as there's no reference held otherwise */
-		d_list_del(&cr->req_list);
+		if (req) {
+			size_t position = (cd->bucket * CHUNK_SIZE) + (i * K128);
 
-		if (cd->rc != 0) {
-			DFUSE_REPLY_ERR_RAW(cr->oh, cr->req, cd->rc);
-		} else {
-			if ((((cr->slot + 1) * K128) - 1) >= ev->de_len)
-				len = max(ev->de_len - (cr->slot * K128), 0);
-			else
-				len = K128;
-
-			DFUSE_TRA_DEBUG(cr->oh, "%#zx-%#zx read", position, position + len - 1);
-			DFUSE_REPLY_BUFQ(cr->oh, cr->req, ev->de_iov.iov_buf + (cr->slot * K128),
-					 len);
+			if (cd->rc != 0) {
+				DFUSE_REPLY_ERR_RAW(cd->ohs[i], req, cd->rc);
+			} else {
+				DFUSE_TRA_DEBUG(cd->ohs[i], "%#zx-%#zx read", position,
+						position + K128 - 1);
+				DFUSE_REPLY_BUFQ(cd->ohs[i], req, ev->de_iov.iov_buf + (i * K128),
+						 K128);
+			}
+
+			if (atomic_fetch_add_relaxed(&cd->exited, 1) == 7)
+				done = true;
 		}
-		D_FREE(cr);
+	} while (req && !done);
+
+	if (done) {
+		d_slab_release(cd->eqt->de_read_slab, cd->ev);
+		D_FREE(cd);
 	}
 }
 
+/* Submit a read to dfs.
+ *
+ * Returns true on success.
+ */
+static bool
+chunk_fetch(fuse_req_t req, struct dfuse_obj_hdl *oh, struct read_chunk_data *cd, int slot)
+{
+	struct dfuse_info        *dfuse_info = fuse_req_userdata(req);
+	struct dfuse_inode_entry *ie         = oh->doh_ie;
+	struct dfuse_event       *ev;
+	struct dfuse_eq          *eqt;
+	int                       rc;
+	daos_off_t                position = cd->bucket * CHUNK_SIZE;
+
+	eqt = pick_eqt(dfuse_info);
+
+	ev = d_slab_acquire(eqt->de_read_slab);
+	if (ev == NULL) {
+		cd->rc = ENOMEM;
+		return false;
+	}
+
+	ev->de_iov.iov_len = CHUNK_SIZE;
+	ev->de_req         = req;
+	ev->de_cd          = cd;
+	ev->de_sgl.sg_nr   = 1;
+	ev->de_len         = 0;
+	ev->de_complete_cb = chunk_cb;
+
+	cd->ev         = ev;
+	cd->eqt        = eqt;
+	cd->reqs[slot] = req;
+	cd->ohs[slot]  = oh;
+
+	rc = dfs_read(ie->ie_dfs->dfs_ns, ie->ie_obj, &ev->de_sgl, position, &ev->de_len,
+		      &ev->de_ev);
+	if (rc != 0)
+		goto err;
+
+	/* Send a message to the async thread to wake it up and poll for events */
+	sem_post(&eqt->de_sem);
+
+	/* Now ensure there are more descriptors for the next request */
+	d_slab_restock(eqt->de_read_slab);
+
+	return true;
+
+err:
+	daos_event_fini(&ev->de_ev);
+	d_slab_release(eqt->de_read_slab, ev);
+	cd->rc = rc;
+	return false;
+}
+
 /* Try and do a bulk read.
  *
  * Returns true if it was able to handle the read.
  */
@@ -288,13 +384,11 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh)
 {
 	struct dfuse_inode_entry *ie = oh->doh_ie;
 	struct read_chunk_data   *cd;
-	struct read_chunk_req    *cr = NULL;
 	off_t                     last;
 	uint64_t                  bucket;
 	int                       slot;
 	bool                      submit = false;
 	bool                      rcb;
-	bool                      all_done = true;
 
 	if (len != K128)
 		return false;
@@ -319,6 +413,7 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh)
 
 	d_list_for_each_entry(cd, &ie->ie_active->chunks, list)
 		if (cd->bucket == bucket) {
+			/* Remove from list to re-add it later. */
 			d_list_del(&cd->list);
 			goto found;
 		}
 
@@ -327,121 +422,57 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh)
 	D_ALLOC_PTR(cd);
 	if (cd == NULL)
 		goto err;
 
-	D_ALLOC_PTR(cr);
-	if (cr == NULL) {
-		D_FREE(cd);
-		goto err;
-	}
-
-	D_INIT_LIST_HEAD(&cd->req_list);
 	cd->ia     = ie->ie_active;
 	cd->bucket = bucket;
 	submit     = true;
 
 found:
-	for (int i = 0; i < 8; i++) {
-		if (!cd->slot_done[i])
-			all_done = false;
+	if (++cd->entered < 8) {
+		/* Put on front of list for efficient searching */
+		d_list_add(&cd->list, &ie->ie_active->chunks);
 	}
 
-	if (all_done)
-		d_list_add(&cd->list, &ie->ie_active->chunks);
-	else
-		d_list_add_tail(&cd->list, &ie->ie_active->chunks);
+	D_SPIN_UNLOCK(&ie->ie_active->lock);
 
 	if (submit) {
-		struct dfuse_info  *dfuse_info = fuse_req_userdata(req);
-		struct dfuse_eq    *eqt;
-		struct dfuse_event *ev;
-		int                 rc;
-
-		/* Overwrite position here to the start of the bucket */
-		position = cd->bucket * CHUNK_SIZE;
-
-		eqt = pick_eqt(dfuse_info);
-
-		ev = d_slab_acquire(eqt->de_read_slab);
-		if (ev == NULL) {
-			d_list_del(&cd->list);
-			D_FREE(cr);
-			D_FREE(cd);
-			goto err;
-		}
-
-		d_list_add(&cr->req_list, &cd->req_list);
+		DFUSE_TRA_DEBUG(oh, "submit for bucket %ld[%d]", bucket, slot);
+		rcb = chunk_fetch(req, oh, cd, slot);
+	} else {
+		struct dfuse_event *ev = NULL;
 
 		/* Now check if this read request is complete or not yet, if it isn't then just
 		 * save req in the right slot however if it is then reply here. After the call to
 		 * DFUSE_REPLY_* then no reference is held on either the open file or the inode so
 		 * at that point they could be closed.
 		 */
+		rcb = true;
 
-		DFUSE_TRA_DEBUG(oh, "submit for bucket %ld[%d]", bucket, slot);
-		D_SPIN_UNLOCK(&ie->ie_active->lock);
-
-		cd->eqt = eqt;
-		cd->ev  = ev;
-
-		cr->req  = req;
-		cr->oh   = oh;
-		cr->slot = slot;
-
-		ev->de_iov.iov_len = CHUNK_SIZE;
-		ev->de_req         = req;
-		ev->de_cd          = cd;
-		ev->de_sgl.sg_nr   = 1;
-		ev->de_len         = 0;
-		ev->de_complete_cb = chunk_cb;
-
-		rc = dfs_read(ie->ie_dfs->dfs_ns, ie->ie_obj, &ev->de_sgl, position, &ev->de_len,
-			      &ev->de_ev);
-		if (rc == 0) {
-			/* Send a message to the async thread to wake it up and poll for events */
-			sem_post(&eqt->de_sem);
+		D_SPIN_LOCK(&ie->ie_active->lock);
+		if (cd->complete) {
+			ev = cd->ev;
 		} else {
-			ev->de_ev.ev_error = rc;
-			chunk_cb(ev);
+			cd->reqs[slot] = req;
+			cd->ohs[slot]  = oh;
 		}
-
-		rcb = true;
-	} else if (cd->complete) {
-		cd->slot_done[slot] = true;
-		D_SPIN_UNLOCK(&ie->ie_active->lock);
+		D_SPIN_UNLOCK(&ie->ie_active->lock);
 
-		if (cd->rc != 0) {
-			/* Don't pass fuse an error here, rather return false and
-			 * the read will be tried over the network.
-			 */
-			rcb = false;
-		} else {
-			size_t read_len;
-
-			if ((((slot + 1) * K128) - 1) >= cd->ev->de_len)
-				read_len = max(cd->ev->de_len - (slot * K128), 0);
-			else
-				read_len = K128;
-
-			oh->doh_linear_read_pos = max(oh->doh_linear_read_pos, position + read_len);
-
-			DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", position, position + read_len - 1);
-			DFUSE_REPLY_BUFQ(oh, req, cd->ev->de_iov.iov_buf + (slot * K128), read_len);
-			rcb = true;
-		}
-	} else {
-		rcb = false;
-
-		D_ALLOC_PTR(cr);
-		if (cr) {
-			cr->req  = req;
-			cr->oh   = oh;
-			cr->slot = slot;
-			d_list_add_tail(&cr->req_list, &cd->req_list);
-			rcb = true;
+		if (ev) {
+			if (cd->rc != 0) {
+				/* Don't pass fuse an error here, rather return false and the read
+				 * will be tried over the network.
+ */ + rcb = false; + } else { + DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", position, + position + K128 - 1); + DFUSE_REPLY_BUFQ(oh, req, ev->de_iov.iov_buf + (slot * K128), K128); + } + if (atomic_fetch_add_relaxed(&cd->exited, 1) == 7) { + d_slab_release(cd->eqt->de_read_slab, cd->ev); + D_FREE(cd); + } } - - D_SPIN_UNLOCK(&ie->ie_active->lock); } return rcb; @@ -524,6 +555,26 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct DFUSE_IE_WFLUSH(oh->doh_ie); + /* Check for open matching reads, if there are multiple readers of the same file offset + * then chain future requests off the first one to avoid extra network round-trips. This + * can and does happen even with caching enabled if there are multiple client processes. + */ + D_SPIN_LOCK(&active->lock); + { + struct dfuse_event *evc; + + d_list_for_each_entry(evc, &active->open_reads, de_read_list) { + if (ev->de_req_position == evc->de_req_position && + ev->de_req_len <= evc->de_req_len) { + d_list_add(&ev->de_read_list, &evc->de_read_slaves); + D_SPIN_UNLOCK(&active->lock); + return; + } + } + d_list_add_tail(&ev->de_read_list, &active->open_reads); + } + D_SPIN_UNLOCK(&active->lock); + rc = dfs_read(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_len, &ev->de_ev); if (rc != 0) { ev->de_ev.ev_error = rc;