DAOS-14903 container: refine task process in pmap_refresh_cb (#13491) (#13682)

The completion callback should be registered before the task reinit; otherwise the
completion callback may never be triggered.

Also includes a few other backports:
6d4e549 - DAOS-14788 pool: Fix some reinit usages (#13518)
91b93c8 - DAOS-13252 tests: set svcn for multiple_failure test (#13619)
d30e842 - DAOS-14903 object: fix bug in peer status check (#13585)

Signed-off-by: Xuezhao Liu <[email protected]>
Signed-off-by: Li Wei <[email protected]>
liuxuezhao authored Feb 2, 2024
1 parent 4c14427 commit 60bbb3a
Showing 6 changed files with 48 additions and 35 deletions.
17 changes: 9 additions & 8 deletions src/container/cli.c
@@ -634,14 +634,6 @@ pmap_refresh_cb(tse_task_t *task, void *data)
else
delay = 0;

rc = tse_task_reinit_with_delay(task, delay);
if (rc) {
D_ERROR(DF_UUID": pmap_refresh version (%d:%d), resched"
" failed, "DF_RC"\n", DP_UUID(pool->dp_pool),
pm_ver, cb_arg->pra_pm_ver, DP_RC(rc));
goto out;
}

rc = tse_task_register_comp_cb(task, pmap_refresh_cb, cb_arg,
sizeof(*cb_arg));
if (rc) {
@@ -656,6 +648,15 @@ pmap_refresh_cb(tse_task_t *task, void *data)
D_DEBUG(DB_TRACE, DF_UUID": pmap_refresh version (%d:%d), "
"in %d retry\n", DP_UUID(pool->dp_pool), pm_ver,
cb_arg->pra_pm_ver, cb_arg->pra_retry_nr);

rc = tse_task_reinit_with_delay(task, delay);
if (rc) {
D_ERROR(DF_UUID": pmap_refresh version (%d:%d), resched"
" failed, "DF_RC"\n", DP_UUID(pool->dp_pool),
pm_ver, cb_arg->pra_pm_ver, DP_RC(rc));
goto out;
}

return rc;
}
out:
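The reordering above follows the rule stated in the commit message: register the completion callback first, then reinitialize the task. A minimal sketch of that ordering, using the tse calls visible in this diff; the callback and argument names (my_retry_cb, struct my_cb_arg) are illustrative and not part of the DAOS source:

/* Sketch only: assumes <daos/tse.h>; my_cb_arg is a made-up argument type. */
struct my_cb_arg {
	int retries;
};

static int
my_retry_cb(tse_task_t *task, void *data)
{
	struct my_cb_arg	*arg = data;
	int			 rc;

	/* Register the completion callback while the task is still owned
	 * by this thread.
	 */
	rc = tse_task_register_comp_cb(task, my_retry_cb, arg, sizeof(*arg));
	if (rc)
		return rc;

	/* Reinitialize last: once this returns, another thread progressing
	 * the scheduler may already be executing the task, so the task must
	 * not be touched again here.
	 */
	return tse_task_reinit_with_delay(task, 0 /* no backoff in this sketch */);
}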
9 changes: 5 additions & 4 deletions src/include/daos/tse.h
@@ -307,10 +307,11 @@ tse_task_register_cbs(tse_task_t *task, tse_task_cb_t prep_cb,

/**
* Reinitialize a task and move it into the scheduler's initialize list. The
* task must have a body function to be reinserted into the scheduler. If the
* task is reintialzed in one of its completion CBs, that callback and the ones
* that have already executed will have been removed from the cb list and will
* need to be re-registered by the user after re-insertion.
* task must have a body function to be reinserted into the scheduler.
* Once the task has been reinitialized, it may be executed by another thread
* that progresses the scheduler, so all accesses to the task must happen
* before the reinit call, for example task dependency/callback registration
* or task argument access.
*
* \param task [IN] Task to reinitialize
*
7 changes: 5 additions & 2 deletions src/object/obj_tx.c
@@ -2607,7 +2607,12 @@ dc_tx_restart(tse_task_t *task)
/*
* Reinitialize task with a delay to implement the
* backoff and call dc_tx_restart_end below.
*
* We don't need to get an extra tx reference, because
* the reinitialized task must acquire tx->tx_lock
* first.
*/
tse_task_set_priv_internal(task, tx);
rc = tse_task_reinit_with_delay(task, backoff);
if (rc != 0) {
/* Skip the backoff. */
@@ -2616,8 +2621,6 @@
goto out_tx_lock;
}
D_MUTEX_UNLOCK(&tx->tx_lock);
/* Pass our tx reference to task. */
tse_task_set_priv_internal(task, tx);
return 0;
}

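The comment added above carries the reasoning behind moving tse_task_set_priv_internal() before the reinit: the task gets its last access (the tx pointer hand-off) while this thread still holds tx->tx_lock, and the reinitialized task must itself acquire tx->tx_lock before it can use tx. A condensed sketch of the pattern under those assumptions; tx_restart_with_backoff is an illustrative name, and D_MUTEX_LOCK is assumed to be the counterpart of the D_MUTEX_UNLOCK shown in the diff:

/* Sketch only: assumes the DAOS client headers for dc_tx and the tse API. */
static int
tx_restart_with_backoff(tse_task_t *task, struct dc_tx *tx, uint32_t backoff)
{
	int rc;

	D_MUTEX_LOCK(&tx->tx_lock);
	/* Hand tx to the task before reinit; this is the last access to the
	 * task from this thread.
	 */
	tse_task_set_priv_internal(task, tx);
	rc = tse_task_reinit_with_delay(task, backoff);
	/*
	 * No extra tx reference is needed across the reinit: the
	 * reinitialized task must take tx->tx_lock before it can use tx,
	 * and that lock is not released until the unlock below.
	 */
	D_MUTEX_UNLOCK(&tx->tx_lock);
	return rc;
}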
33 changes: 16 additions & 17 deletions src/object/srv_ec_aggregate.c
@@ -1300,9 +1300,9 @@ agg_peer_update_ult(void *arg)
iod.iod_size = entry->ae_rsize;
obj = obj_hdl2ptr(entry->ae_obj_hdl);
for (peer = 0; peer < p; peer++) {
/* Only update the available parities */
if (peer == pidx || entry->ae_peer_pshards[peer].sd_rank == DAOS_TGT_IGNORE)
if (peer == pidx)
continue;
D_ASSERT(entry->ae_peer_pshards[peer].sd_rank != DAOS_TGT_IGNORE);
tgt_ep.ep_rank = entry->ae_peer_pshards[peer].sd_rank;
tgt_ep.ep_tag = entry->ae_peer_pshards[peer].sd_tgt_idx;
rc = obj_req_create(dss_get_module_info()->dmi_ctx, &tgt_ep,
@@ -1429,6 +1429,7 @@ agg_peer_update(struct ec_agg_entry *entry, bool write_parity)
struct daos_shard_loc *peer_loc;
uint32_t failed_tgts_cnt = 0;
uint32_t p = ec_age2p(entry);
uint32_t pidx = ec_age2pidx(entry);
uint32_t peer;
int i, tid, rc = 0;

@@ -1450,24 +1451,19 @@ agg_peer_update(struct ec_agg_entry *entry, bool write_parity)
return rc;
}

rc = agg_get_obj_handle(entry);
if (rc) {
D_ERROR("Failed to open object: "DF_RC"\n", DP_RC(rc));
goto out;
}

if (targets != NULL) {
for (peer = 0; peer < p; peer++) {
if (peer == pidx)
continue;
peer_loc = &entry->ae_peer_pshards[peer];
for (i = 0; i < failed_tgts_cnt; i++) {
if (targets[i].ta_comp.co_rank == peer_loc->sd_rank ||
peer_loc->sd_rank == DAOS_TGT_IGNORE) {
D_DEBUG(DB_EPC, DF_UOID" peer parity "
"tgt gailed rank %d, tgt_idx "
"%d.\n", DP_UOID(entry->ae_oid),
peer_loc->sd_rank,
peer_loc->sd_tgt_idx);
goto out;
if (peer_loc->sd_rank == DAOS_TGT_IGNORE ||
(targets[i].ta_comp.co_rank == peer_loc->sd_rank &&
targets[i].ta_comp.co_index == peer_loc->sd_tgt_idx)) {
D_DEBUG(DB_EPC, DF_UOID" peer parity tgt failed rank %d, "
"tgt_idx %d.\n", DP_UOID(entry->ae_oid),
peer_loc->sd_rank, peer_loc->sd_tgt_idx);
D_GOTO(out, rc = -1);
}
}
}
@@ -1624,7 +1620,10 @@ agg_process_holes_ult(void *arg)
continue;

for (i = 0; targets && i < failed_tgts_cnt; i++) {
if (targets[i].ta_comp.co_rank == entry->ae_peer_pshards[peer].sd_rank) {
if (entry->ae_peer_pshards[peer].sd_rank == DAOS_TGT_IGNORE ||
(targets[i].ta_comp.co_rank == entry->ae_peer_pshards[peer].sd_rank &&
targets[i].ta_comp.co_index ==
entry->ae_peer_pshards[peer].sd_tgt_idx)) {
D_ERROR(DF_UOID" peer %d parity tgt failed\n",
DP_UOID(entry->ae_oid), peer);
rc = -1;
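Both aggregation paths above now apply the same peer-status rule: skip only the local parity index, and treat a peer shard as unusable when it is unresolved (DAOS_TGT_IGNORE) or matches a failed target by both rank and target index. A sketch of that rule factored into a helper; peer_parity_unavailable is an illustrative name, and the targets parameter type is assumed from the ta_comp accesses in the diff:

/* Sketch only: assumes the DAOS server-side EC aggregation headers. */
static bool
peer_parity_unavailable(struct ec_agg_entry *entry, uint32_t peer,
			struct pool_target *targets, uint32_t failed_tgts_cnt)
{
	struct daos_shard_loc	*loc = &entry->ae_peer_pshards[peer];
	uint32_t		 i;

	/* An unresolved peer shard cannot receive the parity update. */
	if (loc->sd_rank == DAOS_TGT_IGNORE)
		return true;

	/* A peer counts as failed only when both rank and target index match. */
	for (i = 0; targets != NULL && i < failed_tgts_cnt; i++) {
		if (targets[i].ta_comp.co_rank == loc->sd_rank &&
		    targets[i].ta_comp.co_index == loc->sd_tgt_idx)
			return true;
	}

	return false;
}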
15 changes: 12 additions & 3 deletions src/pool/cli.c
@@ -1692,6 +1692,9 @@ map_refresh_cb(tse_task_t *task, void *varg)
bool reinit = false;
int rc = task->dt_result;

/* Get an extra reference for the reinit case. */
dc_pool_get(pool);

/*
* If it turns out below that we do need to update the cached pool map,
* then holding the lock while doing so will be okay, since we probably
@@ -1816,6 +1819,7 @@ map_refresh_cb(tse_task_t *task, void *varg)
dc_pool_put(arg->mra_pool);
}

dc_pool_put(pool);
return rc;
}

@@ -1830,6 +1834,9 @@ map_refresh(tse_task_t *task)
struct map_refresh_cb_arg cb_arg;
int rc;

/* Get an extra reference for the reinit cases. */
dc_pool_get(pool);

if (arg->mra_passive) {
/*
* Passive pool map refresh tasks do nothing besides waiting
@@ -1894,7 +1901,7 @@ map_refresh(tse_task_t *task)
DP_UUID(pool->dp_pool), task, DP_RC(rc));
goto out_task;
}
goto out;
goto out_pool;
}

if (pool->dp_map_task == NULL) {
@@ -1942,7 +1949,7 @@ map_refresh(tse_task_t *task)
DP_UUID(pool->dp_pool), query_task, DP_RC(rc));
goto out_map_task;
}
goto out;
goto out_pool;
}

/*
@@ -1974,6 +1981,7 @@ map_refresh(tse_task_t *task)

D_DEBUG(DB_MD, DF_UUID": %p: asking rank %u for version > %u\n",
DP_UUID(pool->dp_pool), task, rank, version);
dc_pool_put(pool);
return daos_rpc_send(rpc, task);

out_cb_arg:
@@ -1987,7 +1995,8 @@ map_refresh(tse_task_t *task)
d_backoff_seq_fini(&arg->mra_backoff_seq);
dc_pool_put(arg->mra_pool);
tse_task_complete(task, rc);
out:
out_pool:
dc_pool_put(pool);
return rc;
}

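The pool/cli.c hunks all apply one pattern: take an extra pool reference before any path that may reinitialize the task, since the reinitialized task can run on another thread and release the caller's reference while this invocation is still using the pool, then drop that extra reference on every exit path. A condensed sketch; refresh_step and do_refresh_step are illustrative names, not DAOS functions:

/* Sketch only: assumes the DAOS client headers for dc_pool and the tse API. */
static int
refresh_step(tse_task_t *task, struct dc_pool *pool)
{
	int rc;

	/* Keep the pool alive independently of the reference the task holds. */
	dc_pool_get(pool);

	rc = do_refresh_step(task, pool);	/* may reinitialize the task */

	/* Drop the extra reference on every return path. */
	dc_pool_put(pool);
	return rc;
}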
2 changes: 1 addition & 1 deletion src/tests/ftest/erasurecode/multiple_failure.yaml
@@ -25,7 +25,7 @@ server_config:
storage: auto
pool:
size: 93%
svcn: 1
svcn: 5
control_method: dmg
container:
type: POSIX
