From 9523ff15698107eb1c5c00462a3ffe2646a3f9ec Mon Sep 17 00:00:00 2001
From: Li Wei <wei.g.li@intel.com>
Date: Fri, 22 Mar 2024 10:28:25 +0900
Subject: [PATCH] DAOS-14261 engine: Add dss_chore for I/O forwarding (#13372)

As requested by the Jira ticket, add a new I/O forwarding mechanism,
dss_chore, to avoid creating a ULT for every forwarding task.

  - Forwarding of object I/O and DTX RPCs is converted to chores.

  - Cancellation is not implemented, because the I/O forwarding tasks
    themselves do not support cancellation yet.

  - In certain engine configurations, some xstreams do not need a
    dx_chore_queue; skipping its initialization in those cases is left
    to future work.
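
For reference, a minimal sketch of how the new interface is intended to
be used (illustrative only: my_chore, my_fwd_one() and the yield
threshold below are made-up names; the real callers in this patch are
dtx_leader_exec_ops_chore and dtx_rpc_helper):

    struct my_chore {
            struct dss_chore chore;   /* must be embedded */
            uint32_t         i;       /* chore-internal cursor */
            uint32_t         count;   /* sub-requests to forward */
    };

    static enum dss_chore_status
    my_chore_func(struct dss_chore *chore, bool is_reentrance)
    {
            struct my_chore *mc = container_of(chore, struct my_chore, chore);

            if (is_reentrance)
                    mc->i++;          /* resume past the entry handled before yielding */
            else
                    mc->i = 0;        /* first entrance: initialize chore state */

            for (; mc->i < mc->count; mc->i++) {
                    my_fwd_one(mc, mc->i);            /* forward one sub-request */
                    if ((mc->i + 1) % 32 == 0)
                            return DSS_CHORE_YIELD;   /* re-entered later with is_reentrance=true */
            }
            return DSS_CHORE_DONE;
    }

    /* Queue on a helper xstream's chore queue, or run inline in the caller: */
    rc = dss_chore_delegate(&mc->chore, my_chore_func);
    /* or */
    dss_chore_diy(&mc->chore, my_chore_func);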

Required-githooks: true
Skipped-githooks: clang

Change-Id: I8d6f9889f5562a8bc3683d26cb830672a8aa40f3
Signed-off-by: Li Wei <wei.g.li@intel.com>
---
 src/dtx/dtx_common.c               |  88 +++++++-----
 src/dtx/dtx_rpc.c                  | 139 ++++++++++++-------
 src/engine/sched.c                 |  18 ++-
 src/engine/srv.c                   |  27 +++-
 src/engine/srv_internal.h          |  16 +++
 src/engine/ult.c                   | 209 +++++++++++++++++++++++++++++
 src/include/daos_srv/daos_engine.h |  43 ++++++
 7 files changed, 453 insertions(+), 87 deletions(-)

diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c
index 43ba3d64752..ee119d4b965 100644
--- a/src/dtx/dtx_common.c
+++ b/src/dtx/dtx_common.c
@@ -1914,10 +1914,16 @@ dtx_handle_resend(daos_handle_t coh,  struct dtx_id *dti,
  */
 #define DTX_EXEC_STEP_LENGTH	DTX_THRESHOLD_COUNT
 
-struct dtx_ult_arg {
+struct dtx_chore {
+	struct dss_chore		 chore;
 	dtx_sub_func_t			 func;
 	void				*func_arg;
 	struct dtx_leader_handle	*dlh;
+
+	/* Chore-internal state variables */
+	uint32_t			 i;
+	uint32_t			 j;
+	uint32_t			 k;
 };
 
 static void
@@ -1970,20 +1976,34 @@ dtx_sub_comp_cb(struct dtx_leader_handle *dlh, int idx, int rc)
 		  idx, tgt->st_rank, tgt->st_tgt_idx, tgt->st_flags, rc);
 }
 
-static void
-dtx_leader_exec_ops_ult(void *arg)
+static enum dss_chore_status
+dtx_leader_exec_ops_chore(struct dss_chore *chore, bool is_reentrance)
 {
-	struct dtx_ult_arg		*ult_arg = arg;
-	struct dtx_leader_handle	*dlh = ult_arg->dlh;
+	struct dtx_chore		*dtx_chore = container_of(chore, struct dtx_chore, chore);
+	struct dtx_leader_handle	*dlh = dtx_chore->dlh;
 	struct dtx_sub_status		*sub;
 	struct daos_shard_tgt		*tgt;
-	uint32_t			 i;
-	uint32_t			 j;
-	uint32_t			 k;
 	int				 rc = 0;
 
-	for (i = dlh->dlh_forward_idx, j = 0, k = 0; j < dlh->dlh_forward_cnt; i++, j++) {
-		sub = &dlh->dlh_subs[i];
+	/*
+	 * On the first entrance, initialize the chore-internal state
+	 * variables; on reentrance, resume past the last yield point.
+	 */
+	if (is_reentrance) {
+		D_DEBUG(DB_TRACE, "%p: resume: i=%u j=%u k=%u forward_cnt=%u\n", chore,
+			dtx_chore->i, dtx_chore->j, dtx_chore->k, dlh->dlh_forward_cnt);
+		dtx_chore->i++;
+		dtx_chore->j++;
+	} else {
+		D_DEBUG(DB_TRACE, "%p: initialize: forward_idx=%u forward_cnt=%u\n", chore,
+			dlh->dlh_forward_idx, dlh->dlh_forward_cnt);
+		dtx_chore->i = dlh->dlh_forward_idx;
+		dtx_chore->j = 0;
+		dtx_chore->k = 0;
+	}
+
+	for (; dtx_chore->j < dlh->dlh_forward_cnt; dtx_chore->i++, dtx_chore->j++) {
+		sub = &dlh->dlh_subs[dtx_chore->i];
 		tgt = &sub->dss_tgt;
 
 		if (dlh->dlh_normal_sub_done == 0) {
@@ -1991,7 +2011,7 @@ dtx_leader_exec_ops_ult(void *arg)
 			sub->dss_comp = 0;
 
 			if (unlikely(tgt->st_flags & DTF_DELAY_FORWARD)) {
-				dtx_sub_comp_cb(dlh, i, 0);
+				dtx_sub_comp_cb(dlh, dtx_chore->i, 0);
 				continue;
 			}
 		} else {
@@ -2003,33 +2023,35 @@ dtx_leader_exec_ops_ult(void *arg)
 		}
 
 		if (tgt->st_rank == DAOS_TGT_IGNORE ||
-		    (i == daos_fail_value_get() && DAOS_FAIL_CHECK(DAOS_DTX_SKIP_PREPARE))) {
+		    (dtx_chore->i == daos_fail_value_get() &&
+		     DAOS_FAIL_CHECK(DAOS_DTX_SKIP_PREPARE))) {
 			if (dlh->dlh_normal_sub_done == 0 || tgt->st_flags & DTF_DELAY_FORWARD)
-				dtx_sub_comp_cb(dlh, i, 0);
+				dtx_sub_comp_cb(dlh, dtx_chore->i, 0);
 			continue;
 		}
 
-		rc = ult_arg->func(dlh, ult_arg->func_arg, i, dtx_sub_comp_cb);
+		rc = dtx_chore->func(dlh, dtx_chore->func_arg, dtx_chore->i, dtx_sub_comp_cb);
 		if (rc != 0) {
 			if (sub->dss_comp == 0)
-				dtx_sub_comp_cb(dlh, i, rc);
+				dtx_sub_comp_cb(dlh, dtx_chore->i, rc);
 			break;
 		}
 
 		/* Yield to avoid holding CPU for too long time. */
-		if ((++k) % DTX_RPC_YIELD_THD == 0)
-			ABT_thread_yield();
+		if (++(dtx_chore->k) % DTX_RPC_YIELD_THD == 0)
+			return DSS_CHORE_YIELD;
 	}
 
 	if (rc != 0) {
-		for (i++, j++; j < dlh->dlh_forward_cnt; i++, j++) {
-			sub = &dlh->dlh_subs[i];
+		for (dtx_chore->i++, dtx_chore->j++; dtx_chore->j < dlh->dlh_forward_cnt;
+		     dtx_chore->i++, dtx_chore->j++) {
+			sub = &dlh->dlh_subs[dtx_chore->i];
 			tgt = &sub->dss_tgt;
 
 			if (dlh->dlh_normal_sub_done == 0 || tgt->st_flags & DTF_DELAY_FORWARD) {
 				sub->dss_result = 0;
 				sub->dss_comp = 0;
-				dtx_sub_comp_cb(dlh, i, 0);
+				dtx_sub_comp_cb(dlh, dtx_chore->i, 0);
 			}
 		}
 	}
@@ -2039,6 +2061,8 @@ dtx_leader_exec_ops_ult(void *arg)
 	D_ASSERTF(rc == ABT_SUCCESS, "ABT_future_set failed [%u, %u), for delay %s: %d\n",
 		  dlh->dlh_forward_idx, dlh->dlh_forward_idx + dlh->dlh_forward_cnt,
 		  dlh->dlh_normal_sub_done == 1 ? "yes" : "no", rc);
+
+	return DSS_CHORE_DONE;
 }
 
 /**
@@ -2048,15 +2072,15 @@ int
 dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
 		    dtx_agg_cb_t agg_cb, int allow_failure, void *func_arg)
 {
-	struct dtx_ult_arg	ult_arg;
+	struct dtx_chore	dtx_chore;
 	int			sub_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt;
 	int			rc = 0;
 	int			local_rc = 0;
 	int			remote_rc = 0;
 
-	ult_arg.func = func;
-	ult_arg.func_arg = func_arg;
-	ult_arg.dlh = dlh;
+	dtx_chore.func = func;
+	dtx_chore.func_arg = func_arg;
+	dtx_chore.dlh = dlh;
 
 	dlh->dlh_result = 0;
 	dlh->dlh_allow_failure = allow_failure;
@@ -2092,15 +2116,10 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
 		D_GOTO(out, rc = dss_abterr2der(rc));
 	}
 
-	/*
-	 * NOTE: Ideally, we probably should create ULT for each shard, but for performance
-	 *	 reasons, let's only create one for all remote targets for now.
-	 */
-	rc = dss_ult_create(dtx_leader_exec_ops_ult, &ult_arg, DSS_XS_IOFW,
-			    dss_get_module_info()->dmi_tgt_id, DSS_DEEP_STACK_SZ, NULL);
+	rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore);
 	if (rc != 0) {
-		D_ERROR("ult create failed [%u, %u] (2): "DF_RC"\n",
-			dlh->dlh_forward_idx, dlh->dlh_forward_cnt, DP_RC(rc));
+		DL_ERROR(rc, "chore create failed [%u, %u] (2)", dlh->dlh_forward_idx,
+			 dlh->dlh_forward_cnt);
 		ABT_future_free(&dlh->dlh_future);
 		goto out;
 	}
@@ -2168,10 +2187,9 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
 	/* The ones without DELAY flag will be skipped when scan the targets array. */
 	dlh->dlh_forward_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt;
 
-	rc = dss_ult_create(dtx_leader_exec_ops_ult, &ult_arg, DSS_XS_IOFW,
-			    dss_get_module_info()->dmi_tgt_id, DSS_DEEP_STACK_SZ, NULL);
+	rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore);
 	if (rc != 0) {
-		D_ERROR("ult create failed (4): "DF_RC"\n", DP_RC(rc));
+		DL_ERROR(rc, "chore create failed (4)");
 		ABT_future_free(&dlh->dlh_future);
 		goto out;
 	}
diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c
index 324f8bffd3a..03b0b542383 100644
--- a/src/dtx/dtx_rpc.c
+++ b/src/dtx/dtx_rpc.c
@@ -363,6 +363,8 @@ dtx_req_wait(struct dtx_req_args *dra)
 }
 
 struct dtx_common_args {
+	struct dss_chore	  dca_chore;
+	ABT_eventual		  dca_chore_eventual;
 	struct dtx_req_args	  dca_dra;
 	d_list_t		  dca_head;
 	struct btr_root		  dca_tree_root;
@@ -373,57 +375,76 @@ struct dtx_common_args {
 	d_rank_t		  dca_rank;
 	uint32_t		  dca_tgtid;
 	struct ds_cont_child	 *dca_cont;
-	ABT_thread		  dca_helper;
 	struct dtx_id		  dca_dti_inline;
 	struct dtx_id		 *dca_dtis;
 	struct dtx_entry	**dca_dtes;
+
+	/* Chore-internal state variables */
+	struct dtx_req_rec	 *dca_drr;
+	int			  dca_i;
 };
 
+/* If is_reentrance, this function ignores len. */
 static int
-dtx_req_list_send(struct dtx_common_args *dca, daos_epoch_t epoch, int len)
+dtx_req_list_send(struct dtx_common_args *dca, daos_epoch_t epoch, int len, bool is_reentrance)
 {
 	struct dtx_req_args	*dra = &dca->dca_dra;
-	struct dtx_req_rec	*drr;
 	int			 rc;
-	int			 i = 0;
 
-	dra->dra_length = len;
+	if (!is_reentrance) {
+		dra->dra_length = len;
+
+		rc = ABT_future_create(len, dtx_req_list_cb, &dra->dra_future);
+		if (rc != ABT_SUCCESS) {
+			D_ERROR("ABT_future_create failed for opc %x, len = %d: "
+				"rc = %d.\n", dra->dra_opc, len, rc);
+			return dss_abterr2der(rc);
+		}
 
-	rc = ABT_future_create(len, dtx_req_list_cb, &dra->dra_future);
-	if (rc != ABT_SUCCESS) {
-		D_ERROR("ABT_future_create failed for opc %x, len = %d: "
-			"rc = %d.\n", dra->dra_opc, len, rc);
-		return dss_abterr2der(rc);
+		D_DEBUG(DB_TRACE, "%p: DTX req for opc %x, future %p (%d) start.\n",
+			&dca->dca_chore, dra->dra_opc, dra->dra_future, len);
 	}
 
-	D_DEBUG(DB_TRACE, "DTX req for opc %x, future %p start.\n", dra->dra_opc, dra->dra_future);
+	/*
+	 * Begin or continue an iteration over dca_head. When beginning the
+	 * iteration, dca->dca_drr does not point to a real entry, and is only
+	 * safe for d_list_for_each_entry_continue.
+	 */
+	if (!is_reentrance) {
+		dca->dca_drr = d_list_entry(&dca->dca_head, struct dtx_req_rec, drr_link);
+		dca->dca_i = 0;
+	}
+	/* DO NOT add any line here! See the comment on dca->dca_drr above. */
+	d_list_for_each_entry_continue(dca->dca_drr, &dca->dca_head, drr_link)
+	{
+		D_DEBUG(DB_TRACE, "chore=%p: drr=%p i=%d\n", &dca->dca_chore, dca->dca_drr,
+			dca->dca_i);
 
-	d_list_for_each_entry(drr, &dca->dca_head, drr_link) {
-		drr->drr_parent = dra;
-		drr->drr_result = 0;
+		dca->dca_drr->drr_parent = dra;
+		dca->dca_drr->drr_result = 0;
 
-		if (unlikely(dra->dra_opc == DTX_COMMIT && i == 0 &&
+		if (unlikely(dra->dra_opc == DTX_COMMIT && dca->dca_i == 0 &&
 			     DAOS_FAIL_CHECK(DAOS_DTX_FAIL_COMMIT)))
-			rc = dtx_req_send(drr, 1);
+			rc = dtx_req_send(dca->dca_drr, 1);
 		else
-			rc = dtx_req_send(drr, epoch);
+			rc = dtx_req_send(dca->dca_drr, epoch);
 		if (rc != 0) {
 			/* If the first sub-RPC failed, then break, otherwise
 			 * other remote replicas may have already received the
 			 * RPC and executed it, so have to go ahead.
 			 */
-			if (i == 0) {
+			if (dca->dca_i == 0) {
 				ABT_future_free(&dra->dra_future);
 				return rc;
 			}
 		}
 
 		/* Yield to avoid holding CPU for too long time. */
-		if (++i % DTX_RPC_YIELD_THD == 0)
-			ABT_thread_yield();
+		if (++(dca->dca_i) % DTX_RPC_YIELD_THD == 0)
+			return DSS_CHORE_YIELD;
 	}
 
-	return 0;
+	return DSS_CHORE_DONE;
 }
 
 static int
@@ -599,16 +620,22 @@ dtx_classify_one(struct ds_pool *pool, daos_handle_t tree, d_list_t *head, int *
 	return rc > 0 ? 0 : rc;
 }
 
-static int
-dtx_rpc_internal(struct dtx_common_args *dca)
+static enum dss_chore_status
+dtx_rpc_helper(struct dss_chore *chore, bool is_reentrance)
 {
+	struct dtx_common_args	*dca = container_of(chore, struct dtx_common_args, dca_chore);
 	struct ds_pool		*pool = dca->dca_cont->sc_pool->spc_pool;
 	struct umem_attr	 uma = { 0 };
 	int			 length = 0;
 	int			 rc;
 	int			 i;
 
-	if (dca->dca_dra.dra_opc != DTX_REFRESH) {
+	if (is_reentrance) {
+		D_DEBUG(DB_TRACE, "%p: skip to send\n", &dca->dca_chore);
+		goto send;
+	}
+
+	if (dca->dca_dtes != NULL) {
 		D_ASSERT(dca->dca_dtis != NULL);
 
 		if (dca->dca_count > 1) {
@@ -616,7 +643,7 @@ dtx_rpc_internal(struct dtx_common_args *dca)
 			rc = dbtree_create_inplace(DBTREE_CLASS_DTX_CF, 0, DTX_CF_BTREE_ORDER,
 						   &uma, &dca->dca_tree_root, &dca->dca_tree_hdl);
 			if (rc != 0)
-				return rc;
+				goto done;
 		}
 
 		ABT_rwlock_rdlock(pool->sp_lock);
@@ -626,7 +653,7 @@ dtx_rpc_internal(struct dtx_common_args *dca)
 					      dca->dca_rank, dca->dca_tgtid);
 			if (rc < 0) {
 				ABT_rwlock_unlock(pool->sp_lock);
-				return rc;
+				goto done;
 			}
 
 			daos_dti_copy(&dca->dca_dtis[i], &dca->dca_dtes[i]->dte_xid);
@@ -636,30 +663,33 @@ dtx_rpc_internal(struct dtx_common_args *dca)
 		/* For DTX_CHECK, if no other available target(s), then current target is the
 		 * unique valid one (and also 'prepared'), then related DTX can be committed.
 		 */
-		if (d_list_empty(&dca->dca_head))
-			return dca->dca_dra.dra_opc == DTX_CHECK ? DTX_ST_PREPARED : 0;
+		if (d_list_empty(&dca->dca_head)) {
+			rc = (dca->dca_dra.dra_opc == DTX_CHECK ? DTX_ST_PREPARED : 0);
+			goto done;
+		}
 	} else {
 		length = dca->dca_count;
 	}
 
 	D_ASSERT(length > 0);
 
-	return dtx_req_list_send(dca, dca->dca_epoch, length);
-}
-
-static void
-dtx_rpc_helper(void *arg)
-{
-	struct dtx_common_args	*dca = arg;
-	int			 rc;
-
-	rc = dtx_rpc_internal(dca);
+send:
+	rc = dtx_req_list_send(dca, dca->dca_epoch, length, is_reentrance);
+	if (rc == DSS_CHORE_YIELD)
+		return DSS_CHORE_YIELD;
+	if (rc == DSS_CHORE_DONE)
+		rc = 0;
 
+done:
 	if (rc != 0)
 		dca->dca_dra.dra_result = rc;
-
-	D_CDEBUG(rc < 0, DLOG_ERR, DB_TRACE,
-		 "DTX helper ULT for %u exit: %d\n", dca->dca_dra.dra_opc, rc);
+	D_CDEBUG(rc < 0, DLOG_ERR, DB_TRACE, "%p: DTX RPC chore for %u done: %d\n", chore,
+		 dca->dca_dra.dra_opc, rc);
+	if (dca->dca_chore_eventual != ABT_EVENTUAL_NULL) {
+		rc = ABT_eventual_set(dca->dca_chore_eventual, NULL, 0);
+		D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_set: %d\n", rc);
+	}
+	return DSS_CHORE_DONE;
 }
 
 static int
@@ -672,6 +702,7 @@ dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list,  struct dtx_entry **
 
 	memset(dca, 0, sizeof(*dca));
 
+	dca->dca_chore_eventual = ABT_EVENTUAL_NULL;
 	D_INIT_LIST_HEAD(&dca->dca_head);
 	dca->dca_tree_hdl = DAOS_HDL_INVAL;
 	dca->dca_epoch = epoch;
@@ -679,7 +710,6 @@ dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list,  struct dtx_entry **
 	crt_group_rank(NULL, &dca->dca_rank);
 	dca->dca_tgtid = dss_get_module_info()->dmi_tgt_id;
 	dca->dca_cont = cont;
-	dca->dca_helper = ABT_THREAD_NULL;
 	dca->dca_dtes = dtes;
 
 	dra = &dca->dca_dra;
@@ -705,11 +735,18 @@ dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list,  struct dtx_entry **
 	}
 
 	/* Use helper ULT to handle DTX RPC if there are enough helper XS. */
-	if (dss_has_enough_helper())
-		rc = dss_ult_create(dtx_rpc_helper, dca, DSS_XS_IOFW, dca->dca_tgtid,
-				    DSS_DEEP_STACK_SZ, &dca->dca_helper);
-	else
-		rc = dtx_rpc_internal(dca);
+	if (dss_has_enough_helper()) {
+		rc = ABT_eventual_create(0, &dca->dca_chore_eventual);
+		if (rc != ABT_SUCCESS) {
+			D_ERROR("failed to create eventual: %d\n", rc);
+			rc = dss_abterr2der(rc);
+			goto out;
+		}
+		rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper);
+	} else {
+		dss_chore_diy(&dca->dca_chore, dtx_rpc_helper);
+		rc = dca->dca_dra.dra_result;
+	}
 
 out:
 	return rc;
@@ -721,8 +758,12 @@ dtx_rpc_post(struct dtx_common_args *dca, int ret, bool keep_head)
 	struct dtx_req_rec	*drr;
 	int			 rc;
 
-	if (dca->dca_helper != ABT_THREAD_NULL)
-		ABT_thread_free(&dca->dca_helper);
+	if (dca->dca_chore_eventual != ABT_EVENTUAL_NULL) {
+		rc = ABT_eventual_wait(dca->dca_chore_eventual, NULL);
+		D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_wait: %d\n", rc);
+		rc = ABT_eventual_free(&dca->dca_chore_eventual);
+		D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_free: %d\n", rc);
+	}
 
 	rc = dtx_req_wait(&dca->dca_dra);
 
diff --git a/src/engine/sched.c b/src/engine/sched.c
index aae2fb5554b..988b8f2b6aa 100644
--- a/src/engine/sched.c
+++ b/src/engine/sched.c
@@ -1441,8 +1441,8 @@ sched_stop(struct dss_xstream *dx)
 	process_all(dx);
 }
 
-void
-sched_cond_wait(ABT_cond cond, ABT_mutex mutex)
+static void
+cond_wait(ABT_cond cond, ABT_mutex mutex, bool for_business)
 {
 	struct dss_xstream	*dx = dss_current_xstream();
 	struct sched_info	*info = &dx->dx_sched_info;
@@ -1451,6 +1451,20 @@ sched_cond_wait(ABT_cond cond, ABT_mutex mutex)
 	ABT_cond_wait(cond, mutex);
 	D_ASSERT(info->si_wait_cnt > 0);
 	info->si_wait_cnt -= 1;
+	if (for_business)
+		info->si_stats.ss_busy_ts = info->si_cur_ts;
+}
+
+void
+sched_cond_wait(ABT_cond cond, ABT_mutex mutex)
+{
+	cond_wait(cond, mutex, false /* for_business */);
+}
+
+void
+sched_cond_wait_for_business(ABT_cond cond, ABT_mutex mutex)
+{
+	cond_wait(cond, mutex, true /* for_business */);
 }
 
 uint64_t
diff --git a/src/engine/srv.c b/src/engine/srv.c
index 246ee975c64..986d8ed04c4 100644
--- a/src/engine/srv.c
+++ b/src/engine/srv.c
@@ -370,6 +370,7 @@ dss_srv_handler(void *arg)
 	int				 rc;
 	bool                             track_mem     = false;
 	bool				 signal_caller = true;
+	bool				 with_chore_queue = dx->dx_iofw && !dx->dx_main_xs;
 
 	rc = dss_xstream_set_affinity(dx);
 	if (rc)
@@ -500,6 +501,16 @@ dss_srv_handler(void *arg)
 		}
 	}
 
+	if (with_chore_queue) {
+		rc = dss_chore_queue_start(dx);
+		if (rc != 0) {
+			DL_ERROR(rc, "failed to start chore queue");
+			ABT_future_set(dx->dx_shutdown, dx);
+			wait_all_exited(dx, dmi);
+			goto nvme_fini;
+		}
+	}
+
 	dmi->dmi_xstream = dx;
 	ABT_mutex_lock(xstream_data.xd_mutex);
 	/* initialized everything for the ULT, notify the creator */
@@ -546,6 +557,9 @@ dss_srv_handler(void *arg)
 	if (dx->dx_comm)
 		dx->dx_progress_started = false;
 
+	if (with_chore_queue)
+		dss_chore_queue_stop(dx);
+
 	wait_all_exited(dx, dmi);
 	if (dmi->dmi_dp) {
 		daos_profile_destroy(dmi->dmi_dp);
@@ -755,6 +769,8 @@ dss_start_one_xstream(hwloc_cpuset_t cpus, int tag, int xs_id)
 	} else {
 		dx->dx_main_xs	= (xs_id >= dss_sys_xs_nr) && (xs_offset == 0);
 	}
+	/* See the DSS_XS_IOFW case in sched_ult2xs. */
+	dx->dx_iofw = xs_id >= dss_sys_xs_nr && (!dx->dx_main_xs || dss_tgt_offload_xs_nr == 0);
 	dx->dx_dsc_started = false;
 
 	/**
@@ -783,6 +799,12 @@ dss_start_one_xstream(hwloc_cpuset_t cpus, int tag, int xs_id)
 		D_GOTO(out_dx, rc);
 	}
 
+	rc = dss_chore_queue_init(dx);
+	if (rc != 0) {
+		DL_ERROR(rc, "failed to initialize chore queue");
+		goto out_sched;
+	}
+
 	dss_mem_stats_init(&dx->dx_mem_stats, xs_id);
 
 	/** start XS, ABT rank 0 is reserved for the primary xstream */
@@ -790,7 +812,7 @@ dss_start_one_xstream(hwloc_cpuset_t cpus, int tag, int xs_id)
 					  &dx->dx_xstream);
 	if (rc != ABT_SUCCESS) {
 		D_ERROR("create xstream fails %d\n", rc);
-		D_GOTO(out_sched, rc = dss_abterr2der(rc));
+		D_GOTO(out_chore_queue, rc = dss_abterr2der(rc));
 	}
 
 	rc = ABT_thread_attr_create(&attr);
@@ -839,6 +861,8 @@ dss_start_one_xstream(hwloc_cpuset_t cpus, int tag, int xs_id)
 		ABT_thread_attr_free(&attr);
 	ABT_xstream_join(dx->dx_xstream);
 	ABT_xstream_free(&dx->dx_xstream);
+out_chore_queue:
+	dss_chore_queue_fini(dx);
 out_sched:
 	dss_sched_fini(dx);
 out_dx:
@@ -898,6 +922,7 @@ dss_xstreams_fini(bool force)
 		dx = xstream_data.xd_xs_ptrs[i];
 		if (dx == NULL)
 			continue;
+		dss_chore_queue_fini(dx);
 		dss_sched_fini(dx);
 		dss_xstream_free(dx);
 		xstream_data.xd_xs_ptrs[i] = NULL;
diff --git a/src/engine/srv_internal.h b/src/engine/srv_internal.h
index 892e6ae3dc4..8621175b44f 100644
--- a/src/engine/srv_internal.h
+++ b/src/engine/srv_internal.h
@@ -60,6 +60,15 @@ struct mem_stats {
 	uint64_t            ms_current;
 };
 
+/* See dss_chore. */
+struct dss_chore_queue {
+	d_list_t   chq_list;
+	bool       chq_stop;
+	ABT_mutex  chq_mutex;
+	ABT_cond   chq_cond;
+	ABT_thread chq_ult;
+};
+
 /** Per-xstream configuration data */
 struct dss_xstream {
 	char			dx_name[DSS_XS_NAME_LEN];
@@ -85,6 +94,7 @@ struct dss_xstream {
 	unsigned int		dx_timeout;
 	bool			dx_main_xs;	/* true for main XS */
 	bool			dx_comm;	/* true with cart context */
+	bool			dx_iofw;	/* true for DSS_XS_IOFW XS */
 	bool			dx_dsc_started;	/* DSC progress ULT started */
 	struct mem_stats        dx_mem_stats;   /* memory usages stats on this xstream */
 #ifdef ULT_MMAP_STACK
@@ -93,6 +103,7 @@ struct dss_xstream {
 #endif
 	bool			dx_progress_started;	/* Network poll started */
 	int                     dx_tag;                 /** tag for xstream */
+	struct dss_chore_queue	dx_chore_queue;
 };
 
 /** Engine module's metrics */
@@ -370,4 +381,9 @@ dss_xstream_has_nvme(struct dss_xstream *dx)
 	return false;
 }
 
+int dss_chore_queue_init(struct dss_xstream *dx);
+int dss_chore_queue_start(struct dss_xstream *dx);
+void dss_chore_queue_stop(struct dss_xstream *dx);
+void dss_chore_queue_fini(struct dss_xstream *dx);
+
 #endif /* __DAOS_SRV_INTERNAL__ */
diff --git a/src/engine/ult.c b/src/engine/ult.c
index 204381755fb..1e8743fcd89 100644
--- a/src/engine/ult.c
+++ b/src/engine/ult.c
@@ -610,3 +610,212 @@ dss_main_exec(void (*func)(void *), void *arg)
 
 	return dss_ult_create(func, arg, DSS_XS_SELF, info->dmi_tgt_id, 0, NULL);
 }
+
+static void
+dss_chore_diy_internal(struct dss_chore *chore)
+{
+reenter:
+	D_DEBUG(DB_TRACE, "%p: status=%d\n", chore, chore->cho_status);
+	chore->cho_status = chore->cho_func(chore, chore->cho_status == DSS_CHORE_YIELD);
+	D_ASSERT(chore->cho_status != DSS_CHORE_NEW);
+	if (chore->cho_status == DSS_CHORE_YIELD) {
+		ABT_thread_yield();
+		goto reenter;
+	}
+}
+
+static void
+dss_chore_ult(void *arg)
+{
+	struct dss_chore *chore = arg;
+
+	dss_chore_diy_internal(chore);
+}
+
+/**
+ * Add \a chore for \a func to the chore queue of some other xstream.
+ *
+ * \param[in]	chore	address of the embedded chore object
+ * \param[in]	func	function to be executed via \a chore
+ *
+ * \retval	-DER_CANCELED	chore queue stopping
+ */
+int
+dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func)
+{
+	struct dss_module_info *info = dss_get_module_info();
+	int                     xs_id;
+	struct dss_xstream     *dx;
+	struct dss_chore_queue *queue;
+
+	chore->cho_status = DSS_CHORE_NEW;
+	chore->cho_func   = func;
+
+	/*
+	 * The chore queue ULT (dss_chore_queue_ult) may not get scheduled
+	 * often enough on a "main" xstream when the queue is long, so fall
+	 * back to the one-ULT-per-chore approach if there's no helper xstream.
+	 */
+	if (dss_tgt_offload_xs_nr == 0) {
+		D_INIT_LIST_HEAD(&chore->cho_link);
+		return dss_ult_create(dss_chore_ult, chore, DSS_XS_IOFW, info->dmi_tgt_id,
+				      0 /* stack_size */, NULL /* ult */);
+	}
+
+	/* Find the chore queue. */
+	xs_id = sched_ult2xs(DSS_XS_IOFW, info->dmi_tgt_id);
+	D_ASSERT(xs_id != -DER_INVAL);
+	dx = dss_get_xstream(xs_id);
+	D_ASSERT(dx != NULL);
+	queue = &dx->dx_chore_queue;
+	D_ASSERT(queue != NULL);
+
+	ABT_mutex_lock(queue->chq_mutex);
+	if (queue->chq_stop) {
+		ABT_mutex_unlock(queue->chq_mutex);
+		return -DER_CANCELED;
+	}
+	d_list_add_tail(&chore->cho_link, &queue->chq_list);
+	ABT_cond_broadcast(queue->chq_cond);
+	ABT_mutex_unlock(queue->chq_mutex);
+
+	D_DEBUG(DB_TRACE, "%p: tgt_id=%d -> xs_id=%d dx.tgt_id=%d\n", chore, info->dmi_tgt_id,
+		xs_id, dx->dx_tgt_id);
+	return 0;
+}
+
+/**
+ * Do \a chore for \a func synchronously in the current ULT.
+ *
+ * \param[in]	chore	embedded chore object
+ * \param[in]	func	function to be executed via \a chore
+ */
+void
+dss_chore_diy(struct dss_chore *chore, dss_chore_func_t func)
+{
+	D_INIT_LIST_HEAD(&chore->cho_link);
+	chore->cho_status = DSS_CHORE_NEW;
+	chore->cho_func   = func;
+
+	dss_chore_diy_internal(chore);
+}
+
+static void
+dss_chore_queue_ult(void *arg)
+{
+	struct dss_chore_queue *queue = arg;
+	d_list_t                list  = D_LIST_HEAD_INIT(list);
+
+	D_ASSERT(queue != NULL);
+	D_DEBUG(DB_TRACE, "begin\n");
+
+	for (;;) {
+		struct dss_chore *chore;
+		struct dss_chore *chore_tmp;
+		bool              stop = false;
+
+		/*
+		 * The scheduling order shall be
+		 *
+		 *   [queue->chq_list] [list],
+		 *
+		 * where list contains chores that have returned
+		 * DSS_CHORE_YIELD in the previous iteration.
+		 */
+		ABT_mutex_lock(queue->chq_mutex);
+		for (;;) {
+			if (!d_list_empty(&queue->chq_list)) {
+				d_list_splice_init(&queue->chq_list, &list);
+				break;
+			}
+			if (!d_list_empty(&list))
+				break;
+			if (queue->chq_stop) {
+				stop = true;
+				break;
+			}
+			sched_cond_wait_for_business(queue->chq_cond, queue->chq_mutex);
+		}
+		ABT_mutex_unlock(queue->chq_mutex);
+
+		if (stop)
+			break;
+
+		d_list_for_each_entry_safe(chore, chore_tmp, &list, cho_link) {
+			bool is_reentrance = (chore->cho_status == DSS_CHORE_YIELD);
+
+			D_DEBUG(DB_TRACE, "%p: before: status=%d\n", chore, chore->cho_status);
+			chore->cho_status = chore->cho_func(chore, is_reentrance);
+			D_ASSERT(chore->cho_status != DSS_CHORE_NEW);
+			D_DEBUG(DB_TRACE, "%p: after: status=%d\n", chore, chore->cho_status);
+			if (chore->cho_status == DSS_CHORE_DONE)
+				d_list_del_init(&chore->cho_link);
+			ABT_thread_yield();
+		}
+	}
+
+	D_DEBUG(DB_TRACE, "end\n");
+}
+
+int
+dss_chore_queue_init(struct dss_xstream *dx)
+{
+	struct dss_chore_queue *queue = &dx->dx_chore_queue;
+	int                     rc;
+
+	D_INIT_LIST_HEAD(&queue->chq_list);
+	queue->chq_stop = false;
+
+	rc = ABT_mutex_create(&queue->chq_mutex);
+	if (rc != ABT_SUCCESS) {
+		D_ERROR("failed to create chore queue mutex: %d\n", rc);
+		return dss_abterr2der(rc);
+	}
+
+	rc = ABT_cond_create(&queue->chq_cond);
+	if (rc != ABT_SUCCESS) {
+		D_ERROR("failed to create chore queue condition variable: %d\n", rc);
+		ABT_mutex_free(&queue->chq_mutex);
+		return dss_abterr2der(rc);
+	}
+
+	return 0;
+}
+
+int
+dss_chore_queue_start(struct dss_xstream *dx)
+{
+	struct dss_chore_queue *queue = &dx->dx_chore_queue;
+	int                     rc;
+
+	rc = daos_abt_thread_create(dx->dx_sp, dss_free_stack_cb, dx->dx_pools[DSS_POOL_GENERIC],
+				    dss_chore_queue_ult, queue, ABT_THREAD_ATTR_NULL,
+				    &queue->chq_ult);
+	if (rc != 0) {
+		D_ERROR("failed to create chore queue ULT: %d\n", rc);
+		return dss_abterr2der(rc);
+	}
+
+	return 0;
+}
+
+void
+dss_chore_queue_stop(struct dss_xstream *dx)
+{
+	struct dss_chore_queue *queue = &dx->dx_chore_queue;
+
+	ABT_mutex_lock(queue->chq_mutex);
+	queue->chq_stop = true;
+	ABT_cond_broadcast(queue->chq_cond);
+	ABT_mutex_unlock(queue->chq_mutex);
+	ABT_thread_free(&queue->chq_ult);
+}
+
+void
+dss_chore_queue_fini(struct dss_xstream *dx)
+{
+	struct dss_chore_queue *queue = &dx->dx_chore_queue;
+
+	ABT_cond_free(&queue->chq_cond);
+	ABT_mutex_free(&queue->chq_mutex);
+}
diff --git a/src/include/daos_srv/daos_engine.h b/src/include/daos_srv/daos_engine.h
index afdf267cd60..06a927b8d3f 100644
--- a/src/include/daos_srv/daos_engine.h
+++ b/src/include/daos_srv/daos_engine.h
@@ -338,6 +338,13 @@ int sched_req_space_check(struct sched_request *req);
  */
 void sched_cond_wait(ABT_cond cond, ABT_mutex mutex);
 
+/**
+ * Wrapper of ABT_cond_wait(); informs the scheduler that the caller is going
+ * to be blocked for a relatively long time. Unlike sched_cond_wait, after
+ * waking up, this function keeps the xstream busy (non-relaxing) for a while.
+ */
+void sched_cond_wait_for_business(ABT_cond cond, ABT_mutex mutex);
+
 /**
  * Get current monotonic time in milli-seconds.
  */
@@ -812,4 +819,40 @@ enum dss_drpc_call_flag {
 int dss_drpc_call(int32_t module, int32_t method, void *req, size_t req_size,
 		  unsigned int flags, Drpc__Response **resp);
 
+/** Status of a chore */
+enum dss_chore_status {
+	DSS_CHORE_NEW,		/**< ready to be scheduled for the first time (private) */
+	DSS_CHORE_YIELD,	/**< ready to be scheduled again */
+	DSS_CHORE_DONE		/**< no more scheduling required */
+};
+
+struct dss_chore;
+
+/**
+ * Must return either DSS_CHORE_YIELD (if yielding to other chores) or
+ * DSS_CHORE_DONE (if terminating). If \a is_reentrance is true, this is not
+ * the first time \a chore is scheduled. A typical implementation shall
+ * initialize its internal state variables if \a is_reentrance is false. See
+ * dtx_leader_exec_ops_chore for an example.
+ */
+typedef enum dss_chore_status (*dss_chore_func_t)(struct dss_chore *chore, bool is_reentrance);
+
+/**
+ * Chore (opaque)
+ *
+ * A simple task (e.g., an I/O forwarding task) that yields by returning
+ * DSS_CHORE_YIELD instead of calling ABT_thread_yield. This data structure
+ * shall be embedded in the user's own task data structure, which typically
+ * also includes arguments and internal state variables for \a cho_func. All
+ * fields are private. See dtx_chore for an example.
+ */
+struct dss_chore {
+	d_list_t              cho_link;
+	enum dss_chore_status cho_status;
+	dss_chore_func_t      cho_func;
+};
+
+int dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func);
+void dss_chore_diy(struct dss_chore *chore, dss_chore_func_t func);
+
 #endif /* __DSS_API_H__ */