Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-16686 dfuse: Detect matching reads to avoid network access. #15528

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
8 changes: 8 additions & 0 deletions src/client/dfuse/dfuse.h
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,13 @@ struct dfuse_event {
d_iov_t de_iov;
d_sg_list_t de_sgl;
d_list_t de_list;

/* Position in a list of events, this will either be off active->open_reads or
* de->de_read_slaves.
*/
d_list_t de_read_list;
/* List of slave events */
d_list_t de_read_slaves;
struct dfuse_eq *de_eqt;
union {
struct dfuse_obj_hdl *de_oh;
Expand Down Expand Up @@ -1017,6 +1024,7 @@ struct dfuse_inode_entry {

struct active_inode {
d_list_t chunks;
d_list_t open_reads;
pthread_spinlock_t lock;
struct dfuse_pre_read *readahead;
};
Expand Down
2 changes: 2 additions & 0 deletions src/client/dfuse/dfuse_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -1322,6 +1322,8 @@ dfuse_read_event_size(void *arg, size_t size)
ev->de_sgl.sg_nr = 1;
}

D_INIT_LIST_HEAD(&ev->de_read_slaves);

rc = daos_event_init(&ev->de_ev, ev->de_eqt->de_eq, NULL);
if (rc != -DER_SUCCESS) {
return false;
Expand Down
1 change: 1 addition & 0 deletions src/client/dfuse/file.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ active_ie_init(struct dfuse_inode_entry *ie, bool *preread)
goto out;
}
D_INIT_LIST_HEAD(&ie->ie_active->chunks);
D_INIT_LIST_HEAD(&ie->ie_active->open_reads);
if (preread && *preread) {
D_ALLOC_PTR(ie->ie_active->readahead);
if (ie->ie_active->readahead) {
Expand Down
95 changes: 67 additions & 28 deletions src/client/dfuse/ops/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "dfuse.h"

static void
dfuse_cb_read_complete(struct dfuse_event *ev)
cb_read_helper(struct dfuse_event *ev, void *buff)
{
struct dfuse_obj_hdl *oh = ev->de_oh;

Expand All @@ -22,32 +22,54 @@ dfuse_cb_read_complete(struct dfuse_event *ev)
oh->doh_linear_read = false;
} else {
oh->doh_linear_read_pos = ev->de_req_position + ev->de_len;
if (ev->de_len < ev->de_req_len) {
if (ev->de_len < ev->de_req_len)
oh->doh_linear_read_eof = true;
}
}
}

if (ev->de_len == 0) {
DFUSE_TRA_DEBUG(oh, "%#zx-%#zx requested (EOF)", ev->de_req_position,
ev->de_req_position + ev->de_req_len - 1);

DFUSE_REPLY_BUFQ(oh, ev->de_req, ev->de_iov.iov_buf, ev->de_len);
D_GOTO(release, 0);
}

if (ev->de_len == ev->de_req_len)
if (ev->de_len == ev->de_req_len) {
DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", ev->de_req_position,
ev->de_req_position + ev->de_req_len - 1);
else
DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read %#zx-%#zx not read (truncated)",
ev->de_req_position, ev->de_req_position + ev->de_len - 1,
ev->de_req_position + ev->de_len,
ev->de_req_position + ev->de_req_len - 1);
} else {
if (ev->de_len == 0)
DFUSE_TRA_DEBUG(oh, "%#zx-%#zx requested (EOF)", ev->de_req_position,
ev->de_req_position + ev->de_req_len - 1);
else
DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read %#zx-%#zx not read (truncated)",
ev->de_req_position, ev->de_req_position + ev->de_len - 1,
ev->de_req_position + ev->de_len,
ev->de_req_position + ev->de_req_len - 1);
}

DFUSE_REPLY_BUFQ(oh, ev->de_req, ev->de_iov.iov_buf, ev->de_len);
DFUSE_REPLY_BUFQ(oh, ev->de_req, buff, ev->de_len);
release:
daos_event_fini(&ev->de_ev);
}

