Skip to content

Commit

Permalink
DAOS-14261 engine: Add dss_chore for I/O forwarding
Browse files Browse the repository at this point in the history
As requested by the Jira ticket, add a new I/O forwarding mechanism,
dss_chore, to avoid creating a ULT for every forwarding task.

  - Cancellation is not fully implemented, because the existing tasks
    themselves do not support cancellation yet.

  - In certain engine configurations, some xstreams do not need to
    initialize dx_chore_queue. This is left to future work.

Signed-off-by: Li Wei <[email protected]>
Required-githooks: true
  • Loading branch information
liw committed Nov 22, 2023
1 parent a39c469 commit 68dc394
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 39 deletions.
89 changes: 51 additions & 38 deletions src/dtx/dtx_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1913,10 +1913,15 @@ dtx_handle_resend(daos_handle_t coh, struct dtx_id *dti,
*/
#define DTX_EXEC_STEP_LENGTH DTX_THRESHOLD_COUNT

struct dtx_ult_arg {
struct dtx_chore {
struct dss_chore chore;
dtx_sub_func_t func;
void *func_arg;
struct dtx_leader_handle *dlh;

/* Chore-internal state variables */
uint32_t i;
uint32_t j;
};

static void
Expand Down Expand Up @@ -1969,28 +1974,38 @@ dtx_sub_comp_cb(struct dtx_leader_handle *dlh, int idx, int rc)
idx, tgt->st_rank, tgt->st_tgt_idx, tgt->st_flags, rc);
}

