diff --git a/module/zfs/zio.c b/module/zfs/zio.c
index a85eaeb5c6..4062352af8 100644
--- a/module/zfs/zio.c
+++ b/module/zfs/zio.c
@@ -63,7 +63,8 @@ uint8_t zio_priority_table[ZIO_PRIORITY_TABLE_SIZE] = {
  * ==========================================================================
  */
 char *zio_type_name[ZIO_TYPES] = {
-	"z_null", "z_rd", "z_wr", "z_fr", "z_cl", "z_ioctl"
+	"zio_null", "zio_read", "zio_write", "zio_free", "zio_claim",
+	"zio_ioctl"
 };
 
 /*
@@ -73,24 +74,37 @@ char *zio_type_name[ZIO_TYPES] = {
  */
 kmem_cache_t *zio_cache;
 kmem_cache_t *zio_link_cache;
-kmem_cache_t *zio_vdev_cache;
 kmem_cache_t *zio_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
 kmem_cache_t *zio_data_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
-int zio_bulk_flags = 0;
-int zio_delay_max = ZIO_DELAY_MAX;
 
 #ifdef _KERNEL
 extern vmem_t *zio_alloc_arena;
 #endif
 extern int zfs_mg_alloc_failures;
 
+/*
+ * The following actions directly affect the spa's sync-to-convergence logic.
+ * The values below define the sync pass when we start performing the action.
+ * Care should be taken when changing these values as they directly impact
+ * spa_sync() performance. Tuning these values may introduce subtle performance
+ * pathologies and should only be done in the context of performance analysis.
+ * These tunables will eventually be removed and replaced with #defines once
+ * enough analysis has been done to determine optimal values.
+ *
+ * The 'zfs_sync_pass_deferred_free' pass must be greater than 1 to ensure that
+ * regular blocks are not deferred.
+ */
+int zfs_sync_pass_deferred_free = 2; /* defer frees starting in this pass */
+int zfs_sync_pass_dont_compress = 5; /* don't compress starting in this pass */
+int zfs_sync_pass_rewrite = 2; /* rewrite new bps starting in this pass */
+
 /*
  * An allocating zio is one that either currently has the DVA allocate
  * stage set or will have it later in its lifetime.
  */
 #define	IO_IS_ALLOCATING(zio) ((zio)->io_orig_pipeline & ZIO_STAGE_DVA_ALLOCATE)
 
-int zio_requeue_io_start_cut_in_line = 1;
+boolean_t	zio_requeue_io_start_cut_in_line = B_TRUE;
 
 #ifdef ZFS_DEBUG
 int zio_buf_debug_limit = 16384;
@@ -98,37 +112,6 @@ int zio_buf_debug_limit = 16384;
 int zio_buf_debug_limit = 0;
 #endif
 
-static inline void __zio_execute(zio_t *zio);
-
-static int
-zio_cons(void *arg, void *unused, int kmflag)
-{
-	zio_t *zio = arg;
-
-	bzero(zio, sizeof (zio_t));
-
-	mutex_init(&zio->io_lock, NULL, MUTEX_DEFAULT, NULL);
-	cv_init(&zio->io_cv, NULL, CV_DEFAULT, NULL);
-
-	list_create(&zio->io_parent_list, sizeof (zio_link_t),
-	    offsetof(zio_link_t, zl_parent_node));
-	list_create(&zio->io_child_list, sizeof (zio_link_t),
-	    offsetof(zio_link_t, zl_child_node));
-
-	return (0);
-}
-
-static void
-zio_dest(void *arg, void *unused)
-{
-	zio_t *zio = arg;
-
-	mutex_destroy(&zio->io_lock);
-	cv_destroy(&zio->io_cv);
-	list_destroy(&zio->io_parent_list);
-	list_destroy(&zio->io_child_list);
-}
-
 void
 zio_init(void)
 {
@@ -138,12 +121,10 @@ zio_init(void)
 #ifdef _KERNEL
 	data_alloc_arena = zio_alloc_arena;
 #endif
-	zio_cache = kmem_cache_create("zio_cache", sizeof (zio_t), 0,
-	    zio_cons, zio_dest, NULL, NULL, NULL, KMC_KMEM);
+	zio_cache = kmem_cache_create("zio_cache",
+	    sizeof (zio_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
 	zio_link_cache = kmem_cache_create("zio_link_cache",
-	    sizeof (zio_link_t), 0, NULL, NULL, NULL, NULL, NULL, KMC_KMEM);
-	zio_vdev_cache = kmem_cache_create("zio_vdev_cache", sizeof(vdev_io_t),
-	    PAGESIZE, NULL, NULL, NULL, NULL, NULL, KMC_VMEM);
+	    sizeof (zio_link_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
 
 	/*
 	 * For small buffers, we want a cache for each multiple of
@@ -155,41 +136,44 @@ zio_init(void)
 		size_t size = (c + 1) << SPA_MINBLOCKSHIFT;
 		size_t p2 = size;
 		size_t align = 0;
+		size_t cflags = (size > zio_buf_debug_limit) ? KMC_NODEBUG : 0;
 
 		while (p2 & (p2 - 1))
 			p2 &= p2 - 1;
 
+#ifndef _KERNEL
+		/*
+		 * If we are using watchpoints, put each buffer on its own page,
+		 * to eliminate the performance overhead of trapping to the
+		 * kernel when modifying a non-watched buffer that shares the
+		 * page with a watched buffer.
+		 */
+		if (arc_watch && !IS_P2ALIGNED(size, PAGESIZE))
+			continue;
+#endif
 		if (size <= 4 * SPA_MINBLOCKSIZE) {
 			align = SPA_MINBLOCKSIZE;
-		} else if (P2PHASE(size, PAGESIZE) == 0) {
+		} else if (IS_P2ALIGNED(size, PAGESIZE)) {
 			align = PAGESIZE;
-		} else if (P2PHASE(size, p2 >> 2) == 0) {
+		} else if (IS_P2ALIGNED(size, p2 >> 2)) {
 			align = p2 >> 2;
 		}
 
 		if (align != 0) {
 			char name[36];
-			int flags = zio_bulk_flags;
+			(void) sprintf(name, "zio_buf_%lu", (ulong_t)size);
+			zio_buf_cache[c] = kmem_cache_create(name, size,
+			    align, NULL, NULL, NULL, NULL, NULL, cflags);
 
 			/*
-			 * The smallest buffers (512b) are heavily used and
-			 * experience a lot of churn.  The slabs allocated
-			 * for them are also relatively small (32K).  Thus
-			 * in over to avoid expensive calls to vmalloc() we
-			 * make an exception to the usual slab allocation
-			 * policy and force these buffers to be kmem backed.
+			 * Since zio_data bufs do not appear in crash dumps, we
+			 * pass KMC_NOTOUCH so that no allocator metadata is
+			 * stored with the buffers.
 			 */
-			if (size == (1 << SPA_MINBLOCKSHIFT))
-				flags |= KMC_KMEM;
-
-			(void) snprintf(name, sizeof(name), "zio_buf_%lu", (ulong_t)size);
-			zio_buf_cache[c] = kmem_cache_create(name, size,
-			    align, NULL, NULL, NULL, NULL, NULL, flags);
-
-			(void) snprintf(name, sizeof(name), "zio_data_buf_%lu", (ulong_t)size);
+			(void) sprintf(name, "zio_data_buf_%lu", (ulong_t)size);
 			zio_data_buf_cache[c] = kmem_cache_create(name, size,
-			    align, NULL, NULL, NULL, NULL,
-			    data_alloc_arena, flags);
+			    align, NULL, NULL, NULL, NULL, data_alloc_arena,
+			    cflags | KMC_NOTOUCH);
 		}
 	}
 
@@ -233,7 +217,6 @@ zio_fini(void)
 		zio_data_buf_cache[c] = NULL;
 	}
 
-	kmem_cache_destroy(zio_vdev_cache);
 	kmem_cache_destroy(zio_link_cache);
 	kmem_cache_destroy(zio_cache);
 
@@ -259,7 +242,7 @@ zio_buf_alloc(size_t size)
 
 	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
 
-	return (kmem_cache_alloc(zio_buf_cache[c], KM_PUSHPAGE | KM_NODEBUG));
+	return (kmem_cache_alloc(zio_buf_cache[c], KM_PUSHPAGE));
 }
 
 /*
@@ -275,8 +258,7 @@ zio_data_buf_alloc(size_t size)
 
 	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
 
-	return (kmem_cache_alloc(zio_data_buf_cache[c],
-	    KM_PUSHPAGE | KM_NODEBUG));
+	return (kmem_cache_alloc(zio_data_buf_cache[c], KM_PUSHPAGE));
 }
 
 void
@@ -299,24 +281,6 @@ zio_data_buf_free(void *buf, size_t size)
 	kmem_cache_free(zio_data_buf_cache[c], buf);
 }
 
-/*
- * Dedicated I/O buffers to ensure that memory fragmentation never prevents
- * or significantly delays the issuing of a zio.   These buffers are used
- * to aggregate I/O and could be used for raidz stripes.
- */
-void *
-zio_vdev_alloc(void)
-{
-	return (kmem_cache_alloc(zio_vdev_cache, KM_PUSHPAGE));
-}
-
-void
-zio_vdev_free(void *buf)
-{
-	kmem_cache_free(zio_vdev_cache, buf);
-
-}
-
 /*
  * ==========================================================================
  * Push and pop I/O transform buffers
@@ -326,7 +290,7 @@ static void
 zio_push_transform(zio_t *zio, void *data, uint64_t size, uint64_t bufsize,
 	zio_transform_func_t *transform)
 {
-	zio_transform_t *zt = kmem_alloc(sizeof (zio_transform_t), KM_PUSHPAGE);
+	zio_transform_t *zt = kmem_alloc(sizeof (zio_transform_t), KM_SLEEP);
 
 	zt->zt_orig_data = zio->io_data;
 	zt->zt_orig_size = zio->io_size;
@@ -441,9 +405,8 @@ zio_unique_parent(zio_t *cio)
 void
 zio_add_child(zio_t *pio, zio_t *cio)
 {
-	zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_PUSHPAGE);
+	zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_SLEEP);
 	int w;
-
 	/*
 	 * Logical I/Os can have logical, gang, or vdev children.
 	 * Gang I/Os can have gang or vdev children.
@@ -505,15 +468,14 @@ zio_wait_for_children(zio_t *zio, enum zio_child child, enum zio_wait_type wait)
 	if (*countp != 0) {
 		zio->io_stage >>= 1;
 		zio->io_stall = countp;
-        waiting = B_TRUE;
+		waiting = B_TRUE;
 	}
 	mutex_exit(&zio->io_lock);
 
 	return (waiting);
 }
 
-__attribute__((always_inline))
-static inline void
+static void
 zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait)
 {
 	uint64_t *countp = &pio->io_children[zio->io_child_type][wait];
@@ -527,7 +489,7 @@ zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait)
 	if (--*countp == 0 && pio->io_stall == countp) {
 		pio->io_stall = NULL;
 		mutex_exit(&pio->io_lock);
-		__zio_execute(pio);
+		zio_execute(pio);
 	} else {
 		mutex_exit(&pio->io_lock);
 	}
@@ -562,7 +524,16 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
 	ASSERT(!bp || !(flags & ZIO_FLAG_CONFIG_WRITER));
 	ASSERT(vd || stage == ZIO_STAGE_OPEN);
 
-	zio = kmem_cache_alloc(zio_cache, KM_PUSHPAGE);
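+	/*
+	 * The zio cache no longer provides constructor/destructor callbacks,
+	 * so zero the zio and set up its lock, cv, and parent/child lists
+	 * here; zio_destroy() tears them back down.
+	 */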
+	zio = kmem_cache_alloc(zio_cache, KM_SLEEP);
+	bzero(zio, sizeof (zio_t));
+
+	mutex_init(&zio->io_lock, NULL, MUTEX_DEFAULT, NULL);
+	cv_init(&zio->io_cv, NULL, CV_DEFAULT, NULL);
+
+	list_create(&zio->io_parent_list, sizeof (zio_link_t),
+	    offsetof(zio_link_t, zl_parent_node));
+	list_create(&zio->io_child_list, sizeof (zio_link_t),
+	    offsetof(zio_link_t, zl_child_node));
 
 	if (vd != NULL)
 		zio->io_child_type = ZIO_CHILD_VDEV;
@@ -574,7 +545,6 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
 		zio->io_child_type = ZIO_CHILD_LOGICAL;
 
 	if (bp != NULL) {
-		zio->io_logical = NULL;
 		zio->io_bp = (blkptr_t *)bp;
 		zio->io_bp_copy = *bp;
 		zio->io_bp_orig = *bp;
@@ -585,52 +555,21 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
 			zio->io_logical = zio;
 		if (zio->io_child_type > ZIO_CHILD_GANG && BP_IS_GANG(bp))
 			pipeline |= ZIO_GANG_STAGES;
-	} else {
-		zio->io_logical = NULL;
-		zio->io_bp = NULL;
-		bzero(&zio->io_bp_copy, sizeof (blkptr_t));
-		bzero(&zio->io_bp_orig, sizeof (blkptr_t));
 	}
 
 	zio->io_spa = spa;
 	zio->io_txg = txg;
-	zio->io_ready = NULL;
 	zio->io_done = done;
 	zio->io_private = private;
-	zio->io_prev_space_delta = 0;
 	zio->io_type = type;
 	zio->io_priority = priority;
 	zio->io_vd = vd;
-	zio->io_vsd = NULL;
-	zio->io_vsd_ops = NULL;
 	zio->io_offset = offset;
-	zio->io_deadline = 0;
 	zio->io_orig_data = zio->io_data = data;
 	zio->io_orig_size = zio->io_size = size;
 	zio->io_orig_flags = zio->io_flags = flags;
 	zio->io_orig_stage = zio->io_stage = stage;
 	zio->io_orig_pipeline = zio->io_pipeline = pipeline;
-	bzero(&zio->io_prop, sizeof (zio_prop_t));
-	zio->io_cmd = 0;
-	zio->io_reexecute = 0;
-	zio->io_bp_override = NULL;
-	zio->io_walk_link = NULL;
-	zio->io_transform_stack = NULL;
-	zio->io_delay = 0;
-	zio->io_error = 0;
-	zio->io_child_count = 0;
-	zio->io_parent_count = 0;
-	zio->io_stall = NULL;
-	zio->io_gang_leader = NULL;
-	zio->io_gang_tree = NULL;
-	zio->io_executor = NULL;
-	zio->io_waiter = NULL;
-	zio->io_cksum_report = NULL;
-	zio->io_ena = 0;
-	bzero(zio->io_child_error, sizeof (int) * ZIO_CHILD_TYPES);
-	bzero(zio->io_children,
-	    sizeof (uint64_t) * ZIO_CHILD_TYPES * ZIO_WAIT_TYPES);
-	bzero(&zio->io_bookmark, sizeof (zbookmark_t));
 
 	zio->io_state[ZIO_WAIT_READY] = (stage >= ZIO_STAGE_READY);
 	zio->io_state[ZIO_WAIT_DONE] = (stage >= ZIO_STAGE_DONE);
@@ -646,14 +585,16 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
 		zio_add_child(pio, zio);
 	}
 
-	taskq_init_ent(&zio->io_tqent);
-
 	return (zio);
 }
 
 static void
 zio_destroy(zio_t *zio)
 {
+	list_destroy(&zio->io_parent_list);
+	list_destroy(&zio->io_child_list);
+	mutex_destroy(&zio->io_lock);
+	cv_destroy(&zio->io_cv);
 	kmem_cache_free(zio_cache, zio);
 }
 
@@ -707,9 +648,7 @@ zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
 	    DMU_OT_IS_VALID(zp->zp_type) &&
 	    zp->zp_level < 32 &&
 	    zp->zp_copies > 0 &&
-	    zp->zp_copies <= spa_max_replication(spa) &&
-	    zp->zp_dedup <= 1 &&
-	    zp->zp_dedup_verify <= 1);
+	    zp->zp_copies <= spa_max_replication(spa));
 
 	zio = zio_create(pio, spa, txg, bp, data, size, done, private,
 	    ZIO_TYPE_WRITE, priority, flags, NULL, 0, zb,
@@ -737,13 +676,20 @@ zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, void *data,
 }
 
 void
-zio_write_override(zio_t *zio, blkptr_t *bp, int copies)
+zio_write_override(zio_t *zio, blkptr_t *bp, int copies, boolean_t nopwrite)
 {
 	ASSERT(zio->io_type == ZIO_TYPE_WRITE);
 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
 	ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
 	ASSERT(zio->io_txg == spa_syncing_txg(zio->io_spa));
 
+	/*
+	 * We must reset the io_prop to match the values that existed
+	 * when the bp was first written by dmu_sync(), keeping in mind
+	 * that nopwrite and dedup are mutually exclusive.
+	 */
+	zio->io_prop.zp_dedup = nopwrite ? B_FALSE : zio->io_prop.zp_dedup;
+	zio->io_prop.zp_nopwrite = nopwrite;
 	zio->io_prop.zp_copies = copies;
 	zio->io_bp_override = bp;
 }
@@ -751,6 +697,7 @@ zio_write_override(zio_t *zio, blkptr_t *bp, int copies)
 void
 zio_free(spa_t *spa, uint64_t txg, const blkptr_t *bp)
 {
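+	/* Sanity check the block pointer before the free is deferred. */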
+	metaslab_check_free(spa, bp);
 	bplist_append(&spa->spa_free_bplist[txg & TXG_MASK], bp);
 }
 
@@ -765,7 +712,9 @@ zio_free_sync(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
 
 	ASSERT(!BP_IS_HOLE(bp));
 	ASSERT(spa_syncing_txg(spa) == txg);
-	ASSERT(spa_sync_pass(spa) <= SYNC_PASS_DEFERRED_FREE);
+	ASSERT(spa_sync_pass(spa) < zfs_sync_pass_deferred_free);
+
+	metaslab_check_free(spa, bp);
 
 	zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
 	    NULL, NULL, ZIO_TYPE_FREE, ZIO_PRIORITY_FREE, flags,
@@ -1016,9 +965,8 @@ zio_write_bp_init(zio_t *zio)
 	 * wait for them and then repeat this pipeline stage.
 	 */
 	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
-	    zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_READY)) {
+	    zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_READY))
 		return (ZIO_PIPELINE_STOP);
-    }
 
 	if (!IO_IS_ALLOCATING(zio))
 		return (ZIO_PIPELINE_CONTINUE);
@@ -1032,6 +980,19 @@ zio_write_bp_init(zio_t *zio)
 		*bp = *zio->io_bp_override;
 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
 
+		/*
+		 * If we've been overridden and nopwrite is set, then
+		 * set the flag accordingly to indicate that a nopwrite
+		 * has already occurred.
+		 */
+		if (!BP_IS_HOLE(bp) && zp->zp_nopwrite) {
+			ASSERT(!zp->zp_dedup);
+			zio->io_flags |= ZIO_FLAG_NOPWRITE;
+			return (ZIO_PIPELINE_CONTINUE);
+		}
+
+		ASSERT(!zp->zp_nopwrite);
+
 		if (BP_IS_HOLE(bp) || !zp->zp_dedup)
 			return (ZIO_PIPELINE_CONTINUE);
 
@@ -1063,7 +1024,7 @@ zio_write_bp_init(zio_t *zio)
 		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
 		ASSERT(!BP_GET_DEDUP(bp));
 
-		if (pass > SYNC_PASS_DONT_COMPRESS)
+		if (pass >= zfs_sync_pass_dont_compress)
 			compress = ZIO_COMPRESS_OFF;
 
 		/* Make sure someone doesn't change their mind on overwrites */
@@ -1092,9 +1053,9 @@ zio_write_bp_init(zio_t *zio)
 	 * There should only be a handful of blocks after pass 1 in any case.
 	 */
 	if (bp->blk_birth == zio->io_txg && BP_GET_PSIZE(bp) == psize &&
-	    pass > SYNC_PASS_REWRITE) {
-		enum zio_stage gang_stages = zio->io_pipeline & ZIO_GANG_STAGES;
+	    pass >= zfs_sync_pass_rewrite) {
 		ASSERT(psize != 0);
+		enum zio_stage gang_stages = zio->io_pipeline & ZIO_GANG_STAGES;
 		zio->io_pipeline = ZIO_REWRITE_PIPELINE | gang_stages;
 		zio->io_flags |= ZIO_FLAG_IO_REWRITE;
 	} else {
@@ -1119,7 +1080,13 @@ zio_write_bp_init(zio_t *zio)
 			ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
 			zio->io_pipeline = ZIO_DDT_WRITE_PIPELINE;
 		}
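+		/*
+		 * Nopwrite candidates get the ZIO_STAGE_NOP_WRITE stage so
+		 * that zio_nop_write() can later compare the newly computed
+		 * checksum against the existing bp.
+		 */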
+		if (zp->zp_nopwrite) {
+			ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
+			ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
+			zio->io_pipeline |= ZIO_STAGE_NOP_WRITE;
+		}
 	}
+
 	return (ZIO_PIPELINE_CONTINUE);
 }
 
@@ -1143,7 +1110,7 @@ zio_free_bp_init(zio_t *zio)
  */
 
 static void
-zio_taskq_dispatch(zio_t *zio, enum zio_taskq_type q, boolean_t cutinline)
+zio_taskq_dispatch(zio_t *zio, zio_taskq_type_t q, boolean_t cutinline)
 {
 	spa_t *spa = zio->io_spa;
 	zio_type_t t = zio->io_type;
@@ -1164,10 +1131,11 @@ zio_taskq_dispatch(zio_t *zio, enum zio_taskq_type q, boolean_t cutinline)
 		t = ZIO_TYPE_NULL;
 
 	/*
-	 * If this is a high priority I/O, then use the high priority taskq.
+	 * If this is a high priority I/O, then use the high priority taskq if
+	 * available.
 	 */
 	if (zio->io_priority == ZIO_PRIORITY_NOW &&
-	    spa->spa_zio_taskq[t][q + 1] != NULL)
+	    spa->spa_zio_taskq[t][q + 1].stqs_count != 0)
 		q++;
 
 	ASSERT3U(q, <, ZIO_TASKQ_TYPES);
@@ -1177,25 +1145,26 @@ zio_taskq_dispatch(zio_t *zio, enum zio_taskq_type q, boolean_t cutinline)
 	 * to a single taskq at a time.  It would be a grievous error
 	 * to dispatch the zio to another taskq at the same time.
 	 */
-	ASSERT(taskq_empty_ent(&zio->io_tqent));
-	//taskq_dispatch_ent(spa->spa_zio_taskq[t][q],
-    //   (task_func_t *)zio_execute, zio, flags, &zio->io_tqent);
-    taskq_ent_t *tqe;
-	tqe = (taskq_ent_t *)taskq_dispatch(spa->spa_zio_taskq[t][q],
-                                        (task_func_t *)zio_execute, zio, flags);
-    memcpy(&zio->io_tqent, tqe, sizeof(zio->io_tqent));
+	ASSERT(zio->io_tqent.tqent_next == NULL);
+	spa_taskq_dispatch_ent(spa, t, q, (task_func_t *)zio_execute, zio,
+	    flags, &zio->io_tqent);
 }
 
 static boolean_t
-zio_taskq_member(zio_t *zio, enum zio_taskq_type q)
+zio_taskq_member(zio_t *zio, zio_taskq_type_t q)
 {
 	kthread_t *executor = zio->io_executor;
 	spa_t *spa = zio->io_spa;
-	zio_type_t t;
-
-	for (t = 0; t < ZIO_TYPES; t++)
-		if (taskq_member(spa->spa_zio_taskq[t][q], executor))
-			return (B_TRUE);
+	zio_type_t t;
+
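+	/*
+	 * A zio type/taskq-type pair may now be served by more than one
+	 * taskq, so check membership in each of them.
+	 */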
+	for (t = 0; t < ZIO_TYPES; t++) {
+		spa_taskqs_t *tqs = &spa->spa_zio_taskq[t][q];
+		uint_t i;
+		for (i = 0; i < tqs->stqs_count; i++) {
+			if (taskq_member(tqs->stqs_taskq[i], executor))
+				return (B_TRUE);
+		}
+	}
 
 	return (B_FALSE);
 }
@@ -1229,32 +1198,14 @@ zio_interrupt(zio_t *zio)
  */
 static zio_pipe_stage_t *zio_pipeline[];
 
-/*
- * zio_execute() is a wrapper around the static function
- * __zio_execute() so that we can force  __zio_execute() to be
- * inlined.  This reduces stack overhead which is important
- * because __zio_execute() is called recursively in several zio
- * code paths.  zio_execute() itself cannot be inlined because
- * it is externally visible.
- */
 void
 zio_execute(zio_t *zio)
-{
-	__zio_execute(zio);
-}
-
-__attribute__((always_inline))
-static inline void
-__zio_execute(zio_t *zio)
 {
 	zio->io_executor = curthread;
 
-
 	while (zio->io_stage < ZIO_STAGE_DONE) {
 		enum zio_stage pipeline = zio->io_pipeline;
 		enum zio_stage stage = zio->io_stage;
-		dsl_pool_t *dp;
-		boolean_t cut;
 		int rv;
 
 		ASSERT(!MUTEX_HELD(&zio->io_lock));
@@ -1267,10 +1218,6 @@ __zio_execute(zio_t *zio)
 
 		ASSERT(stage <= ZIO_STAGE_DONE);
 
-		dp = spa_get_dsl(zio->io_spa);
-		cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
-		    zio_requeue_io_start_cut_in_line : B_FALSE;
-
 		/*
 		 * If we are in interrupt context and this pipeline stage
 		 * will grab a config lock that is held across I/O,
@@ -1282,19 +1229,8 @@ __zio_execute(zio_t *zio)
 		 */
 		if ((stage & ZIO_BLOCKING_STAGES) && zio->io_vd == NULL &&
 		    zio_taskq_member(zio, ZIO_TASKQ_INTERRUPT)) {
-			zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, cut);
-			return;
-		}
-
-		/*
-		 * If we executing in the context of the tx_sync_thread,
-		 * or we are performing pool initialization outside of a
-		 * zio_taskq[ZIO_TASKQ_ISSUE] context.  Then issue the zio
-		 * async to minimize stack usage for these deep call paths.
-		 */
-		if ((dp && curthread == dp->dp_tx.tx_sync_thread) ||
-		    (dp && spa_is_initializing(dp->dp_spa) &&
-		    !zio_taskq_member(zio, ZIO_TASKQ_ISSUE))) {
+			boolean_t cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
+			    zio_requeue_io_start_cut_in_line : B_FALSE;
 			zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, cut);
 			return;
 		}
@@ -1309,7 +1245,6 @@ __zio_execute(zio_t *zio)
 	}
 }
 
-
 /*
  * ==========================================================================
  * Initiate I/O, either sync or async
@@ -1318,34 +1253,18 @@ __zio_execute(zio_t *zio)
 int
 zio_wait(zio_t *zio)
 {
-	uint64_t timeout;
 	int error;
 
 	ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
 	ASSERT(zio->io_executor == NULL);
 
 	zio->io_waiter = curthread;
-	timeout = ddi_get_lbolt() + (zio_delay_max / MILLISEC * hz);
 
-	__zio_execute(zio);
+	zio_execute(zio);
 
 	mutex_enter(&zio->io_lock);
-	while (zio->io_executor != NULL) {
-		/*
-		 * Wake up periodically to prevent the kernel from complaining
-		 * about a blocked task.  However, check zio_delay_max to see
-		 * if the I/O has exceeded the timeout and post an ereport.
-		 */
-		cv_timedwait_interruptible(&zio->io_cv, &zio->io_lock,
-		    ddi_get_lbolt() + hz);
-
-		if (timeout && (ddi_get_lbolt() > timeout)) {
-			zio->io_delay = zio_delay_max;
-			zfs_ereport_post(FM_EREPORT_ZFS_DELAY,
-			    zio->io_spa, zio->io_vd, zio, 0, 0);
-			timeout = 0;
-		}
-	}
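+	/*
+	 * zio_done() clears io_executor and broadcasts io_cv once the zio
+	 * has completed.
+	 */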
+	while (zio->io_executor != NULL)
+		cv_wait(&zio->io_cv, &zio->io_lock);
 	mutex_exit(&zio->io_lock);
 
 	error = zio->io_error;
@@ -1371,7 +1290,7 @@ zio_nowait(zio_t *zio)
 		zio_add_child(spa->spa_async_zio_root, zio);
 	}
 
-	__zio_execute(zio);
+	zio_execute(zio);
 }
 
 /*
@@ -1384,7 +1303,8 @@ static void
 zio_reexecute(zio_t *pio)
 {
 	zio_t *cio, *cio_next;
 	int c, w;
 
 	ASSERT(pio->io_child_type == ZIO_CHILD_LOGICAL);
 	ASSERT(pio->io_orig_stage == ZIO_STAGE_OPEN);
@@ -1395,6 +1315,7 @@ zio_reexecute(zio_t *pio)
 	pio->io_stage = pio->io_orig_stage;
 	pio->io_pipeline = pio->io_orig_pipeline;
 	pio->io_reexecute = 0;
+	pio->io_flags |= ZIO_FLAG_REEXECUTED;
 	pio->io_error = 0;
 	for (w = 0; w < ZIO_WAIT_TYPES; w++)
 		pio->io_state[w] = 0;
@@ -1426,7 +1347,7 @@ zio_reexecute(zio_t *pio)
 	 * responsibility of the caller to wait on him.
 	 */
 	if (!(pio->io_flags & ZIO_FLAG_GODFATHER))
-		__zio_execute(pio);
+		zio_execute(pio);
 }
 
 void
@@ -1639,7 +1560,7 @@ zio_gang_node_alloc(zio_gang_node_t **gnpp)
 
 	ASSERT(*gnpp == NULL);
 
-	gn = kmem_zalloc(sizeof (*gn), KM_PUSHPAGE);
+	gn = kmem_zalloc(sizeof (*gn), KM_SLEEP);
 	gn->gn_gbh = zio_buf_alloc(SPA_GANGBLOCKSIZE);
 	*gnpp = gn;
 
@@ -1650,7 +1571,7 @@ static void
 zio_gang_node_free(zio_gang_node_t **gnpp)
 {
 	zio_gang_node_t *gn = *gnpp;
 	int g;
 
 	for (g = 0; g < SPA_GBH_NBLKPTRS; g++)
 		ASSERT(gn->gn_child[g] == NULL);
@@ -1664,7 +1585,7 @@ static void
 zio_gang_tree_free(zio_gang_node_t **gnpp)
 {
 	zio_gang_node_t *gn = *gnpp;
 	int g;
 
 	if (gn == NULL)
 		return;
@@ -1694,7 +1615,7 @@ zio_gang_tree_assemble_done(zio_t *zio)
 	zio_t *gio = zio->io_gang_leader;
 	zio_gang_node_t *gn = zio->io_private;
 	blkptr_t *bp = zio->io_bp;
 	int g;
 
 	ASSERT(gio == zio_unique_parent(zio));
 	ASSERT(zio->io_child_count == 0);
@@ -1722,7 +1643,7 @@ zio_gang_tree_issue(zio_t *pio, zio_gang_node_t *gn, blkptr_t *bp, void *data)
 {
 	zio_t *gio = pio->io_gang_leader;
 	zio_t *zio;
 	int g;
 
 	ASSERT(BP_IS_GANG(bp) == !!gn);
 	ASSERT(BP_GET_CHECKSUM(bp) == BP_GET_CHECKSUM(gio->io_bp));
@@ -1793,11 +1714,11 @@ static void
 zio_write_gang_member_ready(zio_t *zio)
 {
 	zio_t *pio = zio_unique_parent(zio);
-	ASSERTV(zio_t *gio = zio->io_gang_leader;)
+	zio_t *gio = zio->io_gang_leader;
 	dva_t *cdva = zio->io_bp->blk_dva;
 	dva_t *pdva = pio->io_bp->blk_dva;
 	uint64_t asize;
 	int d;
 
 	if (BP_IS_HOLE(zio->io_bp))
 		return;
@@ -1835,7 +1756,8 @@ zio_write_gang_block(zio_t *pio)
 	int copies = gio->io_prop.zp_copies;
 	int gbh_copies = MIN(copies + 1, spa_max_replication(spa));
 	zio_prop_t zp;
 	int g, error;
 
 	error = metaslab_alloc(spa, spa_normal_class(spa), SPA_GANGBLOCKSIZE,
 	    bp, gbh_copies, txg, pio == gio ? NULL : gio->io_bp,
@@ -1875,8 +1797,9 @@ zio_write_gang_block(zio_t *pio)
 		zp.zp_type = DMU_OT_NONE;
 		zp.zp_level = 0;
 		zp.zp_copies = gio->io_prop.zp_copies;
-		zp.zp_dedup = 0;
-		zp.zp_dedup_verify = 0;
+		zp.zp_dedup = B_FALSE;
+		zp.zp_dedup_verify = B_FALSE;
+		zp.zp_nopwrite = B_FALSE;
 
 		zio_nowait(zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
 		    (char *)pio->io_data + (pio->io_size - resid), lsize, &zp,
@@ -1890,12 +1813,63 @@ zio_write_gang_block(zio_t *pio)
 	 */
 	pio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
 
+	zio_nowait(zio);
+
+	return (ZIO_PIPELINE_CONTINUE);
+}
+
+/*
+ * The zio_nop_write stage in the pipeline determines if allocating
+ * a new bp is necessary.  By leveraging a cryptographically secure checksum,
+ * such as SHA256, we can compare the checksums of the new data and the old
+ * to determine if allocating a new block is required.  The nopwrite
+ * feature can handle writes in either syncing or open context (i.e. zil
+ * writes) and as a result is mutually exclusive with dedup.
+ */
+static int
+zio_nop_write(zio_t *zio)
+{
+	blkptr_t *bp = zio->io_bp;
+	blkptr_t *bp_orig = &zio->io_bp_orig;
+	zio_prop_t *zp = &zio->io_prop;
+
+	ASSERT(BP_GET_LEVEL(bp) == 0);
+	ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
+	ASSERT(zp->zp_nopwrite);
+	ASSERT(!zp->zp_dedup);
+	ASSERT(zio->io_bp_override == NULL);
+	ASSERT(IO_IS_ALLOCATING(zio));
+
 	/*
-	 * We didn't allocate this bp, so make sure it doesn't get unmarked.
+	 * Check to see if the original bp and the new bp have matching
+	 * If they don't, then just continue with the pipeline, which will
+	 * If they don't then just continue with the pipeline which will
+	 * allocate a new bp.
 	 */
-	pio->io_flags &= ~ZIO_FLAG_FASTWRITE;
+	if (BP_IS_HOLE(bp_orig) ||
+	    !zio_checksum_table[BP_GET_CHECKSUM(bp)].ci_dedup ||
+	    BP_GET_CHECKSUM(bp) != BP_GET_CHECKSUM(bp_orig) ||
+	    BP_GET_COMPRESS(bp) != BP_GET_COMPRESS(bp_orig) ||
+	    BP_GET_DEDUP(bp) != BP_GET_DEDUP(bp_orig) ||
+	    zp->zp_copies != BP_GET_NDVAS(bp_orig))
+		return (ZIO_PIPELINE_CONTINUE);
 
-	zio_nowait(zio);
+	/*
+	 * If the checksums match then reset the pipeline so that we
+	 * avoid allocating a new bp and issuing any I/O.
+	 */
+	if (ZIO_CHECKSUM_EQUAL(bp->blk_cksum, bp_orig->blk_cksum)) {
+		ASSERT(zio_checksum_table[zp->zp_checksum].ci_dedup);
+		ASSERT3U(BP_GET_PSIZE(bp), ==, BP_GET_PSIZE(bp_orig));
+		ASSERT3U(BP_GET_LSIZE(bp), ==, BP_GET_LSIZE(bp_orig));
+		ASSERT(zp->zp_compress != ZIO_COMPRESS_OFF);
+		ASSERT(bcmp(&bp->blk_prop, &bp_orig->blk_prop,
+		    sizeof (uint64_t)) == 0);
+
+		*bp = *bp_orig;
+		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
+		zio->io_flags |= ZIO_FLAG_NOPWRITE;
+	}
 
 	return (ZIO_PIPELINE_CONTINUE);
 }
@@ -1928,7 +1902,7 @@ static int
 zio_ddt_read_start(zio_t *zio)
 {
 	blkptr_t *bp = zio->io_bp;
 	int p;
 
 	ASSERT(BP_GET_DEDUP(bp));
 	ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
@@ -2009,7 +1983,7 @@ static boolean_t
 zio_ddt_collision(zio_t *zio, ddt_t *ddt, ddt_entry_t *dde)
 {
 	spa_t *spa = zio->io_spa;
 	int p;
 
 	/*
 	 * Note: we compare the original data, not the transformed data,
@@ -2040,7 +2014,7 @@ zio_ddt_collision(zio_t *zio, ddt_t *ddt, ddt_entry_t *dde)
 
 			ddt_exit(ddt);
 
-			error = arc_read_nolock(NULL, spa, &blk,
+			error = arc_read(NULL, spa, &blk,
 			    arc_getbuf_func, &abuf, ZIO_PRIORITY_SYNC_READ,
 			    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
 			    &aflags, &zio->io_bookmark);
@@ -2050,7 +2024,7 @@ zio_ddt_collision(zio_t *zio, ddt_t *ddt, ddt_entry_t *dde)
 				    bcmp(abuf->b_data, zio->io_orig_data,
 				    zio->io_orig_size) != 0)
 					error = EEXIST;
-				VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
+				VERIFY(arc_buf_remove_ref(abuf, &abuf));
 			}
 
 			ddt_enter(ddt);
@@ -2113,12 +2087,12 @@ static void
 zio_ddt_ditto_write_done(zio_t *zio)
 {
 	int p = DDT_PHYS_DITTO;
+	zio_prop_t *zp = &zio->io_prop;
 	blkptr_t *bp = zio->io_bp;
 	ddt_t *ddt = ddt_select(zio->io_spa, bp);
 	ddt_entry_t *dde = zio->io_private;
 	ddt_phys_t *ddp = &dde->dde_phys[p];
 	ddt_key_t *ddk = &dde->dde_key;
-	ASSERTV(zio_prop_t *zp = &zio->io_prop);
 
 	ddt_enter(ddt);
 
@@ -2174,7 +2148,7 @@ zio_ddt_write(zio_t *zio)
 			zio->io_stage = ZIO_STAGE_OPEN;
 			BP_ZERO(bp);
 		} else {
-			zp->zp_dedup = 0;
+			zp->zp_dedup = B_FALSE;
 		}
 		zio->io_pipeline = ZIO_WRITE_PIPELINE;
 		ddt_exit(ddt);
@@ -2291,7 +2265,7 @@ zio_dva_allocate(zio_t *zio)
 	}
 
 	ASSERT(BP_IS_HOLE(bp));
-	ASSERT3U(BP_GET_NDVAS(bp), ==, 0);
+	ASSERT0(BP_GET_NDVAS(bp));
 	ASSERT3U(zio->io_prop.zp_copies, >, 0);
 	ASSERT3U(zio->io_prop.zp_copies, <=, spa_max_replication(spa));
 	ASSERT3U(zio->io_size, ==, BP_GET_PSIZE(bp));
@@ -2304,7 +2278,6 @@ zio_dva_allocate(zio_t *zio)
 	flags |= (zio->io_flags & ZIO_FLAG_NODATA) ? METASLAB_GANG_AVOID : 0;
 	flags |= (zio->io_flags & ZIO_FLAG_GANG_CHILD) ?
 	    METASLAB_GANG_CHILD : 0;
-	flags |= (zio->io_flags & ZIO_FLAG_FASTWRITE) ? METASLAB_FASTWRITE : 0;
 	error = metaslab_alloc(spa, mc, zio->io_size, bp,
 	    zio->io_prop.zp_copies, zio->io_txg, NULL, flags);
 
@@ -2348,7 +2321,7 @@ zio_dva_claim(zio_t *zio)
 static void
 zio_dva_unallocate(zio_t *zio, zio_gang_node_t *gn, blkptr_t *bp)
 {
 	int g;
 
 	ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
 	ASSERT(zio->io_bp_override == NULL);
@@ -2368,8 +2341,8 @@ zio_dva_unallocate(zio_t *zio, zio_gang_node_t *gn, blkptr_t *bp)
  * Try to allocate an intent log block.  Return 0 on success, errno on failure.
  */
 int
-zio_alloc_zil(spa_t *spa, uint64_t txg, blkptr_t *new_bp, uint64_t size,
-    boolean_t use_slog)
+zio_alloc_zil(spa_t *spa, uint64_t txg, blkptr_t *new_bp, blkptr_t *old_bp,
+    uint64_t size, boolean_t use_slog)
 {
 	int error = 1;
 
@@ -2382,14 +2355,14 @@ zio_alloc_zil(spa_t *spa, uint64_t txg, blkptr_t *new_bp, uint64_t size,
 	 */
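+	/*
+	 * Passing the previous log block as a hint with METASLAB_HINTBP_AVOID
+	 * asks the allocator to steer the new block away from the vdev that
+	 * holds the old one, so consecutive log blocks tend to land on
+	 * different vdevs.
+	 */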
 	if (use_slog) {
 		error = metaslab_alloc(spa, spa_log_class(spa), size,
-		    new_bp, 1, txg, NULL,
-		    METASLAB_FASTWRITE | METASLAB_GANG_AVOID);
+		    new_bp, 1, txg, old_bp,
+		    METASLAB_HINTBP_AVOID | METASLAB_GANG_AVOID);
 	}
 
 	if (error) {
 		error = metaslab_alloc(spa, spa_normal_class(spa), size,
-		    new_bp, 1, txg, NULL,
-		    METASLAB_FASTWRITE | METASLAB_GANG_AVOID);
+		    new_bp, 1, txg, old_bp,
+		    METASLAB_HINTBP_AVOID | METASLAB_GANG_AVOID);
 	}
 
 	if (error == 0) {
@@ -2643,8 +2616,9 @@ zio_vdev_io_assess(zio_t *zio)
 	 * set vdev_cant_write so that we stop trying to allocate from it.
 	 */
 	if (zio->io_error == ENXIO && zio->io_type == ZIO_TYPE_WRITE &&
-	    vd != NULL && !vd->vdev_ops->vdev_op_leaf)
+	    vd != NULL && !vd->vdev_ops->vdev_op_leaf) {
 		vd->vdev_cant_write = B_TRUE;
+	}
 
 	if (zio->io_error)
 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
@@ -2799,7 +2773,8 @@ zio_ready(zio_t *zio)
 
 	if (zio->io_ready) {
 		ASSERT(IO_IS_ALLOCATING(zio));
-		ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
+		ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp) ||
+		    (zio->io_flags & ZIO_FLAG_NOPWRITE));
 		ASSERT(zio->io_children[ZIO_CHILD_GANG][ZIO_WAIT_READY] == 0);
 
 		zio->io_ready(zio);
@@ -2847,8 +2822,13 @@ zio_ready(zio_t *zio)
 static int
 zio_done(zio_t *zio)
 {
+	spa_t *spa = zio->io_spa;
+	zio_t *lio = zio->io_logical;
+	blkptr_t *bp = zio->io_bp;
+	vdev_t *vd = zio->io_vd;
+	uint64_t psize = zio->io_size;
 	zio_t *pio, *pio_next;
 	int c, w;
 
 	/*
 	 * If our children haven't all completed,
@@ -2857,28 +2837,28 @@ zio_done(zio_t *zio)
 	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE) ||
 	    zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE) ||
 	    zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE) ||
-	    zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_DONE)) {
+	    zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_DONE))
 		return (ZIO_PIPELINE_STOP);
-    }
 
 	for (c = 0; c < ZIO_CHILD_TYPES; c++)
-		for (w = 0; w < ZIO_WAIT_TYPES; w++) {
+		for (w = 0; w < ZIO_WAIT_TYPES; w++)
 			ASSERT(zio->io_children[c][w] == 0);
-        }
-
-	if (zio->io_bp != NULL) {
-		ASSERT(zio->io_bp->blk_pad[0] == 0);
-		ASSERT(zio->io_bp->blk_pad[1] == 0);
-		ASSERT(bcmp(zio->io_bp, &zio->io_bp_copy, sizeof (blkptr_t)) == 0 ||
-		    (zio->io_bp == zio_unique_parent(zio)->io_bp));
-		if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(zio->io_bp) &&
+
+	if (bp != NULL) {
+		ASSERT(bp->blk_pad[0] == 0);
+		ASSERT(bp->blk_pad[1] == 0);
+		ASSERT(bcmp(bp, &zio->io_bp_copy, sizeof (blkptr_t)) == 0 ||
+		    (bp == zio_unique_parent(zio)->io_bp));
+		if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(bp) &&
 		    zio->io_bp_override == NULL &&
 		    !(zio->io_flags & ZIO_FLAG_IO_REPAIR)) {
-			ASSERT(!BP_SHOULD_BYTESWAP(zio->io_bp));
-			ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(zio->io_bp));
-			ASSERT(BP_COUNT_GANG(zio->io_bp) == 0 ||
-			    (BP_COUNT_GANG(zio->io_bp) == BP_GET_NDVAS(zio->io_bp)));
+			ASSERT(!BP_SHOULD_BYTESWAP(bp));
+			ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(bp));
+			ASSERT(BP_COUNT_GANG(bp) == 0 ||
+			    (BP_COUNT_GANG(bp) == BP_GET_NDVAS(bp)));
 		}
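+		/*
+		 * A nopwrite leaves the existing block in place, so the bp
+		 * must still be identical to the original io_bp_orig.
+		 */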
+		if (zio->io_flags & ZIO_FLAG_NOPWRITE)
+			VERIFY(BP_EQUAL(bp, &zio->io_bp_orig));
 	}
 
 	/*
@@ -2896,13 +2876,13 @@ zio_done(zio_t *zio)
 		while (zio->io_cksum_report != NULL) {
 			zio_cksum_report_t *zcr = zio->io_cksum_report;
 			uint64_t align = zcr->zcr_align;
-			uint64_t asize = P2ROUNDUP(zio->io_size, align);
+			uint64_t asize = P2ROUNDUP(psize, align);
 			char *abuf = zio->io_data;
 
-			if (asize != zio->io_size) {
+			if (asize != psize) {
 				abuf = zio_buf_alloc(asize);
-				bcopy(zio->io_data, abuf, zio->io_size);
-				bzero(abuf + zio->io_size, asize - zio->io_size);
+				bcopy(zio->io_data, abuf, psize);
+				bzero(abuf + psize, asize - psize);
 			}
 
 			zio->io_cksum_report = zcr->zcr_next;
@@ -2910,21 +2890,14 @@ zio_done(zio_t *zio)
 			zcr->zcr_finish(zcr, abuf);
 			zfs_ereport_free_checksum(zcr);
 
-			if (asize != zio->io_size)
+			if (asize != psize)
 				zio_buf_free(abuf, asize);
 		}
 	}
 
 	zio_pop_transforms(zio);	/* note: may set zio->io_error */
 
-	vdev_stat_update(zio, zio->io_size);
-
-	/*
-	 * When an I/O completes but was slow post an ereport.
-	 */
-	if (zio->io_delay >= zio_delay_max)
-		zfs_ereport_post(FM_EREPORT_ZFS_DELAY, zio->io_spa,
-		    zio->io_vd, zio, 0, 0);
+	vdev_stat_update(zio, psize);
 
 	if (zio->io_error) {
 		/*
@@ -2933,30 +2906,28 @@ zio_done(zio_t *zio)
 		 * at the block level.  We ignore these errors if the
 		 * device is currently unavailable.
 		 */
-		if (zio->io_error != ECKSUM && zio->io_vd != NULL &&
-			!vdev_is_dead(zio->io_vd))
-			zfs_ereport_post(FM_EREPORT_ZFS_IO, zio->io_spa,
-						zio->io_vd, zio, 0, 0);
+		if (zio->io_error != ECKSUM && vd != NULL && !vdev_is_dead(vd))
+			zfs_ereport_post(FM_EREPORT_ZFS_IO, spa, vd, zio, 0, 0);
 
 		if ((zio->io_error == EIO || !(zio->io_flags &
 		    (ZIO_FLAG_SPECULATIVE | ZIO_FLAG_DONT_PROPAGATE))) &&
-		    zio == zio->io_logical) {
+		    zio == lio) {
 			/*
 			 * For logical I/O requests, tell the SPA to log the
 			 * error and generate a logical data ereport.
 			 */
-			spa_log_error(zio->io_spa, zio);
-			zfs_ereport_post(FM_EREPORT_ZFS_DATA, zio->io_spa, NULL, zio,
+			spa_log_error(spa, zio);
+			zfs_ereport_post(FM_EREPORT_ZFS_DATA, spa, NULL, zio,
 			    0, 0);
 		}
 	}
 
-	if (zio->io_error && zio == zio->io_logical) {
+	if (zio->io_error && zio == lio) {
 		/*
 		 * Determine whether zio should be reexecuted.  This will
 		 * propagate all the way to the root via zio_notify_parent().
 		 */
-		ASSERT(zio->io_vd == NULL && zio->io_bp != NULL);
+		ASSERT(vd == NULL && bp != NULL);
 		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
 
 		if (IO_IS_ALLOCATING(zio) &&
@@ -2971,8 +2942,8 @@ zio_done(zio_t *zio)
 		    zio->io_type == ZIO_TYPE_FREE) &&
 		    !(zio->io_flags & ZIO_FLAG_SCAN_THREAD) &&
 		    zio->io_error == ENXIO &&
-		    spa_load_state(zio->io_spa) == SPA_LOAD_NONE &&
-		    spa_get_failmode(zio->io_spa) != ZIO_FAILURE_MODE_CONTINUE)
+		    spa_load_state(spa) == SPA_LOAD_NONE &&
+		    spa_get_failmode(spa) != ZIO_FAILURE_MODE_CONTINUE)
 			zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
 
 		if (!(zio->io_flags & ZIO_FLAG_CANFAIL) && !zio->io_reexecute)
@@ -2997,8 +2968,8 @@ zio_done(zio_t *zio)
 
 	if ((zio->io_error || zio->io_reexecute) &&
 	    IO_IS_ALLOCATING(zio) && zio->io_gang_leader == zio &&
-	    !(zio->io_flags & ZIO_FLAG_IO_REWRITE))
-		zio_dva_unallocate(zio, zio->io_gang_tree, zio->io_bp);
+	    !(zio->io_flags & (ZIO_FLAG_IO_REWRITE | ZIO_FLAG_NOPWRITE)))
+		zio_dva_unallocate(zio, zio->io_gang_tree, bp);
 
 	zio_gang_tree_free(&zio->io_gang_tree);
 
@@ -3063,17 +3034,16 @@ zio_done(zio_t *zio)
 			 * We'd fail again if we reexecuted now, so suspend
 			 * until conditions improve (e.g. device comes online).
 			 */
-			zio_suspend(zio->io_spa, zio);
+			zio_suspend(spa, zio);
 		} else {
 			/*
 			 * Reexecution is potentially a huge amount of work.
 			 * Hand it off to the otherwise-unused claim taskq.
 			 */
-			ASSERT(taskq_empty_ent(&zio->io_tqent));
-			(void) taskq_dispatch_ent(
-			    zio->io_spa->spa_zio_taskq[ZIO_TYPE_CLAIM][ZIO_TASKQ_ISSUE],
-			    (task_func_t *)zio_reexecute, zio, 0,
-			    &zio->io_tqent);
+			ASSERT(zio->io_tqent.tqent_next == NULL);
+			spa_taskq_dispatch_ent(spa, ZIO_TYPE_CLAIM,
+			    ZIO_TASKQ_ISSUE, (task_func_t *)zio_reexecute, zio,
+			    0, &zio->io_tqent);
 		}
 		return (ZIO_PIPELINE_STOP);
 	}
@@ -3093,11 +3063,6 @@ zio_done(zio_t *zio)
 		zfs_ereport_free_checksum(zcr);
 	}
 
-	if (zio->io_flags & ZIO_FLAG_FASTWRITE && zio->io_bp &&
-	    !BP_IS_HOLE(zio->io_bp)) {
-		metaslab_fastwrite_unmark(zio->io_spa, zio->io_bp);
-	}
-
 	/*
 	 * It is the responsibility of the done callback to ensure that this
 	 * particular zio is no longer discoverable for adoption, and as
@@ -3113,7 +3078,7 @@ zio_done(zio_t *zio)
 	for (pio = zio_walk_parents(zio); pio != NULL; pio = pio_next) {
 		zio_link_t *zl = zio->io_walk_link;
 		pio_next = zio_walk_parents(zio);
-	zio_remove_child(pio, zio, zl);
+		zio_remove_child(pio, zio, zl);
 		zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
 	}
 
@@ -3141,6 +3106,7 @@ static zio_pipe_stage_t *zio_pipeline[] = {
 	zio_issue_async,
 	zio_write_bp_init,
 	zio_checksum_generate,
+	zio_nop_write,
 	zio_ddt_read_start,
 	zio_ddt_read_done,
 	zio_ddt_write,
@@ -3199,25 +3165,3 @@ zbookmark_is_before(const dnode_phys_t *dnp, const zbookmark_t *zb1,
 		return (B_FALSE);
 	return (zb1nextL0 <= zb2->zb_blkid);
 }
-
-#if defined(_KERNEL) && defined(HAVE_SPL)
-/* Fault injection */
-EXPORT_SYMBOL(zio_injection_enabled);
-EXPORT_SYMBOL(zio_inject_fault);
-EXPORT_SYMBOL(zio_inject_list_next);
-EXPORT_SYMBOL(zio_clear_fault);
-EXPORT_SYMBOL(zio_handle_fault_injection);
-EXPORT_SYMBOL(zio_handle_device_injection);
-EXPORT_SYMBOL(zio_handle_label_injection);
-EXPORT_SYMBOL(zio_priority_table);
-EXPORT_SYMBOL(zio_type_name);
-
-module_param(zio_bulk_flags, int, 0644);
-MODULE_PARM_DESC(zio_bulk_flags, "Additional flags to pass to bulk buffers");
-
-module_param(zio_delay_max, int, 0644);
-MODULE_PARM_DESC(zio_delay_max, "Max zio millisec delay before posting event");
-
-module_param(zio_requeue_io_start_cut_in_line, int, 0644);
-MODULE_PARM_DESC(zio_requeue_io_start_cut_in_line, "Prioritize requeued I/O");
-#endif