static void
dfuse_cb_read_complete(struct dfuse_event *ev)
{
struct dfuse_event *evs, *evn;

D_SPIN_LOCK(&ev->de_oh->doh_ie->ie_active->lock);
d_list_del(&ev->de_read_list);
D_SPIN_UNLOCK(&ev->de_oh->doh_ie->ie_active->lock);

d_list_for_each_entry(evs, &ev->de_read_slaves, de_read_list) {
DFUSE_TRA_DEBUG(ev->de_oh, "concurrent network read %p", evs->de_oh);
evs->de_len = min(ev->de_len, evs->de_req_len);
evs->de_ev.ev_error = ev->de_ev.ev_error;
cb_read_helper(evs, ev->de_iov.iov_buf);
}

cb_read_helper(ev, ev->de_iov.iov_buf);

d_list_for_each_entry_safe(evs, evn, &ev->de_read_slaves, de_read_list) {
d_list_del(&evs->de_read_list);
d_slab_restock(evs->de_eqt->de_read_slab);
d_slab_release(evs->de_eqt->de_read_slab, evs);
}

d_slab_restock(ev->de_eqt->de_read_slab);
d_slab_release(ev->de_eqt->de_read_slab, ev);
}
Expand Down Expand Up @@ -129,7 +151,7 @@ dfuse_readahead_reply(fuse_req_t req, size_t len, off_t position, struct dfuse_o
}

if (((position % K128) == 0) && ((len % K128) == 0)) {
DFUSE_TRA_INFO(oh, "allowing out-of-order pre read");
DFUSE_TRA_DEBUG(oh, "allowing out-of-order pre read");
/* Do not closely track the read position in this case, just the maximum,
* later checks will determine if the file is read to the end.
*/
Expand Down Expand Up @@ -493,8 +515,10 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct
eqt = pick_eqt(dfuse_info);

ev = d_slab_acquire(eqt->de_read_slab);
if (ev == NULL)
D_GOTO(err, rc = ENOMEM);
if (ev == NULL) {
DFUSE_REPLY_ERR_RAW(oh, req, ENOMEM);
return;
}

if (oh->doh_ie->ie_truncated && position + len < oh->doh_ie->ie_stat.st_size &&
((oh->doh_ie->ie_start_off == 0 && oh->doh_ie->ie_end_off == 0) ||
Expand Down Expand Up @@ -531,9 +555,30 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct

DFUSE_IE_WFLUSH(oh->doh_ie);

/* Check for open matching reads, if there are multiple readers of the same file offset
* then chain future requests off the first one to avoid extra network round-trips. This
* can and does happen even with caching enabled if there are multiple client processes.
*/
D_SPIN_LOCK(&active->lock);
{
struct dfuse_event *evc;

d_list_for_each_entry(evc, &active->open_reads, de_read_list) {
if (ev->de_req_position == evc->de_req_position &&
ev->de_req_len <= evc->de_req_len) {
d_list_add(&ev->de_read_list, &evc->de_read_slaves);
D_SPIN_UNLOCK(&active->lock);
return;
}
}
d_list_add_tail(&ev->de_read_list, &active->open_reads);
}
D_SPIN_UNLOCK(&active->lock);

rc = dfs_read(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_len, &ev->de_ev);
if (rc != 0) {
D_GOTO(err, rc);
ev->de_ev.ev_error = rc;
dfuse_cb_read_complete(ev);
return;
}

Expand All @@ -544,12 +589,6 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct
d_slab_restock(eqt->de_read_slab);

return;
err:
DFUSE_REPLY_ERR_RAW(oh, req, rc);
if (ev) {
daos_event_fini(&ev->de_ev);
d_slab_release(eqt->de_read_slab, ev);
}
}

static void
Expand Down
Loading