static void
dtx_leader_exec_ops_ult(void *arg)
static enum dss_chore_status
dtx_leader_exec_ops_chore(struct dss_chore *chore, bool is_reentrance)
{
struct dtx_ult_arg *ult_arg = arg;
struct dtx_leader_handle *dlh = ult_arg->dlh;
struct dtx_chore *dtx_chore = container_of(chore, struct dtx_chore, chore);
struct dtx_leader_handle *dlh = dtx_chore->dlh;
struct dtx_sub_status *sub;
struct daos_shard_tgt *tgt;
uint32_t i;
uint32_t j;
uint32_t k;
int rc = 0;

for (i = dlh->dlh_forward_idx, j = 0, k = 0; j < dlh->dlh_forward_cnt; i++, j++) {
sub = &dlh->dlh_subs[i];
/*
* If this is the first entrance, initialize the chore-internal state
* variables.
*/
if (!is_reentrance) {
D_DEBUG(DB_TRACE, "initialize: chore=%p\n", chore);
dtx_chore->i = dlh->dlh_forward_idx;
dtx_chore->j = 0;
} else {
D_DEBUG(DB_TRACE, "resume: chore=%p i=%u j=%u\n", chore, dtx_chore->i,
dtx_chore->j);
}

while (dtx_chore->j < dlh->dlh_forward_cnt) {
sub = &dlh->dlh_subs[dtx_chore->i];
tgt = &sub->dss_tgt;

if (dlh->dlh_normal_sub_done == 0) {
sub->dss_result = 0;
sub->dss_comp = 0;

if (unlikely(tgt->st_flags & DTF_DELAY_FORWARD)) {
dtx_sub_comp_cb(dlh, i, 0);
dtx_sub_comp_cb(dlh, dtx_chore->i, 0);
continue;
}
} else {
Expand All @@ -2002,33 +2017,38 @@ dtx_leader_exec_ops_ult(void *arg)
}

if (tgt->st_rank == DAOS_TGT_IGNORE ||
(i == daos_fail_value_get() && DAOS_FAIL_CHECK(DAOS_DTX_SKIP_PREPARE))) {
(dtx_chore->i == daos_fail_value_get() &&
DAOS_FAIL_CHECK(DAOS_DTX_SKIP_PREPARE))) {
if (dlh->dlh_normal_sub_done == 0 || tgt->st_flags & DTF_DELAY_FORWARD)
dtx_sub_comp_cb(dlh, i, 0);
dtx_sub_comp_cb(dlh, dtx_chore->i, 0);
continue;
}

rc = ult_arg->func(dlh, ult_arg->func_arg, i, dtx_sub_comp_cb);
rc = dtx_chore->func(dlh, dtx_chore->func_arg, dtx_chore->i, dtx_sub_comp_cb);
if (rc != 0) {
if (sub->dss_comp == 0)
dtx_sub_comp_cb(dlh, i, rc);
dtx_sub_comp_cb(dlh, dtx_chore->i, rc);
break;
}

dtx_chore->i++;
dtx_chore->j++;

/* Yield to avoid holding CPU for too long time. */
if ((++k) % DTX_RPC_YIELD_THD == 0)
ABT_thread_yield();
if (dtx_chore->j % DTX_RPC_YIELD_THD == 0)
return DSS_CHORE_READY;
}

if (rc != 0) {
for (i++, j++; j < dlh->dlh_forward_cnt; i++, j++) {
sub = &dlh->dlh_subs[i];
for (dtx_chore->i++, dtx_chore->j++; dtx_chore->j < dlh->dlh_forward_cnt;
dtx_chore->i++, dtx_chore->j++) {
sub = &dlh->dlh_subs[dtx_chore->i];
tgt = &sub->dss_tgt;

if (dlh->dlh_normal_sub_done == 0 || tgt->st_flags & DTF_DELAY_FORWARD) {
sub->dss_result = 0;
sub->dss_comp = 0;
dtx_sub_comp_cb(dlh, i, 0);
dtx_sub_comp_cb(dlh, dtx_chore->i, 0);
}
}
}
Expand All @@ -2038,6 +2058,8 @@ dtx_leader_exec_ops_ult(void *arg)
D_ASSERTF(rc == ABT_SUCCESS, "ABT_future_set failed [%u, %u), for delay %s: %d\n",
dlh->dlh_forward_idx, dlh->dlh_forward_idx + dlh->dlh_forward_cnt,
dlh->dlh_normal_sub_done == 1 ? "yes" : "no", rc);

return DSS_CHORE_DONE;
}

/**
Expand All @@ -2047,15 +2069,15 @@ int
dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
dtx_agg_cb_t agg_cb, int allow_failure, void *func_arg)
{
struct dtx_ult_arg ult_arg;
struct dtx_chore dtx_chore;
int sub_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt;
int rc = 0;
int local_rc = 0;
int remote_rc = 0;

ult_arg.func = func;
ult_arg.func_arg = func_arg;
ult_arg.dlh = dlh;
dtx_chore.func = func;
dtx_chore.func_arg = func_arg;
dtx_chore.dlh = dlh;

dlh->dlh_result = 0;
dlh->dlh_allow_failure = allow_failure;
Expand Down Expand Up @@ -2091,17 +2113,10 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
D_GOTO(out, rc = dss_abterr2der(rc));
}

/*
* NOTE: Ideally, we probably should create ULT for each shard, but for performance
* reasons, let's only create one for all remote targets for now. Moreover,
* we assume that func does not require deep stacks to forward the remote
* requests (dtx_leader_exec_ops_ult does not execute the local part of func).
*/
rc = dss_ult_create(dtx_leader_exec_ops_ult, &ult_arg, DSS_XS_IOFW,
dss_get_module_info()->dmi_tgt_id, 0, NULL);
rc = dss_chore_add(&dtx_chore.chore, dtx_leader_exec_ops_chore);
if (rc != 0) {
D_ERROR("ult create failed [%u, %u] (2): "DF_RC"\n",
dlh->dlh_forward_idx, dlh->dlh_forward_cnt, DP_RC(rc));
DL_ERROR(rc, "chore create failed [%u, %u] (2)", dlh->dlh_forward_idx,
dlh->dlh_forward_cnt);
ABT_future_free(&dlh->dlh_future);
goto out;
}
Expand Down Expand Up @@ -2169,11 +2184,9 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
/* The ones without DELAY flag will be skipped when scan the targets array. */
dlh->dlh_forward_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt;

/* See also the dss_ult_create above. */
rc = dss_ult_create(dtx_leader_exec_ops_ult, &ult_arg, DSS_XS_IOFW,
dss_get_module_info()->dmi_tgt_id, 0, NULL);
rc = dss_chore_add(&dtx_chore.chore, dtx_leader_exec_ops_chore);
if (rc != 0) {
D_ERROR("ult create failed (4): "DF_RC"\n", DP_RC(rc));
DL_ERROR(rc, "chore create failed (4)");
ABT_future_free(&dlh->dlh_future);
goto out;
}
Expand Down
10 changes: 9 additions & 1 deletion src/engine/srv.c
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,12 @@ dss_srv_handler(void *arg)
goto crt_destroy;
}

rc = dss_chore_queue_init(dx);
if (rc != 0) {
DL_ERROR(rc, "failed to initialize chore queue");
goto tse_fini;
}

if (dss_xstream_has_nvme(dx)) {
ABT_thread_attr attr;

Expand All @@ -473,7 +479,7 @@ dss_srv_handler(void *arg)
if (rc != 0) {
D_ERROR("failed to init spdk context for xstream(%d) "
"rc:%d\n", dmi->dmi_xs_id, rc);
D_GOTO(tse_fini, rc);
D_GOTO(chore_queue_fini, rc);
}

rc = ABT_thread_attr_create(&attr);
Expand Down Expand Up @@ -555,6 +561,8 @@ dss_srv_handler(void *arg)
nvme_fini:
if (dss_xstream_has_nvme(dx))
bio_xsctxt_free(dmi->dmi_nvme_ctxt);
chore_queue_fini:
dss_chore_queue_fini(dx);
tse_fini:
tse_sched_fini(&dx->dx_sched_dsc);
crt_destroy:
Expand Down
4 changes: 4 additions & 0 deletions src/engine/srv_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ struct dss_xstream {
#endif
bool dx_progress_started; /* Network poll started */
int dx_tag; /** tag for xstream */
struct dss_chore_queue *dx_chore_queue;
};

/** Engine module's metrics */
Expand Down Expand Up @@ -367,4 +368,7 @@ dss_xstream_has_nvme(struct dss_xstream *dx)
return false;
}

int dss_chore_queue_init(struct dss_xstream *dx);
void dss_chore_queue_fini(struct dss_xstream *dx);

#endif /* __DAOS_SRV_INTERNAL__ */
152 changes: 152 additions & 0 deletions src/engine/ult.c
Original file line number Diff line number Diff line change
Expand Up @@ -610,3 +610,155 @@ dss_main_exec(void (*func)(void *), void *arg)

return dss_ult_create(func, arg, DSS_XS_SELF, info->dmi_tgt_id, 0, NULL);
}

