Split dmu_zfetch() speculation and execution parts.
To make better predictions on parallel workloads, dmu_zfetch() should
be called as early as possible to reduce possible request reordering.
In particular, it should be called before dmu_buf_hold_array_by_dnode()
calls dbuf_hold(), which may sleep waiting for indirect blocks and, on
completion, wake up multiple threads at the same time, significantly
reordering the requests and making the stream look random.  But we
should not issue prefetch requests before the on-demand ones, since
they may reach the disks first despite the I/O scheduler, increasing
on-demand request latency.

This patch splits dmu_zfetch() into two functions: dmu_zfetch_prepare()
and dmu_zfetch_run().  The first can be executed as early as needed.
It only updates statistics and makes predictions without issuing any
I/Os.  The I/O issuance is handled by dmu_zfetch_run(), which can be
called later when all on-demand I/Os are already issued.  It even
tracks the activity of other concurrent threads, issuing the prefetch
only when _all_ on-demand requests are issued.
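
A minimal sketch of the resulting call pattern, condensed from the
dmu_buf_hold_array_by_dnode() change below (locking and error handling
elided; dn, dbp, blkid, nblks and tag as in that function):

    zstream_t *zs = NULL;

    /* 1. Record the access and predict before anything can sleep. */
    if ((flags & DMU_READ_NO_PREFETCH) == 0 &&
        DNODE_META_IS_CACHEABLE(dn) && length <= zfetch_array_rd_sz) {
            zs = dmu_zfetch_prepare(&dn->dn_zfetch, blkid, nblks,
                read && DNODE_IS_CACHEABLE(dn), B_TRUE);
    }

    /* 2. Issue the on-demand reads; dbuf_hold() may sleep here. */
    for (i = 0; i < nblks; i++) {
            dmu_buf_impl_t *db = dbuf_hold(dn, blkid + i, tag);
            dbp[i] = &db->db;
    }

    /* 3. Only now issue the predicted prefetch I/Os. */
    if (zs)
            dmu_zfetch_run(zs, B_TRUE);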

For many years this was a big problem for storage servers handling
deep request queues from their clients: they had to either serialize
sequential reads to keep the ZFS prefetcher usable, or execute the
incoming requests as-is and get almost no prefetch from ZFS, relying
only on deep enough prefetch by the clients.  The benefits of those
approaches varied, but neither was perfect.  With this patch, deeper
queue sequential read benchmarks with CrystalDiskMark from Windows via
iSCSI to a FreeBSD target show much better throughput with an almost
100% prefetcher hit rate, compared to almost zero before.

While there, I also removed the per-stream zs_lock as useless, since
it is completely covered by the parent zf_lock.  I also reused the
zs_blocks refcount to track the zf_stream linkage of the stream, since
I believe the previous zs_fetch == NULL check in
dmu_zfetch_stream_done() was racy.
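
For reference, the zs_blocks lifetime now works out as follows (my
summary of the new code, not a verbatim excerpt):

    /*
     * dmu_zfetch_stream_create(): count = 1   (the zf_stream linkage)
     * dmu_zfetch_prepare():       count += 1  (holds stream until run)
     * dmu_zfetch_run():           count += issued - 1, or -= 1 when
     *                             another caller will run the prefetch
     *                             or nothing was left to issue
     * dmu_zfetch_stream_done():   count -= 1  per prefetch callback
     * dmu_zfetch_stream_remove(): count -= 1  (drops zf_stream ref)
     *
     * Whoever drops the count to zero calls dmu_zfetch_stream_fini(),
     * so stream teardown cannot race with in-flight prefetches.
     */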

Signed-off-by: Alexander Motin <[email protected]>
Sponsored-By: iXsystems, Inc.
amotin committed Feb 25, 2021
1 parent 778fa36 commit 6f7ef50
Showing 3 changed files with 119 additions and 83 deletions.
11 changes: 8 additions & 3 deletions include/sys/dmu_zfetch.h
@@ -49,19 +49,21 @@ typedef struct zfetch {

typedef struct zstream {
uint64_t zs_blkid; /* expect next access at this blkid */
uint64_t zs_pf_blkid; /* next block to prefetch */
uint64_t zs_pf_blkid1; /* first block to prefetch */
uint64_t zs_pf_blkid; /* block to prefetch up to */

/*
* We will next prefetch the L1 indirect block of this level-0
* block id.
*/
uint64_t zs_ipf_blkid;
uint64_t zs_ipf_blkid1; /* first block to prefetch */
uint64_t zs_ipf_blkid; /* block to prefetch up to */

kmutex_t zs_lock; /* protects stream */
hrtime_t zs_atime; /* time last prefetch issued */
hrtime_t zs_start_time; /* start of last prefetch */
list_node_t zs_node; /* link for zf_stream */
zfetch_t *zs_fetch; /* parent fetch */
zfs_refcount_t zs_callers; /* number of pending callers */
zfs_refcount_t zs_blocks; /* number of pending blocks in the stream */
} zstream_t;

@@ -70,6 +72,9 @@ void zfetch_fini(void);

void dmu_zfetch_init(zfetch_t *, struct dnode *);
void dmu_zfetch_fini(zfetch_t *);
zstream_t * dmu_zfetch_prepare(zfetch_t *, uint64_t, uint64_t, boolean_t,
boolean_t);
void dmu_zfetch_run(zstream_t *, boolean_t);
void dmu_zfetch(zfetch_t *, uint64_t, uint64_t, boolean_t,
boolean_t);

15 changes: 10 additions & 5 deletions module/zfs/dmu.c
@@ -497,6 +497,7 @@ dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset, uint64_t length,
boolean_t read, void *tag, int *numbufsp, dmu_buf_t ***dbpp, uint32_t flags)
{
dmu_buf_t **dbp;
zstream_t *zs = NULL;
uint64_t blkid, nblks, i;
uint32_t dbuf_flags;
int err;
@@ -536,9 +537,16 @@ dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset, uint64_t length,
zio = zio_root(dn->dn_objset->os_spa, NULL, NULL,
ZIO_FLAG_CANFAIL);
blkid = dbuf_whichblock(dn, 0, offset);
if ((flags & DMU_READ_NO_PREFETCH) == 0 &&
DNODE_META_IS_CACHEABLE(dn) && length <= zfetch_array_rd_sz) {
zs = dmu_zfetch_prepare(&dn->dn_zfetch, blkid, nblks,
read && DNODE_IS_CACHEABLE(dn), B_TRUE);
}
for (i = 0; i < nblks; i++) {
dmu_buf_impl_t *db = dbuf_hold(dn, blkid + i, tag);
if (db == NULL) {
if (zs)
dmu_zfetch_run(zs, B_TRUE);
rw_exit(&dn->dn_struct_rwlock);
dmu_buf_rele_array(dbp, nblks, tag);
if (read)
@@ -555,11 +563,8 @@ dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset, uint64_t length,
if (!read)
zfs_racct_write(length, nblks);

if ((flags & DMU_READ_NO_PREFETCH) == 0 &&
DNODE_META_IS_CACHEABLE(dn) && length <= zfetch_array_rd_sz) {
dmu_zfetch(&dn->dn_zfetch, blkid, nblks,
read && DNODE_IS_CACHEABLE(dn), B_TRUE);
}
if (zs)
dmu_zfetch_run(zs, B_TRUE);
rw_exit(&dn->dn_struct_rwlock);

if (read) {
176 changes: 101 additions & 75 deletions module/zfs/dmu_zfetch.c
@@ -129,26 +129,18 @@ dmu_zfetch_init(zfetch_t *zf, dnode_t *dno)
static void
dmu_zfetch_stream_fini(zstream_t *zs)
{
mutex_destroy(&zs->zs_lock);
kmem_free(zs, sizeof (*zs));
}

static void
dmu_zfetch_stream_remove(zfetch_t *zf, zstream_t *zs)
{
ASSERT(MUTEX_HELD(&zf->zf_lock));
list_remove(&zf->zf_stream, zs);
dmu_zfetch_stream_fini(zs);
zf->zf_numstreams--;
}

static void
dmu_zfetch_stream_orphan(zfetch_t *zf, zstream_t *zs)
{
ASSERT(MUTEX_HELD(&zf->zf_lock));
list_remove(&zf->zf_stream, zs);
zs->zs_fetch = NULL;
zf->zf_numstreams--;
if (zfs_refcount_remove(&zs->zs_blocks, NULL) == 0)
dmu_zfetch_stream_fini(zs);
}

/*
Expand All @@ -161,12 +153,8 @@ dmu_zfetch_fini(zfetch_t *zf)
zstream_t *zs;

mutex_enter(&zf->zf_lock);
while ((zs = list_head(&zf->zf_stream)) != NULL) {
if (zfs_refcount_count(&zs->zs_blocks) != 0)
dmu_zfetch_stream_orphan(zf, zs);
else
dmu_zfetch_stream_remove(zf, zs);
}
while ((zs = list_head(&zf->zf_stream)) != NULL)
dmu_zfetch_stream_remove(zf, zs);
mutex_exit(&zf->zf_lock);
list_destroy(&zf->zf_stream);
mutex_destroy(&zf->zf_lock);
@@ -195,9 +183,9 @@ dmu_zfetch_stream_create(zfetch_t *zf, uint64_t blkid)
zs != NULL; zs = zs_next) {
zs_next = list_next(&zf->zf_stream, zs);
/*
* Skip gethrtime() call if there are still references
* Skip if still active. 1 -- zf_stream reference.
*/
if (zfs_refcount_count(&zs->zs_blocks) != 0)
if (zfs_refcount_count(&zs->zs_blocks) != 1)
continue;
if (((now - zs->zs_atime) / NANOSEC) >
zfetch_min_sec_reap)
@@ -222,12 +210,16 @@ dmu_zfetch_stream_create(zfetch_t *zf, uint64_t blkid)

zstream_t *zs = kmem_zalloc(sizeof (*zs), KM_SLEEP);
zs->zs_blkid = blkid;
zs->zs_pf_blkid1 = blkid;
zs->zs_pf_blkid = blkid;
zs->zs_ipf_blkid1 = blkid;
zs->zs_ipf_blkid = blkid;
zs->zs_atime = now;
zs->zs_fetch = zf;
zfs_refcount_create(&zs->zs_callers);
zfs_refcount_create(&zs->zs_blocks);
mutex_init(&zs->zs_lock, NULL, MUTEX_DEFAULT, NULL);
/* One reference for zf_stream. */
zfs_refcount_add(&zs->zs_blocks, NULL);
zf->zf_numstreams++;
list_insert_head(&zf->zf_stream, zs);
}
@@ -247,13 +239,7 @@ dmu_zfetch_stream_done(void *arg, boolean_t io_issued)
ZFETCHSTAT_SET(zfetchstat_max_completion_us, delta);
}

if (zfs_refcount_remove(&zs->zs_blocks, NULL) != 0)
return;

/*
* The parent fetch structure has gone away
*/
if (zs->zs_fetch == NULL)
if (zfs_refcount_remove(&zs->zs_blocks, NULL) == 0)
dmu_zfetch_stream_fini(zs);
}

@@ -265,20 +251,20 @@
* FALSE -- prefetch only indirect blocks for predicted data blocks;
* TRUE -- prefetch predicted data blocks plus following indirect blocks.
*/
void
dmu_zfetch(zfetch_t *zf, uint64_t blkid, uint64_t nblks, boolean_t fetch_data,
boolean_t have_lock)
zstream_t *
dmu_zfetch_prepare(zfetch_t *zf, uint64_t blkid, uint64_t nblks,
boolean_t fetch_data, boolean_t have_lock)
{
zstream_t *zs;
int64_t pf_start, ipf_start, ipf_istart, ipf_iend;
int64_t pf_start, ipf_start;
int64_t pf_ahead_blks, max_blks;
int epbs, max_dist_blks, pf_nblks, ipf_nblks, issued;
int max_dist_blks, pf_nblks, ipf_nblks;
uint64_t end_of_access_blkid;
end_of_access_blkid = blkid + nblks;
spa_t *spa = zf->zf_dnode->dn_objset->os_spa;

if (zfs_prefetch_disable)
return;
return (NULL);
/*
* If we haven't yet loaded the indirect vdevs' mappings, we
* can only read from blocks that we carefully ensure are on
@@ -287,14 +273,14 @@ dmu_zfetch(zfetch_t *zf, uint64_t blkid, uint64_t nblks, boolean_t fetch_data,
* blocks (e.g. of the MOS's dnode object).
*/
if (!spa_indirect_vdevs_loaded(spa))
return;
return (NULL);

/*
* As a fast path for small (single-block) files, ignore access
* to the first block.
*/
if (!have_lock && blkid == 0)
return;
return (NULL);

if (!have_lock)
rw_enter(&zf->zf_dnode->dn_struct_rwlock, RW_READER);
@@ -306,7 +292,7 @@ dmu_zfetch(zfetch_t *zf, uint64_t blkid, uint64_t nblks, boolean_t fetch_data,
if (zf->zf_dnode->dn_maxblkid < 2) {
if (!have_lock)
rw_exit(&zf->zf_dnode->dn_struct_rwlock);
return;
return (NULL);
}
mutex_enter(&zf->zf_lock);

@@ -317,30 +303,21 @@ dmu_zfetch(zfetch_t *zf, uint64_t blkid, uint64_t nblks, boolean_t fetch_data,
*/
for (zs = list_head(&zf->zf_stream); zs != NULL;
zs = list_next(&zf->zf_stream, zs)) {
if (blkid == zs->zs_blkid || blkid + 1 == zs->zs_blkid) {
mutex_enter(&zs->zs_lock);
/*
* zs_blkid could have changed before we
* acquired zs_lock; re-check them here.
*/
if (blkid == zs->zs_blkid) {
break;
} else if (blkid + 1 == zs->zs_blkid) {
blkid++;
nblks--;
if (nblks == 0) {
/* Already prefetched this before. */
mutex_exit(&zs->zs_lock);
mutex_exit(&zf->zf_lock);
if (!have_lock) {
rw_exit(&zf->zf_dnode->
dn_struct_rwlock);
}
return;
if (blkid == zs->zs_blkid) {
break;
} else if (blkid + 1 == zs->zs_blkid) {
blkid++;
nblks--;
if (nblks == 0) {
/* Already prefetched this before. */
mutex_exit(&zf->zf_lock);
if (!have_lock) {
rw_exit(&zf->zf_dnode->
dn_struct_rwlock);
}
break;
return (NULL);
}
mutex_exit(&zs->zs_lock);
break;
}
}

@@ -355,7 +332,7 @@ dmu_zfetch(zfetch_t *zf, uint64_t blkid, uint64_t nblks, boolean_t fetch_data,
mutex_exit(&zf->zf_lock);
if (!have_lock)
rw_exit(&zf->zf_dnode->dn_struct_rwlock);
return;
return (NULL);
}

/*
@@ -369,6 +346,8 @@ dmu_zfetch(zfetch_t *zf, uint64_t blkid, uint64_t nblks, boolean_t fetch_data,
* start just after the block we just accessed.
*/
pf_start = MAX(zs->zs_pf_blkid, end_of_access_blkid);
if (zs->zs_pf_blkid1 < end_of_access_blkid)
zs->zs_pf_blkid1 = end_of_access_blkid;

/*
* Double our amount of prefetched data, but don't let the
@@ -398,6 +377,8 @@ dmu_zfetch(zfetch_t *zf, uint64_t blkid, uint64_t nblks, boolean_t fetch_data,
* that point to them).
*/
ipf_start = MAX(zs->zs_ipf_blkid, zs->zs_pf_blkid);
if (zs->zs_ipf_blkid1 < zs->zs_pf_blkid)
zs->zs_ipf_blkid1 = zs->zs_pf_blkid;
max_dist_blks = zfetch_max_idistance >> zf->zf_dnode->dn_datablkshift;
/*
* We want to double our distance ahead of the data prefetch
@@ -411,45 +392,90 @@ dmu_zfetch(zfetch_t *zf, uint64_t blkid, uint64_t nblks, boolean_t fetch_data,
ipf_nblks = MIN(pf_ahead_blks, max_blks);
zs->zs_ipf_blkid = ipf_start + ipf_nblks;

epbs = zf->zf_dnode->dn_indblkshift - SPA_BLKPTRSHIFT;
ipf_istart = P2ROUNDUP(ipf_start, 1 << epbs) >> epbs;
ipf_iend = P2ROUNDUP(zs->zs_ipf_blkid, 1 << epbs) >> epbs;

zs->zs_atime = gethrtime();
/* no prior reads in progress */
if (zfs_refcount_count(&zs->zs_blocks) == 0)
/* Protect the stream from reclamation. 2 -- zf_stream + us. */
if (zfs_refcount_add(&zs->zs_blocks, NULL) == 2)
zs->zs_start_time = zs->zs_atime;
/* Count concurrent callers. */
zfs_refcount_add(&zs->zs_callers, NULL);
zs->zs_blkid = end_of_access_blkid;
zfs_refcount_add_many(&zs->zs_blocks, pf_nblks + ipf_iend - ipf_istart,
NULL);
mutex_exit(&zs->zs_lock);
mutex_exit(&zf->zf_lock);
issued = 0;

if (!have_lock)
rw_exit(&zf->zf_dnode->dn_struct_rwlock);

ZFETCHSTAT_BUMP(zfetchstat_hits);
return (zs);
}

void
dmu_zfetch_run(zstream_t *zs, boolean_t have_lock)
{
zfetch_t *zf = zs->zs_fetch;
int64_t pf_start, pf_end, ipf_start, ipf_end;
int epbs, issued;

/*
* dbuf_prefetch() is asynchronous (even when it needs to read
* indirect blocks), but we still prefer to drop our locks before
* calling it to reduce the time we hold them.
* Postpone the prefetch if there are more concurrent callers.
* It happens when multiple requests are waiting for the same
* indirect block. The last one will run the prefetch for all.
*/
if (zfs_refcount_remove(&zs->zs_callers, NULL) != 0) {
/* Drop reference taken in dmu_zfetch_prepare(). */
VERIFY3S(zfs_refcount_remove(&zs->zs_blocks, NULL), >, 0);
return;
}

for (int i = 0; i < pf_nblks; i++) {
issued += dbuf_prefetch_impl(zf->zf_dnode, 0, pf_start + i,
pf_end = zs->zs_pf_blkid;
pf_start = atomic_swap_64(&zs->zs_pf_blkid1, pf_end);
ipf_end = zs->zs_ipf_blkid;
ipf_start = atomic_swap_64(&zs->zs_ipf_blkid1, ipf_end);

if (!have_lock)
rw_enter(&zf->zf_dnode->dn_struct_rwlock, RW_READER);

epbs = zf->zf_dnode->dn_indblkshift - SPA_BLKPTRSHIFT;
ipf_start = P2ROUNDUP(ipf_start, 1 << epbs) >> epbs;
ipf_end = P2ROUNDUP(ipf_end, 1 << epbs) >> epbs;
issued = pf_end - pf_start + ipf_end - ipf_start;
if (issued > 1) {
/* More references on top of taken in dmu_zfetch_prepare(). */
zfs_refcount_add_many(&zs->zs_blocks, issued - 1, NULL);
} else if (issued == 0) {
/* Some other thread has done our work, so drop the ref. */
VERIFY3S(zfs_refcount_remove(&zs->zs_blocks, NULL), >, 0);
}

issued = 0;
for (int64_t blk = pf_start; blk < pf_end; blk++) {
issued += dbuf_prefetch_impl(zf->zf_dnode, 0, blk,
ZIO_PRIORITY_ASYNC_READ, ARC_FLAG_PREDICTIVE_PREFETCH,
dmu_zfetch_stream_done, zs);
}
for (int64_t iblk = ipf_istart; iblk < ipf_iend; iblk++) {
for (int64_t iblk = ipf_start; iblk < ipf_end; iblk++) {
issued += dbuf_prefetch_impl(zf->zf_dnode, 1, iblk,
ZIO_PRIORITY_ASYNC_READ, ARC_FLAG_PREDICTIVE_PREFETCH,
dmu_zfetch_stream_done, zs);
}

if (!have_lock)
rw_exit(&zf->zf_dnode->dn_struct_rwlock);
ZFETCHSTAT_BUMP(zfetchstat_hits);

if (issued)
ZFETCHSTAT_ADD(zfetchstat_io_issued, issued);
}

void
dmu_zfetch(zfetch_t *zf, uint64_t blkid, uint64_t nblks, boolean_t fetch_data,
boolean_t have_lock)
{
zstream_t *zs;

zs = dmu_zfetch_prepare(zf, blkid, nblks, fetch_data, have_lock);
if (zs)
dmu_zfetch_run(zs, have_lock);
}

/* BEGIN CSTYLED */
ZFS_MODULE_PARAM(zfs_prefetch, zfs_prefetch_, disable, INT, ZMOD_RW,
"Disable all ZFS prefetching");
