diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c index 77aec06240ef..2226f589e2b2 100644 --- a/src/dtx/dtx_common.c +++ b/src/dtx/dtx_common.c @@ -1913,10 +1913,15 @@ dtx_handle_resend(daos_handle_t coh, struct dtx_id *dti, */ #define DTX_EXEC_STEP_LENGTH DTX_THRESHOLD_COUNT -struct dtx_ult_arg { +struct dtx_chore { + struct dss_chore chore; dtx_sub_func_t func; void *func_arg; struct dtx_leader_handle *dlh; + + /* Chore-internal state variables */ + uint32_t i; + uint32_t j; }; static void @@ -1969,20 +1974,31 @@ dtx_sub_comp_cb(struct dtx_leader_handle *dlh, int idx, int rc) idx, tgt->st_rank, tgt->st_tgt_idx, tgt->st_flags, rc); } -static void -dtx_leader_exec_ops_ult(void *arg) +static enum dss_chore_status +dtx_leader_exec_ops_chore(struct dss_chore *chore, bool is_reentrance) { - struct dtx_ult_arg *ult_arg = arg; - struct dtx_leader_handle *dlh = ult_arg->dlh; + struct dtx_chore *dtx_chore = container_of(chore, struct dtx_chore, chore); + struct dtx_leader_handle *dlh = dtx_chore->dlh; struct dtx_sub_status *sub; struct daos_shard_tgt *tgt; - uint32_t i; - uint32_t j; - uint32_t k; int rc = 0; - for (i = dlh->dlh_forward_idx, j = 0, k = 0; j < dlh->dlh_forward_cnt; i++, j++) { - sub = &dlh->dlh_subs[i]; + /* + * If this is the first entrance, initialize the chore-internal state + * variables. 
+ */ + if (!is_reentrance) { + D_DEBUG(DB_TRACE, "%p: initialize: forward_idx=%u forward_cnt=%u\n", chore, + dlh->dlh_forward_idx, dlh->dlh_forward_cnt); + dtx_chore->i = dlh->dlh_forward_idx; + dtx_chore->j = 0; + } else { + D_DEBUG(DB_TRACE, "%p: resume: i=%u j=%u forward_cnt=%u\n", chore, dtx_chore->i, + dtx_chore->j, dlh->dlh_forward_cnt); + } + + while (dtx_chore->j < dlh->dlh_forward_cnt) { + sub = &dlh->dlh_subs[dtx_chore->i]; tgt = &sub->dss_tgt; if (dlh->dlh_normal_sub_done == 0) { @@ -1990,7 +2006,7 @@ dtx_leader_exec_ops_ult(void *arg) sub->dss_comp = 0; if (unlikely(tgt->st_flags & DTF_DELAY_FORWARD)) { - dtx_sub_comp_cb(dlh, i, 0); + dtx_sub_comp_cb(dlh, dtx_chore->i, 0); continue; } } else { @@ -2002,33 +2018,38 @@ dtx_leader_exec_ops_ult(void *arg) } if (tgt->st_rank == DAOS_TGT_IGNORE || - (i == daos_fail_value_get() && DAOS_FAIL_CHECK(DAOS_DTX_SKIP_PREPARE))) { + (dtx_chore->i == daos_fail_value_get() && + DAOS_FAIL_CHECK(DAOS_DTX_SKIP_PREPARE))) { if (dlh->dlh_normal_sub_done == 0 || tgt->st_flags & DTF_DELAY_FORWARD) - dtx_sub_comp_cb(dlh, i, 0); + dtx_sub_comp_cb(dlh, dtx_chore->i, 0); continue; } - rc = ult_arg->func(dlh, ult_arg->func_arg, i, dtx_sub_comp_cb); + rc = dtx_chore->func(dlh, dtx_chore->func_arg, dtx_chore->i, dtx_sub_comp_cb); if (rc != 0) { if (sub->dss_comp == 0) - dtx_sub_comp_cb(dlh, i, rc); + dtx_sub_comp_cb(dlh, dtx_chore->i, rc); break; } + dtx_chore->i++; + dtx_chore->j++; + /* Yield to avoid holding CPU for too long time. 
*/ - if ((++k) % DTX_RPC_YIELD_THD == 0) - ABT_thread_yield(); + if (dtx_chore->j % DTX_RPC_YIELD_THD == 0) + return DSS_CHORE_READY; } if (rc != 0) { - for (i++, j++; j < dlh->dlh_forward_cnt; i++, j++) { - sub = &dlh->dlh_subs[i]; + for (dtx_chore->i++, dtx_chore->j++; dtx_chore->j < dlh->dlh_forward_cnt; + dtx_chore->i++, dtx_chore->j++) { + sub = &dlh->dlh_subs[dtx_chore->i]; tgt = &sub->dss_tgt; if (dlh->dlh_normal_sub_done == 0 || tgt->st_flags & DTF_DELAY_FORWARD) { sub->dss_result = 0; sub->dss_comp = 0; - dtx_sub_comp_cb(dlh, i, 0); + dtx_sub_comp_cb(dlh, dtx_chore->i, 0); } } } @@ -2038,6 +2059,8 @@ dtx_leader_exec_ops_ult(void *arg) D_ASSERTF(rc == ABT_SUCCESS, "ABT_future_set failed [%u, %u), for delay %s: %d\n", dlh->dlh_forward_idx, dlh->dlh_forward_idx + dlh->dlh_forward_cnt, dlh->dlh_normal_sub_done == 1 ? "yes" : "no", rc); + + return DSS_CHORE_DONE; } /** @@ -2047,15 +2070,15 @@ int dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, dtx_agg_cb_t agg_cb, int allow_failure, void *func_arg) { - struct dtx_ult_arg ult_arg; + struct dtx_chore dtx_chore; int sub_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt; int rc = 0; int local_rc = 0; int remote_rc = 0; - ult_arg.func = func; - ult_arg.func_arg = func_arg; - ult_arg.dlh = dlh; + dtx_chore.func = func; + dtx_chore.func_arg = func_arg; + dtx_chore.dlh = dlh; dlh->dlh_result = 0; dlh->dlh_allow_failure = allow_failure; @@ -2091,17 +2114,10 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, D_GOTO(out, rc = dss_abterr2der(rc)); } - /* - * NOTE: Ideally, we probably should create ULT for each shard, but for performance - * reasons, let's only create one for all remote targets for now. Moreover, - * we assume that func does not require deep stacks to forward the remote - * requests (dtx_leader_exec_ops_ult does not execute the local part of func). 
- */ - rc = dss_ult_create(dtx_leader_exec_ops_ult, &ult_arg, DSS_XS_IOFW, - dss_get_module_info()->dmi_tgt_id, 0, NULL); + rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore); if (rc != 0) { - D_ERROR("ult create failed [%u, %u] (2): "DF_RC"\n", - dlh->dlh_forward_idx, dlh->dlh_forward_cnt, DP_RC(rc)); + DL_ERROR(rc, "chore create failed [%u, %u] (2)", dlh->dlh_forward_idx, + dlh->dlh_forward_cnt); ABT_future_free(&dlh->dlh_future); goto out; } @@ -2169,11 +2185,9 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, /* The ones without DELAY flag will be skipped when scan the targets array. */ dlh->dlh_forward_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt; - /* See also the dss_ult_create above. */ - rc = dss_ult_create(dtx_leader_exec_ops_ult, &ult_arg, DSS_XS_IOFW, - dss_get_module_info()->dmi_tgt_id, 0, NULL); + rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore); if (rc != 0) { - D_ERROR("ult create failed (4): "DF_RC"\n", DP_RC(rc)); + DL_ERROR(rc, "chore create failed (4)"); ABT_future_free(&dlh->dlh_future); goto out; } diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c index 5c4c44c90359..058b44bf1172 100644 --- a/src/dtx/dtx_rpc.c +++ b/src/dtx/dtx_rpc.c @@ -363,6 +363,8 @@ dtx_req_wait(struct dtx_req_args *dra) } struct dtx_common_args { + struct dss_chore dca_chore; + ABT_eventual dca_chore_eventual; struct dtx_req_args dca_dra; d_list_t dca_head; struct btr_root dca_tree_root; @@ -373,57 +375,76 @@ struct dtx_common_args { d_rank_t dca_rank; uint32_t dca_tgtid; struct ds_cont_child *dca_cont; - ABT_thread dca_helper; struct dtx_id dca_dti_inline; struct dtx_id *dca_dtis; struct dtx_entry **dca_dtes; + + /* Chore-internal state variables */ + struct dtx_req_rec *dca_drr; + int dca_i; }; +/* If is_reentrance, this function ignores len. 
*/ static int -dtx_req_list_send(struct dtx_common_args *dca, daos_epoch_t epoch, int len) +dtx_req_list_send(struct dtx_common_args *dca, daos_epoch_t epoch, int len, bool is_reentrance) { struct dtx_req_args *dra = &dca->dca_dra; - struct dtx_req_rec *drr; int rc; - int i = 0; - dra->dra_length = len; + if (!is_reentrance) { + dra->dra_length = len; + + rc = ABT_future_create(len, dtx_req_list_cb, &dra->dra_future); + if (rc != ABT_SUCCESS) { + D_ERROR("ABT_future_create failed for opc %x, len = %d: " + "rc = %d.\n", dra->dra_opc, len, rc); + return dss_abterr2der(rc); + } - rc = ABT_future_create(len, dtx_req_list_cb, &dra->dra_future); - if (rc != ABT_SUCCESS) { - D_ERROR("ABT_future_create failed for opc %x, len = %d: " - "rc = %d.\n", dra->dra_opc, len, rc); - return dss_abterr2der(rc); + D_DEBUG(DB_TRACE, "%p: DTX req for opc %x, future %p (%d) start.\n", + &dca->dca_chore, dra->dra_opc, dra->dra_future, len); } - D_DEBUG(DB_TRACE, "DTX req for opc %x, future %p start.\n", dra->dra_opc, dra->dra_future); + /* + * Begin or continue an iteration over dca_head. When beginning the + * iteration, dca->dca_drr does not point to a real entry, and is only + * safe for d_list_for_each_entry_continue. + */ + if (!is_reentrance) { + dca->dca_drr = d_list_entry(&dca->dca_head, struct dtx_req_rec, drr_link); + dca->dca_i = 0; + } + /* DO NOT add any line here! See the comment on dca->dca_drr above. 
*/ + d_list_for_each_entry_continue(dca->dca_drr, &dca->dca_head, drr_link) + { + D_DEBUG(DB_TRACE, "chore=%p: drr=%p i=%d\n", &dca->dca_chore, dca->dca_drr, + dca->dca_i); - d_list_for_each_entry(drr, &dca->dca_head, drr_link) { - drr->drr_parent = dra; - drr->drr_result = 0; + dca->dca_drr->drr_parent = dra; + dca->dca_drr->drr_result = 0; - if (unlikely(dra->dra_opc == DTX_COMMIT && i == 0 && + if (unlikely(dra->dra_opc == DTX_COMMIT && dca->dca_i == 0 && DAOS_FAIL_CHECK(DAOS_DTX_FAIL_COMMIT))) - rc = dtx_req_send(drr, 1); + rc = dtx_req_send(dca->dca_drr, 1); else - rc = dtx_req_send(drr, epoch); + rc = dtx_req_send(dca->dca_drr, epoch); if (rc != 0) { /* If the first sub-RPC failed, then break, otherwise * other remote replicas may have already received the * RPC and executed it, so have to go ahead. */ - if (i == 0) { + if (dca->dca_i == 0) { ABT_future_free(&dra->dra_future); return rc; } } /* Yield to avoid holding CPU for too long time. */ - if (++i % DTX_RPC_YIELD_THD == 0) - ABT_thread_yield(); + if (++(dca->dca_i) % DTX_RPC_YIELD_THD == 0) + return DSS_CHORE_READY; } - return 0; + return DSS_CHORE_DONE; } static int @@ -599,15 +620,21 @@ dtx_classify_one(struct ds_pool *pool, daos_handle_t tree, d_list_t *head, int * return rc > 0 ? 
0 : rc; } -static int -dtx_rpc_internal(struct dtx_common_args *dca) +static enum dss_chore_status +dtx_rpc_helper(struct dss_chore *chore, bool is_reentrance) { + struct dtx_common_args *dca = container_of(chore, struct dtx_common_args, dca_chore); struct ds_pool *pool = dca->dca_cont->sc_pool->spc_pool; struct umem_attr uma = { 0 }; int length = 0; int rc; int i; + if (is_reentrance) { + D_DEBUG(DB_TRACE, "%p: skip to send\n", &dca->dca_chore); + goto send; + } + if (dca->dca_dra.dra_opc != DTX_REFRESH) { D_ASSERT(dca->dca_dtis != NULL); @@ -616,7 +643,7 @@ dtx_rpc_internal(struct dtx_common_args *dca) rc = dbtree_create_inplace(DBTREE_CLASS_DTX_CF, 0, DTX_CF_BTREE_ORDER, &uma, &dca->dca_tree_root, &dca->dca_tree_hdl); if (rc != 0) - return rc; + goto done; } ABT_rwlock_rdlock(pool->sp_lock); @@ -626,7 +653,7 @@ dtx_rpc_internal(struct dtx_common_args *dca) dca->dca_rank, dca->dca_tgtid); if (rc < 0) { ABT_rwlock_unlock(pool->sp_lock); - return rc; + goto done; } daos_dti_copy(&dca->dca_dtis[i], &dca->dca_dtes[i]->dte_xid); @@ -636,30 +663,31 @@ dtx_rpc_internal(struct dtx_common_args *dca) /* For DTX_CHECK, if no other available target(s), then current target is the * unique valid one (and also 'prepared'), then related DTX can be committed. */ - if (d_list_empty(&dca->dca_head)) - return dca->dca_dra.dra_opc == DTX_CHECK ? DTX_ST_PREPARED : 0; + if (d_list_empty(&dca->dca_head)) { + rc = (dca->dca_dra.dra_opc == DTX_CHECK ? 
DTX_ST_PREPARED : 0); + goto done; + } } else { length = dca->dca_count; } D_ASSERT(length > 0); - return dtx_req_list_send(dca, dca->dca_epoch, length); -} - -static void -dtx_rpc_helper(void *arg) -{ - struct dtx_common_args *dca = arg; - int rc; - - rc = dtx_rpc_internal(dca); +send: + rc = dtx_req_list_send(dca, dca->dca_epoch, length, is_reentrance); + if (rc == DSS_CHORE_READY) + return rc; +done: if (rc != 0) dca->dca_dra.dra_result = rc; - - D_CDEBUG(rc < 0, DLOG_ERR, DB_TRACE, - "DTX helper ULT for %u exit: %d\n", dca->dca_dra.dra_opc, rc); + D_CDEBUG(rc < 0, DLOG_ERR, DB_TRACE, "%p: DTX RPC chore for %u done: %d\n", chore, + dca->dca_dra.dra_opc, rc); + if (dca->dca_chore_eventual != ABT_EVENTUAL_NULL) { + rc = ABT_eventual_set(dca->dca_chore_eventual, NULL, 0); + D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_set: %d\n", rc); + } + return DSS_CHORE_DONE; } static int @@ -672,6 +700,7 @@ dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry ** memset(dca, 0, sizeof(*dca)); + dca->dca_chore_eventual = ABT_EVENTUAL_NULL; D_INIT_LIST_HEAD(&dca->dca_head); dca->dca_tree_hdl = DAOS_HDL_INVAL; dca->dca_epoch = epoch; @@ -679,7 +708,6 @@ dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry ** crt_group_rank(NULL, &dca->dca_rank); dca->dca_tgtid = dss_get_module_info()->dmi_tgt_id; dca->dca_cont = cont; - dca->dca_helper = ABT_THREAD_NULL; dca->dca_dtes = dtes; dra = &dca->dca_dra; @@ -705,11 +733,18 @@ dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry ** } /* Use helper ULT to handle DTX RPC if there are enough helper XS. 
*/ - if (dss_has_enough_helper()) - rc = dss_ult_create(dtx_rpc_helper, dca, DSS_XS_IOFW, dca->dca_tgtid, 0, - &dca->dca_helper); - else - rc = dtx_rpc_internal(dca); + if (dss_has_enough_helper()) { + rc = ABT_eventual_create(0, &dca->dca_chore_eventual); + if (rc != ABT_SUCCESS) { + D_ERROR("failed to create eventual: %d\n", rc); + rc = dss_abterr2der(rc); + goto out; + } + rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper); + } else { + dss_chore_diy(&dca->dca_chore, dtx_rpc_helper); + rc = dca->dca_dra.dra_result; + } out: return rc; @@ -721,8 +756,12 @@ dtx_rpc_post(struct dtx_common_args *dca, int ret, bool keep_head) struct dtx_req_rec *drr; int rc; - if (dca->dca_helper != ABT_THREAD_NULL) - ABT_thread_free(&dca->dca_helper); + if (dca->dca_chore_eventual != ABT_EVENTUAL_NULL) { + rc = ABT_eventual_wait(dca->dca_chore_eventual, NULL); + D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_wait: %d\n", rc); + rc = ABT_eventual_free(&dca->dca_chore_eventual); + D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_free: %d\n", rc); + } rc = dtx_req_wait(&dca->dca_dra); diff --git a/src/engine/srv.c b/src/engine/srv.c index aa6cbd706e8f..1b192f807c5c 100644 --- a/src/engine/srv.c +++ b/src/engine/srv.c @@ -518,6 +518,15 @@ dss_srv_handler(void *arg) } } + /* Some xstreams do not need this, actually... 
*/ + rc = dss_chore_queue_init(dx); + if (rc != 0) { + DL_ERROR(rc, "failed to initialize chore queue"); + ABT_future_set(dx->dx_shutdown, dx); + wait_all_exited(dx, dmi); + goto nvme_fini; + } + dmi->dmi_xstream = dx; ABT_mutex_lock(xstream_data.xd_mutex); /* initialized everything for the ULT, notify the creator */ @@ -564,12 +573,15 @@ dss_srv_handler(void *arg) if (dx->dx_comm) dx->dx_progress_started = false; + dss_chore_queue_stop(dx); + wait_all_exited(dx, dmi); if (dmi->dmi_dp) { daos_profile_destroy(dmi->dmi_dp); dmi->dmi_dp = NULL; } + dss_chore_queue_fini(dx); nvme_fini: if (dss_xstream_has_nvme(dx)) bio_xsctxt_free(dmi->dmi_nvme_ctxt); diff --git a/src/engine/srv_internal.h b/src/engine/srv_internal.h index 92504c026ca3..df35ac659e94 100644 --- a/src/engine/srv_internal.h +++ b/src/engine/srv_internal.h @@ -102,6 +102,7 @@ struct dss_xstream { #endif bool dx_progress_started; /* Network poll started */ int dx_tag; /** tag for xstream */ + struct dss_chore_queue *dx_chore_queue; }; /** Engine module's metrics */ @@ -376,4 +377,8 @@ dss_xstream_has_nvme(struct dss_xstream *dx) return false; } +int dss_chore_queue_init(struct dss_xstream *dx); +void dss_chore_queue_stop(struct dss_xstream *dx); +void dss_chore_queue_fini(struct dss_xstream *dx); + #endif /* __DAOS_SRV_INTERNAL__ */ diff --git a/src/engine/ult.c b/src/engine/ult.c index 204381755fb1..0121bc43cf66 100644 --- a/src/engine/ult.c +++ b/src/engine/ult.c @@ -610,3 +610,189 @@ dss_main_exec(void (*func)(void *), void *arg) return dss_ult_create(func, arg, DSS_XS_SELF, info->dmi_tgt_id, 0, NULL); } + +struct dss_chore_queue { + d_list_t chq_list; + bool chq_stop; + ABT_mutex chq_mutex; + ABT_cond chq_cond; + ABT_thread chq_ult; +}; + +/** + * Add \a chore for \a func to the chore queue of some other xstream. 
+ *
+ *
 * \param[in]	chore	address of the embedded chore object
 * \param[in]	func	function to be executed via \a chore
 *
 * \retval	-DER_CANCELED	chore queue stopping
 */
+int
+dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func)
+{
+	struct dss_module_info *info = dss_get_module_info();
+	int			xs_id;
+	struct dss_xstream     *dx;
+	struct dss_chore_queue *queue;
+
+	chore->cho_status = DSS_CHORE_NEW;
+	chore->cho_func   = func;
+
+	/* Find the chore queue. */
+	xs_id = sched_ult2xs(DSS_XS_IOFW, info->dmi_tgt_id);
+	D_ASSERT(xs_id != -DER_INVAL);
+	dx = dss_get_xstream(xs_id);
+	D_ASSERT(dx != NULL);
+	queue = dx->dx_chore_queue;
+	D_ASSERT(queue != NULL);
+
+	ABT_mutex_lock(queue->chq_mutex);
+	if (queue->chq_stop) {
+		ABT_mutex_unlock(queue->chq_mutex);
+		return -DER_CANCELED;
+	}
+	d_list_add_tail(&chore->cho_link, &queue->chq_list);
+	ABT_cond_broadcast(queue->chq_cond);
+	ABT_mutex_unlock(queue->chq_mutex);
+
+	D_DEBUG(DB_TRACE, "%p: tgt_id=%d -> xs_id=%d dx.tgt_id=%d\n", chore, info->dmi_tgt_id,
+		xs_id, dx->dx_tgt_id);
+	return 0;
+}
+
+/**
+ * Do \a chore for \a func synchronously in the current ULT.
+ * + * \param[in] chore embedded chore object + * \param[in] func function to be executed via \a chore + */ +void +dss_chore_diy(struct dss_chore *chore, dss_chore_func_t func) +{ + enum dss_chore_status status; + bool is_reentrance = false; + +reenter: + D_DEBUG(DB_TRACE, "%p: status=%d\n", chore, chore->cho_status); + status = func(chore, is_reentrance); + D_ASSERT(status != DSS_CHORE_NEW); + if (status == DSS_CHORE_READY) { + ABT_thread_yield(); + is_reentrance = true; + goto reenter; + } +} + +static void +dss_chore_queue_ult(void *arg) +{ + struct dss_chore_queue *queue = arg; + + D_ASSERT(queue != NULL); + D_DEBUG(DB_TRACE, "begin\n"); + + for (;;) { + struct dss_chore *chore = NULL; + bool stop = false; + + ABT_mutex_lock(queue->chq_mutex); + for (;;) { + chore = d_list_pop_entry(&queue->chq_list, struct dss_chore, cho_link); + if (chore != NULL) + break; + if (queue->chq_stop) { + stop = true; + break; + } + ABT_cond_wait(queue->chq_cond, queue->chq_mutex); + } + ABT_mutex_unlock(queue->chq_mutex); + + if (stop) + break; + + D_DEBUG(DB_TRACE, "%p: status=%d\n", chore, chore->cho_status); + chore->cho_status = chore->cho_func(chore, chore->cho_status == DSS_CHORE_READY); + D_ASSERT(chore->cho_status != DSS_CHORE_NEW); + if (chore->cho_status == DSS_CHORE_READY) { + ABT_mutex_lock(queue->chq_mutex); + d_list_add_tail(&chore->cho_link, &queue->chq_list); + ABT_mutex_unlock(queue->chq_mutex); + } + + ABT_thread_yield(); + } + + D_DEBUG(DB_TRACE, "end\n"); +} + +int +dss_chore_queue_init(struct dss_xstream *dx) +{ + struct dss_chore_queue *queue; + int rc; + + D_ALLOC_PTR(queue); + if (queue == NULL) { + rc = -DER_NOMEM; + goto err; + } + + D_INIT_LIST_HEAD(&queue->chq_list); + queue->chq_stop = false; + + rc = ABT_mutex_create(&queue->chq_mutex); + if (rc != ABT_SUCCESS) { + D_ERROR("failed to create chore queue mutex: %d\n", rc); + rc = dss_abterr2der(rc); + goto err_queue; + } + + rc = ABT_cond_create(&queue->chq_cond); + if (rc != ABT_SUCCESS) { + 
D_ERROR("failed to create chore queue condition variable: %d\n", rc); + rc = dss_abterr2der(rc); + goto err_mutex; + } + + rc = daos_abt_thread_create(dx->dx_sp, dss_free_stack_cb, dx->dx_pools[DSS_POOL_GENERIC], + dss_chore_queue_ult, queue, ABT_THREAD_ATTR_NULL, + &queue->chq_ult); + if (rc != 0) { + D_ERROR("failed to create chore queue ULT: %d\n", rc); + rc = dss_abterr2der(rc); + goto err_cond; + } + + dx->dx_chore_queue = queue; + return 0; + +err_cond: + ABT_cond_free(&queue->chq_cond); +err_mutex: + ABT_mutex_free(&queue->chq_mutex); +err_queue: + D_FREE(queue); +err: + return rc; +} + +void +dss_chore_queue_stop(struct dss_xstream *dx) +{ + ABT_mutex_lock(dx->dx_chore_queue->chq_mutex); + dx->dx_chore_queue->chq_stop = true; + ABT_cond_broadcast(dx->dx_chore_queue->chq_cond); + ABT_mutex_unlock(dx->dx_chore_queue->chq_mutex); +} + +void +dss_chore_queue_fini(struct dss_xstream *dx) +{ + dx->dx_chore_queue->chq_stop = true; + ABT_cond_broadcast(dx->dx_chore_queue->chq_cond); + ABT_thread_free(&dx->dx_chore_queue->chq_ult); + ABT_cond_free(&dx->dx_chore_queue->chq_cond); + ABT_mutex_free(&dx->dx_chore_queue->chq_mutex); + D_FREE(dx->dx_chore_queue); +} diff --git a/src/include/daos_srv/daos_engine.h b/src/include/daos_srv/daos_engine.h index 498715b3d12f..8719a903d9cf 100644 --- a/src/include/daos_srv/daos_engine.h +++ b/src/include/daos_srv/daos_engine.h @@ -821,4 +821,40 @@ enum dss_drpc_call_flag { int dss_drpc_call(int32_t module, int32_t method, void *req, size_t req_size, unsigned int flags, Drpc__Response **resp); +/** Status of a chore */ +enum dss_chore_status { + DSS_CHORE_NEW, /**< ready to be scheduled for the first time (private) */ + DSS_CHORE_READY, /**< ready to be scheduled */ + DSS_CHORE_DONE /**< no more scheduling required */ +}; + +struct dss_chore; + +/** + * Must return either DSS_CHORE_READY (if yielding to other chores) or + * DSS_CHORE_DONE (if terminating). 
If \a is_reentrance is true, this is not + * the first time \a chore is scheduled. A typical implementation shall + * initialize its internal state variables if \a is_reentrance is false. See + * dtx_leader_exec_ops_chore for an example. + */ +typedef enum dss_chore_status (*dss_chore_func_t)(struct dss_chore *chore, bool is_reentrance); + +/** + * Chore (opaque) + * + * A simple task (e.g., an I/O forwarding task) that yields by returning + * DSS_CHORE_READY instead of calling ABT_thread_yield. This data structure + * shall be embedded in the user's own task data structure, which typically + * also includes arguments and internal state variables for \a cho_func. All + * fields are private. See dtx_chore for an example. + */ +struct dss_chore { + d_list_t cho_link; + enum dss_chore_status cho_status; + dss_chore_func_t cho_func; +}; + +int dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func); +void dss_chore_diy(struct dss_chore *chore, dss_chore_func_t func); + #endif /* __DSS_API_H__ */