/**
 * Per-xstream queue of chores executed by a single dedicated ULT
 * (dss_chore_queue_ult). chq_list and chq_stop are protected by chq_mutex;
 * chq_cond is signaled when either changes.
 */
struct dss_chore_queue {
	d_list_t chq_list;	/* pending chores, linked via cho_link */
	bool chq_stop;		/* stop requested (see dss_chore_queue_fini) */
	ABT_mutex chq_mutex;	/* protects chq_list and chq_stop */
	ABT_cond chq_cond;	/* "list nonempty or stopping" condition */
	ABT_thread chq_ult;	/* the ULT running dss_chore_queue_ult */
};

/**
 * Add a chore for \a func.
 *
 * The chore is queued on the chore queue of the xstream chosen by
 * sched_ult2xs(DSS_XS_IOFW, ...) for the calling target, and will be
 * executed (possibly in multiple steps) by that queue's ULT.
 *
 * \param[in]	chore	embedded chore object
 * \param[in]	func	function to be executed via \a chore
 *
 * \retval -DER_OP_CANCELED	chore queue stopping
 */
int
dss_chore_add(struct dss_chore *chore, dss_chore_func_t func)
{
	int xs_id;
	struct dss_xstream *dx;
	struct dss_chore_queue *queue;

	/*
	 * A fresh chore starts in DSS_CHORE_NEW, so the queue ULT will invoke
	 * func with is_reentrance == false on the first call.
	 */
	chore->cho_status = DSS_CHORE_NEW;
	chore->cho_func = func;

	/* Find our chore queue. */
	xs_id = sched_ult2xs(DSS_XS_IOFW, dss_get_module_info()->dmi_tgt_id);
	D_ASSERT(xs_id != -DER_INVAL);
	dx = dss_get_xstream(xs_id);
	D_ASSERT(dx != NULL);
	queue = dx->dx_chore_queue;
	D_ASSERT(queue != NULL);
	D_DEBUG(DB_TRACE, "chore=%p: tgt_id=%d -> xs_id=%d dx.tgt_id=%d\n", chore,
		dss_get_module_info()->dmi_tgt_id, xs_id, dx->dx_tgt_id);

	ABT_mutex_lock(queue->chq_mutex);
	if (queue->chq_stop) {
		/* Queue is shutting down; refuse new work. */
		ABT_mutex_unlock(queue->chq_mutex);
		return -DER_OP_CANCELED;
	}
	d_list_add_tail(&chore->cho_link, &queue->chq_list);
	/* Wake the queue ULT, which may be blocked in ABT_cond_wait. */
	ABT_cond_broadcast(queue->chq_cond);
	ABT_mutex_unlock(queue->chq_mutex);

	return 0;
}

