Improve zfs receive performance by batching writes #10099

Merged 1 commit on Mar 16, 2020

Changes from all commits
14 changes: 14 additions & 0 deletions man/man5/zfs-module-parameters.5
@@ -2995,6 +2995,20 @@ must be at least twice the maximum block size in use.
Default value: \fB16,777,216\fR.
.RE

.sp
.ne 2
.na
\fBzfs_recv_write_batch_size\fR (int)
.ad
.RS 12n
The maximum amount of data (in bytes) that \fBzfs receive\fR will write in
one DMU transaction. This is the uncompressed size, even when receiving a
compressed send stream. This setting will not reduce the write size below
a single block. Capped at a maximum of \fB32MB\fR.
.sp
Default value: \fB1,048,576\fR.
.RE
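
The 32MB ceiling comes from the DMU_MAX_ACCESS/2 clamp applied in dmu_recv.c (see the diff below). A minimal sketch of the effective per-transaction limit, assuming DMU_MAX_ACCESS is 64MB as defined in sys/dmu.h and MIN comes from sys/sysmacros.h:

#include <sys/types.h>
#include <sys/sysmacros.h>	/* MIN */
#include <sys/dmu.h>	/* DMU_MAX_ACCESS */

extern int zfs_recv_write_batch_size;	/* this tunable; default 1,048,576 */

/* Effective cap on the bytes one receive transaction will write. */
static uint64_t
recv_batch_limit(void)
{
return (MIN((uint64_t)zfs_recv_write_batch_size, DMU_MAX_ACCESS / 2));
}

Values above 33,554,432 (32MB) therefore have no additional effect.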

.sp
.ne 2
.na
219 changes: 168 additions & 51 deletions module/zfs/dmu_recv.c
@@ -65,6 +65,7 @@

int zfs_recv_queue_length = SPA_MAXBLOCKSIZE;
int zfs_recv_queue_ff = 20;
int zfs_recv_write_batch_size = 1024 * 1024;

static char *dmu_recv_tag = "dmu_recv_tag";
const char *recv_clone_name = "%recv";
@@ -110,6 +111,8 @@ struct receive_writer_arg {
uint64_t max_object; /* highest object ID referenced in stream */
uint64_t bytes_read; /* bytes read when current record created */

list_t write_batch;

/* Encryption parameters for the last received DRR_OBJECT_RANGE */
boolean_t or_crypt_params_present;
uint64_t or_firstobj;
@@ -1698,13 +1701,108 @@ receive_freeobjects(struct receive_writer_arg *rwa,
return (0);
}

/*
* Note: if this fails, the caller will clean up any records left on the
* rwa->write_batch list.
*/
static int
flush_write_batch_impl(struct receive_writer_arg *rwa)
{
dnode_t *dn;
int err;

if (dnode_hold(rwa->os, rwa->last_object, FTAG, &dn) != 0)
return (SET_ERROR(EINVAL));

struct receive_record_arg *last_rrd = list_tail(&rwa->write_batch);
struct drr_write *last_drrw = &last_rrd->header.drr_u.drr_write;

struct receive_record_arg *first_rrd = list_head(&rwa->write_batch);
struct drr_write *first_drrw = &first_rrd->header.drr_u.drr_write;

ASSERT3U(rwa->last_object, ==, last_drrw->drr_object);
ASSERT3U(rwa->last_offset, ==, last_drrw->drr_offset);

dmu_tx_t *tx = dmu_tx_create(rwa->os);
dmu_tx_hold_write_by_dnode(tx, dn, first_drrw->drr_offset,
last_drrw->drr_offset - first_drrw->drr_offset +
last_drrw->drr_logical_size);
err = dmu_tx_assign(tx, TXG_WAIT);
if (err != 0) {
dmu_tx_abort(tx);
dnode_rele(dn, FTAG);
return (err);
}

struct receive_record_arg *rrd;
while ((rrd = list_head(&rwa->write_batch)) != NULL) {
struct drr_write *drrw = &rrd->header.drr_u.drr_write;
arc_buf_t *abuf = rrd->arc_buf;

ASSERT3U(drrw->drr_object, ==, rwa->last_object);

if (rwa->byteswap && !arc_is_encrypted(abuf) &&
arc_get_compression(abuf) == ZIO_COMPRESS_OFF) {
dmu_object_byteswap_t byteswap =
DMU_OT_BYTESWAP(drrw->drr_type);
dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
DRR_WRITE_PAYLOAD_SIZE(drrw));
}

err = dmu_assign_arcbuf_by_dnode(dn,
drrw->drr_offset, abuf, tx);
if (err != 0) {
/*
* This rrd is left on the list, so the caller will
* free it (and the arc_buf).
*/
break;
}

/*
* Note: If the receive fails, we want the resume stream to
* start with the same record that we last successfully
* received (as opposed to the next record), so that we can
* verify that we are resuming from the correct location.
*/
save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);

list_remove(&rwa->write_batch, rrd);
kmem_free(rrd, sizeof (*rrd));
}

dmu_tx_commit(tx);
dnode_rele(dn, FTAG);
return (err);
}
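
For intuition, a worked example of the single hold taken above, using hypothetical numbers (128K records, the default 1MB batch):

/*
* Hypothetical batch: eight consecutive 128K DRR_WRITE records on one
* object, starting at offset 0.
*   first_drrw->drr_offset      = 0
*   last_drrw->drr_offset       = 7 * 131072 = 917504
*   last_drrw->drr_logical_size = 131072
* Hold length = 917504 - 0 + 131072 = 1048576 bytes, so all eight
* writes are assigned and committed under a single dmu_tx, where the
* old receive_write() paid one create/assign/commit cycle per record.
*/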

noinline static int
flush_write_batch(struct receive_writer_arg *rwa)
{
if (list_is_empty(&rwa->write_batch))
return (0);
int err = rwa->err;
if (err == 0)
err = flush_write_batch_impl(rwa);
if (err != 0) {
struct receive_record_arg *rrd;
while ((rrd = list_remove_head(&rwa->write_batch)) != NULL) {
dmu_return_arcbuf(rrd->arc_buf);
kmem_free(rrd, sizeof (*rrd));
}
}
ASSERT(list_is_empty(&rwa->write_batch));
return (err);
}
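
Note the two paths through this wrapper; a short summary, in terms of the names above:

/*
* flush_write_batch() behavior:
*   rwa->err == 0: flush_write_batch_impl() writes the batch; on a
*   mid-batch failure the failing rrd (and any after it) remain on
*   the list and are cleaned up here.
*   rwa->err != 0: skip the write entirely; return each arc_buf and
*   kmem_free() each rrd so nothing leaks.
* Either way the list is empty on return.
*/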

noinline static int
receive_process_write_record(struct receive_writer_arg *rwa,
struct receive_record_arg *rrd)
{
int err = 0;

ASSERT3U(rrd->header.drr_type, ==, DRR_WRITE);
struct drr_write *drrw = &rrd->header.drr_u.drr_write;

if (drrw->drr_offset + drrw->drr_logical_size < drrw->drr_offset ||
!DMU_OT_IS_VALID(drrw->drr_type))
@@ -1719,52 +1817,31 @@ receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw,
drrw->drr_offset < rwa->last_offset)) {
return (SET_ERROR(EINVAL));
}

struct receive_record_arg *first_rrd = list_head(&rwa->write_batch);
struct drr_write *first_drrw = &first_rrd->header.drr_u.drr_write;
uint64_t batch_size =
MIN(zfs_recv_write_batch_size, DMU_MAX_ACCESS / 2);
if (first_rrd != NULL &&
(drrw->drr_object != first_drrw->drr_object ||
drrw->drr_offset >= first_drrw->drr_offset + batch_size)) {
err = flush_write_batch(rwa);
if (err != 0)
return (err);
}

rwa->last_object = drrw->drr_object;
rwa->last_offset = drrw->drr_offset;

if (rwa->last_object > rwa->max_object)
rwa->max_object = rwa->last_object;

if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0)
return (SET_ERROR(EINVAL));

list_insert_tail(&rwa->write_batch, rrd);
/*
* Return EAGAIN to indicate that we will use this rrd again,
* so the caller should not free it.
*/
return (EAGAIN);
}
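
The EAGAIN return is a transfer-of-ownership signal rather than an error; a sketch of the convention the callers in this diff follow:

/*
* rrd ownership after receive_process_write_record():
*   EAGAIN: the rrd (and its arc_buf) now live on rwa->write_batch
*           and will be freed by flush_write_batch(); the caller
*           must not free them.
*   other non-zero: the record was rejected; the caller returns the
*           arc_buf and frees the rrd.
* It never returns 0: an accepted DRR_WRITE record yields EAGAIN.
*/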

/*
@@ -2482,6 +2559,22 @@ receive_process_record(struct receive_writer_arg *rwa,
ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
rwa->bytes_read = rrd->bytes_read;

if (rrd->header.drr_type != DRR_WRITE) {
err = flush_write_batch(rwa);
if (err != 0) {
if (rrd->arc_buf != NULL) {
dmu_return_arcbuf(rrd->arc_buf);
rrd->arc_buf = NULL;
rrd->payload = NULL;
} else if (rrd->payload != NULL) {
kmem_free(rrd->payload, rrd->payload_size);
rrd->payload = NULL;
}

return (err);
}
}

switch (rrd->header.drr_type) {
case DRR_OBJECT:
{
@@ -2500,13 +2593,17 @@
}
case DRR_WRITE:
{
err = receive_process_write_record(rwa, rrd);
if (err != EAGAIN) {
/*
* On success, receive_process_write_record() returns
* EAGAIN to indicate that we do not want to free
* the rrd or arc_buf.
*/
ASSERT(err != 0);
dmu_return_arcbuf(rrd->arc_buf);
rrd->arc_buf = NULL;
}
break;
}
case DRR_WRITE_BYREF:
@@ -2582,8 +2679,9 @@ receive_writer_thread(void *arg)
* on the queue, but we need to clear everything in it before we
* can exit.
*/
int err = 0;
if (rwa->err == 0) {
err = receive_process_record(rwa, rrd);
} else if (rrd->arc_buf != NULL) {
dmu_return_arcbuf(rrd->arc_buf);
rrd->arc_buf = NULL;
Expand All @@ -2592,9 +2690,22 @@ receive_writer_thread(void *arg)
kmem_free(rrd->payload, rrd->payload_size);
rrd->payload = NULL;
}
/*
* EAGAIN indicates that this record has been saved (on
* rwa->write_batch), and will be used again, so we don't
* free it.
*/
if (err != EAGAIN) {
rwa->err = err;
kmem_free(rrd, sizeof (*rrd));
}
}
kmem_free(rrd, sizeof (*rrd));

int err = flush_write_batch(rwa);
if (rwa->err == 0)
rwa->err = err;

mutex_enter(&rwa->mutex);
rwa->done = B_TRUE;
cv_signal(&rwa->cv);
@@ -2759,6 +2870,8 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, int cleanup_fd,
rwa->raw = drc->drc_raw;
rwa->spill = drc->drc_spill;
rwa->os->os_raw_receive = drc->drc_raw;
list_create(&rwa->write_batch, sizeof (struct receive_record_arg),
offsetof(struct receive_record_arg, node.bqn_node));

(void) thread_create(NULL, 0, receive_writer_thread, rwa, 0, curproc,
TS_RUN, minclsyspri);
@@ -2845,6 +2958,7 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, int cleanup_fd,
cv_destroy(&rwa->cv);
mutex_destroy(&rwa->mutex);
bqueue_destroy(&rwa->q);
list_destroy(&rwa->write_batch);
if (err == 0)
err = rwa->err;

@@ -3236,4 +3350,7 @@ ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_length, INT, ZMOD_RW,

ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_ff, INT, ZMOD_RW,
"Receive queue fill fraction");

ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, write_batch_size, INT, ZMOD_RW,
"Maximum amount of writes to batch into one transaction");
/* END CSTYLED */