diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c index 77aec06240ef..a0a5b0002c85 100644 --- a/src/dtx/dtx_common.c +++ b/src/dtx/dtx_common.c @@ -1913,10 +1913,15 @@ dtx_handle_resend(daos_handle_t coh, struct dtx_id *dti, */ #define DTX_EXEC_STEP_LENGTH DTX_THRESHOLD_COUNT -struct dtx_ult_arg { +struct dtx_chore { + struct dss_chore chore; dtx_sub_func_t func; void *func_arg; struct dtx_leader_handle *dlh; + + /* Chore-internal state variables */ + uint32_t i; + uint32_t j; }; static void @@ -1969,20 +1974,30 @@ dtx_sub_comp_cb(struct dtx_leader_handle *dlh, int idx, int rc) idx, tgt->st_rank, tgt->st_tgt_idx, tgt->st_flags, rc); } -static void -dtx_leader_exec_ops_ult(void *arg) +static enum dss_chore_status +dtx_leader_exec_ops_chore(struct dss_chore *chore, bool is_reentrance) { - struct dtx_ult_arg *ult_arg = arg; - struct dtx_leader_handle *dlh = ult_arg->dlh; + struct dtx_chore *dtx_chore = container_of(chore, struct dtx_chore, chore); + struct dtx_leader_handle *dlh = dtx_chore->dlh; struct dtx_sub_status *sub; struct daos_shard_tgt *tgt; - uint32_t i; - uint32_t j; - uint32_t k; int rc = 0; - for (i = dlh->dlh_forward_idx, j = 0, k = 0; j < dlh->dlh_forward_cnt; i++, j++) { - sub = &dlh->dlh_subs[i]; + /* + * If this is the first entrance, initialize the chore-internal state + * variables. 
+ */ + if (!is_reentrance) { + D_DEBUG(DB_TRACE, "initialize: chore=%p\n", chore); + dtx_chore->i = dlh->dlh_forward_idx; + dtx_chore->j = 0; + } else { + D_DEBUG(DB_TRACE, "resume: chore=%p i=%u j=%u\n", chore, dtx_chore->i, + dtx_chore->j); + } + + while (dtx_chore->j < dlh->dlh_forward_cnt) { + sub = &dlh->dlh_subs[dtx_chore->i]; tgt = &sub->dss_tgt; if (dlh->dlh_normal_sub_done == 0) { @@ -1990,7 +2005,7 @@ dtx_leader_exec_ops_ult(void *arg) sub->dss_comp = 0; if (unlikely(tgt->st_flags & DTF_DELAY_FORWARD)) { - dtx_sub_comp_cb(dlh, i, 0); + dtx_sub_comp_cb(dlh, dtx_chore->i, 0); continue; } } else { @@ -2002,33 +2017,38 @@ dtx_leader_exec_ops_ult(void *arg) } if (tgt->st_rank == DAOS_TGT_IGNORE || - (i == daos_fail_value_get() && DAOS_FAIL_CHECK(DAOS_DTX_SKIP_PREPARE))) { + (dtx_chore->i == daos_fail_value_get() && + DAOS_FAIL_CHECK(DAOS_DTX_SKIP_PREPARE))) { if (dlh->dlh_normal_sub_done == 0 || tgt->st_flags & DTF_DELAY_FORWARD) - dtx_sub_comp_cb(dlh, i, 0); + dtx_sub_comp_cb(dlh, dtx_chore->i, 0); continue; } - rc = ult_arg->func(dlh, ult_arg->func_arg, i, dtx_sub_comp_cb); + rc = dtx_chore->func(dlh, dtx_chore->func_arg, dtx_chore->i, dtx_sub_comp_cb); if (rc != 0) { if (sub->dss_comp == 0) - dtx_sub_comp_cb(dlh, i, rc); + dtx_sub_comp_cb(dlh, dtx_chore->i, rc); break; } + dtx_chore->i++; + dtx_chore->j++; + /* Yield to avoid holding CPU for too long time. 
*/ - if ((++k) % DTX_RPC_YIELD_THD == 0) - ABT_thread_yield(); + if (dtx_chore->j % DTX_RPC_YIELD_THD == 0) + return DSS_CHORE_READY; } if (rc != 0) { - for (i++, j++; j < dlh->dlh_forward_cnt; i++, j++) { - sub = &dlh->dlh_subs[i]; + for (dtx_chore->i++, dtx_chore->j++; dtx_chore->j < dlh->dlh_forward_cnt; + dtx_chore->i++, dtx_chore->j++) { + sub = &dlh->dlh_subs[dtx_chore->i]; tgt = &sub->dss_tgt; if (dlh->dlh_normal_sub_done == 0 || tgt->st_flags & DTF_DELAY_FORWARD) { sub->dss_result = 0; sub->dss_comp = 0; - dtx_sub_comp_cb(dlh, i, 0); + dtx_sub_comp_cb(dlh, dtx_chore->i, 0); } } } @@ -2038,6 +2058,8 @@ dtx_leader_exec_ops_ult(void *arg) D_ASSERTF(rc == ABT_SUCCESS, "ABT_future_set failed [%u, %u), for delay %s: %d\n", dlh->dlh_forward_idx, dlh->dlh_forward_idx + dlh->dlh_forward_cnt, dlh->dlh_normal_sub_done == 1 ? "yes" : "no", rc); + + return DSS_CHORE_DONE; } /** @@ -2047,15 +2069,15 @@ int dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, dtx_agg_cb_t agg_cb, int allow_failure, void *func_arg) { - struct dtx_ult_arg ult_arg; + struct dtx_chore dtx_chore; int sub_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt; int rc = 0; int local_rc = 0; int remote_rc = 0; - ult_arg.func = func; - ult_arg.func_arg = func_arg; - ult_arg.dlh = dlh; + dtx_chore.func = func; + dtx_chore.func_arg = func_arg; + dtx_chore.dlh = dlh; dlh->dlh_result = 0; dlh->dlh_allow_failure = allow_failure; @@ -2091,17 +2113,10 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, D_GOTO(out, rc = dss_abterr2der(rc)); } - /* - * NOTE: Ideally, we probably should create ULT for each shard, but for performance - * reasons, let's only create one for all remote targets for now. Moreover, - * we assume that func does not require deep stacks to forward the remote - * requests (dtx_leader_exec_ops_ult does not execute the local part of func). 
- */ - rc = dss_ult_create(dtx_leader_exec_ops_ult, &ult_arg, DSS_XS_IOFW, - dss_get_module_info()->dmi_tgt_id, 0, NULL); + rc = dss_chore_add(&dtx_chore.chore, dtx_leader_exec_ops_chore); if (rc != 0) { - D_ERROR("ult create failed [%u, %u] (2): "DF_RC"\n", - dlh->dlh_forward_idx, dlh->dlh_forward_cnt, DP_RC(rc)); + DL_ERROR(rc, "chore create failed [%u, %u] (2)", dlh->dlh_forward_idx, + dlh->dlh_forward_cnt); ABT_future_free(&dlh->dlh_future); goto out; } @@ -2169,11 +2184,9 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, /* The ones without DELAY flag will be skipped when scan the targets array. */ dlh->dlh_forward_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt; - /* See also the dss_ult_create above. */ - rc = dss_ult_create(dtx_leader_exec_ops_ult, &ult_arg, DSS_XS_IOFW, - dss_get_module_info()->dmi_tgt_id, 0, NULL); + rc = dss_chore_add(&dtx_chore.chore, dtx_leader_exec_ops_chore); if (rc != 0) { - D_ERROR("ult create failed (4): "DF_RC"\n", DP_RC(rc)); + DL_ERROR(rc, "chore create failed (4)"); ABT_future_free(&dlh->dlh_future); goto out; } diff --git a/src/engine/srv.c b/src/engine/srv.c index e8c498fa6f3a..9700af550dac 100644 --- a/src/engine/srv.c +++ b/src/engine/srv.c @@ -463,6 +463,12 @@ dss_srv_handler(void *arg) goto crt_destroy; } + rc = dss_chore_queue_init(dx); + if (rc != 0) { + DL_ERROR(rc, "failed to initialize chore queue"); + goto tse_fini; + } + if (dss_xstream_has_nvme(dx)) { ABT_thread_attr attr; @@ -473,7 +479,7 @@ dss_srv_handler(void *arg) if (rc != 0) { D_ERROR("failed to init spdk context for xstream(%d) " "rc:%d\n", dmi->dmi_xs_id, rc); - D_GOTO(tse_fini, rc); + D_GOTO(chore_queue_fini, rc); } rc = ABT_thread_attr_create(&attr); @@ -555,6 +561,8 @@ dss_srv_handler(void *arg) nvme_fini: if (dss_xstream_has_nvme(dx)) bio_xsctxt_free(dmi->dmi_nvme_ctxt); +chore_queue_fini: + dss_chore_queue_fini(dx); tse_fini: tse_sched_fini(&dx->dx_sched_dsc); crt_destroy: diff --git a/src/engine/srv_internal.h 
b/src/engine/srv_internal.h index 4fbe5d386d7a..fcb2f6d9f450 100644 --- a/src/engine/srv_internal.h +++ b/src/engine/srv_internal.h @@ -93,6 +93,7 @@ struct dss_xstream { #endif bool dx_progress_started; /* Network poll started */ int dx_tag; /** tag for xstream */ + struct dss_chore_queue *dx_chore_queue; }; /** Engine module's metrics */ @@ -367,4 +368,7 @@ dss_xstream_has_nvme(struct dss_xstream *dx) return false; } +int dss_chore_queue_init(struct dss_xstream *dx); +void dss_chore_queue_fini(struct dss_xstream *dx); + #endif /* __DAOS_SRV_INTERNAL__ */ diff --git a/src/engine/ult.c b/src/engine/ult.c index 204381755fb1..7d7b46dde4f8 100644 --- a/src/engine/ult.c +++ b/src/engine/ult.c @@ -610,3 +610,155 @@ dss_main_exec(void (*func)(void *), void *arg) return dss_ult_create(func, arg, DSS_XS_SELF, info->dmi_tgt_id, 0, NULL); } + +struct dss_chore_queue { + d_list_t chq_list; + bool chq_stop; + ABT_mutex chq_mutex; + ABT_cond chq_cond; + ABT_thread chq_ult; +}; + +/** + * Add a chore for \a func. + * + * \param[in] chore embedded chore object + * \param[in] func function to be executed via \a chore + * + * \retval -DER_OP_CANCELED chore queue stopping + */ +int +dss_chore_add(struct dss_chore *chore, dss_chore_func_t func) +{ + int xs_id; + struct dss_xstream *dx; + struct dss_chore_queue *queue; + + chore->cho_status = DSS_CHORE_NEW; + chore->cho_func = func; + + /* Find our chore queue. 
*/ + xs_id = sched_ult2xs(DSS_XS_IOFW, dss_get_module_info()->dmi_tgt_id); + D_ASSERT(xs_id != -DER_INVAL); + dx = dss_get_xstream(xs_id); + D_ASSERT(dx != NULL); + queue = dx->dx_chore_queue; + D_ASSERT(queue != NULL); + D_DEBUG(DB_TRACE, "chore=%p: tgt_id=%d -> xs_id=%d dx.tgt_id=%d\n", chore, + dss_get_module_info()->dmi_tgt_id, xs_id, dx->dx_tgt_id); + + ABT_mutex_lock(queue->chq_mutex); + if (queue->chq_stop) { + ABT_mutex_unlock(queue->chq_mutex); + return -DER_OP_CANCELED; + } + d_list_add_tail(&chore->cho_link, &queue->chq_list); + ABT_cond_broadcast(queue->chq_cond); + ABT_mutex_unlock(queue->chq_mutex); + + return 0; +} + +static void +dss_chore_queue_ult(void *arg) +{ + struct dss_chore_queue *queue = arg; + + D_ASSERT(queue != NULL); + D_DEBUG(DB_TRACE, "begin\n"); + + for (;;) { + struct dss_chore *chore = NULL; + bool stop = false; + + ABT_mutex_lock(queue->chq_mutex); + for (;;) { + chore = d_list_pop_entry(&queue->chq_list, struct dss_chore, cho_link); + if (chore != NULL) + break; + if (queue->chq_stop) { + stop = true; + break; + } + ABT_cond_wait(queue->chq_cond, queue->chq_mutex); + } + ABT_mutex_unlock(queue->chq_mutex); + + if (stop) + break; + + D_DEBUG(DB_TRACE, "scheduling: chore=%p chore.status=%d\n", chore, + chore->cho_status); + chore->cho_status = chore->cho_func(chore, chore->cho_status == DSS_CHORE_READY); + D_ASSERT(chore->cho_status != DSS_CHORE_NEW); + if (chore->cho_status == DSS_CHORE_READY) { + ABT_mutex_lock(queue->chq_mutex); + d_list_add_tail(&chore->cho_link, &queue->chq_list); + ABT_mutex_unlock(queue->chq_mutex); + } + } + + D_DEBUG(DB_TRACE, "end\n"); +} + +int +dss_chore_queue_init(struct dss_xstream *dx) +{ + struct dss_chore_queue *queue; + int rc; + + D_ALLOC_PTR(queue); + if (queue == NULL) { + rc = -DER_NOMEM; + goto err; + } + + D_INIT_LIST_HEAD(&queue->chq_list); + queue->chq_stop = false; + + rc = ABT_mutex_create(&queue->chq_mutex); + if (rc != ABT_SUCCESS) { + D_ERROR("failed to create chore queue mutex: 
%d\n", rc); + rc = dss_abterr2der(rc); + goto err_queue; + } + + rc = ABT_cond_create(&queue->chq_cond); + if (rc != ABT_SUCCESS) { + D_ERROR("failed to create chore queue condition variable: %d\n", rc); + rc = dss_abterr2der(rc); + goto err_mutex; + } + + rc = daos_abt_thread_create(dx->dx_sp, dss_free_stack_cb, dx->dx_pools[DSS_POOL_GENERIC], + dss_chore_queue_ult, queue, ABT_THREAD_ATTR_NULL, + &queue->chq_ult); + if (rc != 0) { + D_ERROR("failed to create chore queue ULT: %d\n", rc); + rc = dss_abterr2der(rc); + goto err_cond; + } + + dx->dx_chore_queue = queue; + return 0; + +err_cond: + ABT_cond_free(&queue->chq_cond); +err_mutex: + ABT_mutex_free(&queue->chq_mutex); +err_queue: + D_FREE(queue); +err: + return rc; +} + +void +dss_chore_queue_fini(struct dss_xstream *dx) +{ + dx->dx_chore_queue->chq_stop = true; + ABT_cond_broadcast(dx->dx_chore_queue->chq_cond); + ABT_thread_free(&dx->dx_chore_queue->chq_ult); + ABT_cond_free(&dx->dx_chore_queue->chq_cond); + ABT_mutex_free(&dx->dx_chore_queue->chq_mutex); + D_FREE(dx->dx_chore_queue); +} diff --git a/src/include/daos_srv/daos_engine.h b/src/include/daos_srv/daos_engine.h index ebee10755a3c..fa15ab8426d2 100644 --- a/src/include/daos_srv/daos_engine.h +++ b/src/include/daos_srv/daos_engine.h @@ -814,4 +814,37 @@ enum dss_drpc_call_flag { int dss_drpc_call(int32_t module, int32_t method, void *req, size_t req_size, unsigned int flags, Drpc__Response **resp); +/** Statuses of a chore */ +enum dss_chore_status { + DSS_CHORE_NEW, + DSS_CHORE_READY, /**< ready to be scheduled */ + DSS_CHORE_DONE /**< no more scheduling required */ +}; + +struct dss_chore; + +/** + * Must return either DSS_CHORE_READY (if yielding to other chores) or + * DSS_CHORE_DONE (if terminating). If \a is_reentrance is true, this is not + * the first time \a chore is scheduled. A typical implementation shall + * initialize its internal state variables if \a is_reentrance is false. See + * dtx_leader_exec_ops_chore for an example. 
+ */ +typedef enum dss_chore_status (*dss_chore_func_t)(struct dss_chore *chore, bool is_reentrance); + +/** + * Chore (opaque) (e.g., an I/O forwarding task) + * + * This shall be embedded in the caller's own "super" object, which typically + * also includes arguments and internal state variables for \a cho_func. All + * fields are private. See dtx_chore for an example. + */ +struct dss_chore { + d_list_t cho_link; + enum dss_chore_status cho_status; + dss_chore_func_t cho_func; +}; + +int dss_chore_add(struct dss_chore *chore, dss_chore_func_t func); + #endif /* __DSS_API_H__ */