/*
 * Main loop of the chore queue ULT: pop chores one at a time, run each one
 * step, and requeue those that report DSS_CHORE_READY (i.e. not yet done).
 * Exits when chq_stop is set and the list has been drained — note that the
 * pop is attempted before the stop check, so already-queued chores are
 * executed even after a stop request.
 */
static void
dss_chore_queue_ult(void *arg)
{
	struct dss_chore_queue *queue = arg;

	D_ASSERT(queue != NULL);
	D_DEBUG(DB_TRACE, "begin\n");

	for (;;) {
		struct dss_chore *chore = NULL;
		bool stop = false;

		/* Wait until a chore is available or we are told to stop. */
		ABT_mutex_lock(queue->chq_mutex);
		for (;;) {
			chore = d_list_pop_entry(&queue->chq_list, struct dss_chore, cho_link);
			if (chore != NULL)
				break;
			if (queue->chq_stop) {
				stop = true;
				break;
			}
			ABT_cond_wait(queue->chq_cond, queue->chq_mutex);
		}
		ABT_mutex_unlock(queue->chq_mutex);

		if (stop)
			break;

		D_DEBUG(DB_TRACE, "scheduling: chore=%p chore.status=%d\n", chore,
			chore->cho_status);
		/*
		 * is_reentrance is true iff the chore yielded previously
		 * (DSS_CHORE_READY); a chore fresh from dss_chore_add is
		 * DSS_CHORE_NEW. The chore function must not return
		 * DSS_CHORE_NEW.
		 */
		chore->cho_status = chore->cho_func(chore, chore->cho_status == DSS_CHORE_READY);
		D_ASSERT(chore->cho_status != DSS_CHORE_NEW);
		if (chore->cho_status == DSS_CHORE_READY) {
			/*
			 * Not finished: requeue at the tail so other chores
			 * get a turn. No chq_cond signal is needed here, since
			 * this ULT is the consumer and rechecks the list on
			 * the next iteration.
			 */
			ABT_mutex_lock(queue->chq_mutex);
			d_list_add_tail(&chore->cho_link, &queue->chq_list);
			ABT_mutex_unlock(queue->chq_mutex);
		}
	}

	D_DEBUG(DB_TRACE, "end\n");
}

/**
 * Allocate and start the chore queue of \a dx: an empty list, its mutex and
 * condition variable, and the ULT that services it. On success the queue is
 * stored in dx->dx_chore_queue; on failure everything created so far is torn
 * down and a DER error is returned.
 */
int
dss_chore_queue_init(struct dss_xstream *dx)
{
	struct dss_chore_queue *chq;
	int rc;

	D_ALLOC_PTR(chq);
	if (chq == NULL)
		return -DER_NOMEM;

	D_INIT_LIST_HEAD(&chq->chq_list);
	chq->chq_stop = false;

	rc = ABT_mutex_create(&chq->chq_mutex);
	if (rc != ABT_SUCCESS) {
		D_ERROR("failed to create chore queue mutex: %d\n", rc);
		rc = dss_abterr2der(rc);
		goto out_free;
	}

	rc = ABT_cond_create(&chq->chq_cond);
	if (rc != ABT_SUCCESS) {
		D_ERROR("failed to create chore queue condition variable: %d\n", rc);
		rc = dss_abterr2der(rc);
		goto out_mutex;
	}

	/* The servicing ULT; it runs until chq_stop is set. */
	rc = daos_abt_thread_create(dx->dx_sp, dss_free_stack_cb, dx->dx_pools[DSS_POOL_GENERIC],
				    dss_chore_queue_ult, chq, ABT_THREAD_ATTR_NULL,
				    &chq->chq_ult);
	if (rc != 0) {
		D_ERROR("failed to create chore queue ULT: %d\n", rc);
		rc = dss_abterr2der(rc);
		goto out_cond;
	}

	dx->dx_chore_queue = chq;
	return 0;

out_cond:
	ABT_cond_free(&chq->chq_cond);
out_mutex:
	ABT_mutex_free(&chq->chq_mutex);
out_free:
	D_FREE(chq);
	return rc;
}

void
dss_chore_queue_fini(struct dss_xstream *dx)
{
dx->dx_chore_queue->chq_stop = true;
ABT_cond_broadcast(dx->dx_chore_queue->chq_cond);
ABT_thread_free(&dx->dx_chore_queue->chq_ult);
ABT_cond_free(&dx->dx_chore_queue->chq_cond);
ABT_mutex_free(&dx->dx_chore_queue->chq_mutex);
D_FREE(dx->dx_chore_queue);
}
Loading

0 comments on commit 68dc394

Please sign in to comment.