Skip to content

Commit

Permalink
DAOS-14467 chk: properly stop check scheduler (#13181)
Browse files Browse the repository at this point in the history
When someone wants to stop current check instance, it needs to set
ins->ci_sched_exiting to notify related instance scheduler to exit.

Originally, we used "ci_sched_running" for such purpose. But it is
confused to distinguish whether the scheduler has already exited or
someone is stopping the instance. The others may misunderstand that
related check scheduler has already exited, but the scheduler is in
stopping process, as to subsequent checker restart will get failure.

Some code cleanup.

Signed-off-by: Fan Yong <[email protected]>
  • Loading branch information
Nasf-Fan authored Nov 23, 2023
1 parent 467b191 commit ca86895
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 52 deletions.
7 changes: 6 additions & 1 deletion src/chk/chk_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,14 @@ chk_pool_wait(struct chk_pool_rec *cpr)
struct chk_pending_rec *tmp;

D_ASSERT(cpr->cpr_refs > 0);
/*
* The caller chk_pool_stop_one() must firstly delete the cpr from the pool
* tree, then stop it here. So chk_pool_wait() will not be called repeatedly.
*/
D_ASSERT(cpr->cpr_stop == 0);

ABT_mutex_lock(cpr->cpr_mutex);
if (cpr->cpr_thread != ABT_THREAD_NULL && !cpr->cpr_stop) {
if (cpr->cpr_thread != ABT_THREAD_NULL) {
cpr->cpr_stop = 1;
ABT_cond_broadcast(cpr->cpr_cond);
ABT_mutex_unlock(cpr->cpr_mutex);
Expand Down
24 changes: 18 additions & 6 deletions src/chk/chk_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -1858,11 +1858,11 @@ chk_engine_sched(void *args)
D_INFO(DF_ENGINE" scheduler on rank %u entry at phase %u\n",
DP_ENGINE(ins), myrank, cbk->cb_phase);

while (ins->ci_sched_running) {
while (!ins->ci_sched_exiting) {
dss_sleep(300);

/* Someone wants to stop the check. */
if (!ins->ci_sched_running)
if (ins->ci_sched_exiting)
D_GOTO(out, rc = 0);

ins_phase = chk_pools_find_slowest(ins, &done);
Expand Down Expand Up @@ -2293,9 +2293,11 @@ chk_engine_stop(uint64_t gen, int pool_nr, uuid_t pools[], uint32_t *flags)
{
struct chk_instance *ins = chk_engine;
struct chk_bookmark *cbk = &ins->ci_bk;
struct chk_pool_rec *cpr;
d_rank_t myrank = dss_self_rank();
int rc = 0;
int i;
int active = false;

if (gen != 0 && gen != cbk->cb_gen)
D_GOTO(log, rc = -DER_NOTAPPLICABLE);
Expand All @@ -2306,7 +2308,7 @@ chk_engine_stop(uint64_t gen, int pool_nr, uuid_t pools[], uint32_t *flags)
if (ins->ci_starting)
D_GOTO(log, rc = -DER_BUSY);

if (ins->ci_stopping)
if (ins->ci_stopping || ins->ci_sched_exiting)
D_GOTO(log, rc = -DER_INPROGRESS);

if (cbk->cb_ins_status != CHK__CHECK_INST_STATUS__CIS_RUNNING)
Expand All @@ -2332,7 +2334,17 @@ chk_engine_stop(uint64_t gen, int pool_nr, uuid_t pools[], uint32_t *flags)
if (ins->ci_pool_stopped)
*flags = CSF_POOL_STOPPED;

if (d_list_empty(&ins->ci_pool_list)) {
d_list_for_each_entry(cpr, &ins->ci_pool_list, cpr_link) {
if (!cpr->cpr_done && !cpr->cpr_skip && !cpr->cpr_stop) {
D_ASSERTF(pool_nr != 0, "Hit active pool "DF_UUIDF" after stop all\n",
DP_UUID(cpr->cpr_uuid));

active = true;
break;
}
}

if (!active) {
chk_stop_sched(ins);
/* To indicate that there is no active pool(s) on this rank. */
rc = 1;
Expand Down Expand Up @@ -2563,7 +2575,7 @@ chk_engine_mark_rank_dead(uint64_t gen, d_rank_t rank, uint32_t version)
* check instance; otherwise, related pool(s) will be marked as 'failed' when
* try ro access something on the dead rank.
*
* So here, it is not ncessary to find out the affected pools and fail them
* So here, it is not necessary to find out the affected pools and fail them
* immediately when the death event is reported, instead, it will be handled
* sometime later as the DAOS check going.
*/
Expand Down Expand Up @@ -3164,7 +3176,7 @@ chk_engine_report(struct chk_report_unit *cru, uint64_t *seq, int *decision)
goto out;
}

if (!ins->ci_sched_running || cpr->cpr_exiting) {
if (!ins->ci_sched_running || ins->ci_sched_exiting || cpr->cpr_exiting) {
rc = 1;
ABT_mutex_unlock(cpr->cpr_mutex);
goto out;
Expand Down
38 changes: 23 additions & 15 deletions src/chk/chk_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ CRT_RPC_DECLARE(chk_query, DAOS_ISEQ_CHK_QUERY, DAOS_OSEQ_CHK_QUERY);
/*
* CHK_MARK:
* From check leader to check engine to mark some rank as "dead". Under check mode, if some rank
* is dead (and failed to rejoin), it will not be exlcuded from related pool map to avoid further
* is dead (and failed to rejoin), it will not be excluded from related pool map to avoid further
* damaging the system, instead, it will be mark as "dead" by the check instance and the check
* status on related pool(s) will be marked as "failed".
*/
Expand Down Expand Up @@ -998,6 +998,9 @@ static inline void
chk_pool_get(struct chk_pool_rec *cpr)
{
cpr->cpr_refs++;

D_DEBUG(DB_TRACE, "Get ref on pool rec %p for "DF_UUIDF", ref %d\n",
cpr, DP_UUID(cpr->cpr_uuid), cpr->cpr_refs);
}

static inline void
Expand All @@ -1009,6 +1012,9 @@ chk_pool_put(struct chk_pool_rec *cpr)
/* NOTE: Before being destroyed, keep it in the list. */
D_ASSERT(!d_list_empty(&cpr->cpr_link));

D_DEBUG(DB_TRACE, "Pet ref on pool rec %p for "DF_UUIDF", ref %d\n",
cpr, DP_UUID(cpr->cpr_uuid), cpr->cpr_refs);

if (--(cpr->cpr_refs) == 0) {
d_list_del(&cpr->cpr_link);
D_ASSERT(cpr->cpr_thread == ABT_THREAD_NULL);
Expand All @@ -1035,6 +1041,9 @@ chk_pool_put(struct chk_pool_rec *cpr)
D_FREE(cpr->cpr_mbs[i].cpm_tgt_status);
}

D_DEBUG(DB_TRACE, "Destroy pool rec %p for "DF_UUIDF"\n",
cpr, DP_UUID(cpr->cpr_uuid));

D_FREE(cpr->cpr_mbs);
D_FREE(cpr->cpr_label);
D_FREE(cpr);
Expand Down Expand Up @@ -1150,43 +1159,42 @@ chk_dup_string(char **tgt, const char *src, size_t len)
static inline void
chk_stop_sched(struct chk_instance *ins)
{
uint64_t gen = ins->ci_bk.cb_gen;

ABT_mutex_lock(ins->ci_abt_mutex);
if (ins->ci_sched != ABT_THREAD_NULL && ins->ci_sched_running) {
ins->ci_sched_running = 0;
if (ins->ci_sched_running && !ins->ci_sched_exiting) {
D_ASSERT(ins->ci_sched != ABT_THREAD_NULL);

D_INFO("Stopping %s instance on rank %u with gen "DF_U64"\n",
ins->ci_is_leader ? "leader" : "engine", dss_self_rank(), gen);

ins->ci_sched_exiting = 1;
ABT_cond_broadcast(ins->ci_abt_cond);
ABT_mutex_unlock(ins->ci_abt_mutex);
ABT_thread_free(&ins->ci_sched);
} else {
ABT_mutex_unlock(ins->ci_abt_mutex);
/* Check ci_bk.cb_gen for the case of others restarted checker during my wait. */
while (ins->ci_sched_running && gen == ins->ci_bk.cb_gen)
dss_sleep(300);
}
}

static inline int
chk_ins_can_start(struct chk_instance *ins)
{
struct chk_bookmark *cbk = &ins->ci_bk;

if (unlikely(!ins->ci_inited))
return -DER_AGAIN;

if (ins->ci_starting)
return -DER_INPROGRESS;

if (ins->ci_stopping)
if (ins->ci_stopping || ins->ci_sched_exiting)
return -DER_BUSY;

if (ins->ci_sched_running)
return -DER_ALREADY;

/*
* If ci_sched_running is zero but check instance is still running,
* then someone is trying to stop it.
*/
if (((ins->ci_is_leader && cbk->cb_magic == CHK_BK_MAGIC_LEADER) ||
(!ins->ci_is_leader && cbk->cb_magic == CHK_BK_MAGIC_ENGINE)) &&
cbk->cb_ins_status == CHK__CHECK_INST_STATUS__CIS_RUNNING)
return -DER_BUSY;

return 0;
}

Expand Down
10 changes: 5 additions & 5 deletions src/chk/chk_leader.c
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,7 @@ chk_leader_need_stop(struct chk_instance *ins, int *ret)
}
}

if (!ins->ci_sched_running) {
if (!ins->ci_sched_running || ins->ci_sched_exiting) {
*ret = 0;
return true;
}
Expand Down Expand Up @@ -1931,7 +1931,7 @@ chk_leader_pool_mbs_one(struct chk_pool_rec *cpr)
if (rc1 == RSVC_CLIENT_RECHOOSE ||
(rc1 == RSVC_CLIENT_PROCEED && daos_rpc_retryable_rc(rc))) {
dss_sleep(interval);
if (cpr->cpr_stop || !ins->ci_sched_running) {
if (cpr->cpr_stop || !ins->ci_sched_running || ins->ci_sched_exiting) {
notify = false;
D_GOTO(out_client, rc = 0);
}
Expand Down Expand Up @@ -2165,7 +2165,7 @@ chk_leader_sched(void *args)
ABT_mutex_lock(ins->ci_abt_mutex);

again:
if (!ins->ci_sched_running) {
if (ins->ci_sched_exiting) {
ABT_mutex_unlock(ins->ci_abt_mutex);
D_GOTO(out, rc = 0);
}
Expand Down Expand Up @@ -3039,7 +3039,7 @@ chk_leader_stop(int pool_nr, uuid_t pools[])
if (ins->ci_starting)
D_GOTO(log, rc = -DER_BUSY);

if (ins->ci_stopping)
if (ins->ci_stopping || ins->ci_sched_exiting)
D_GOTO(log, rc = -DER_INPROGRESS);

/*
Expand Down Expand Up @@ -3620,7 +3620,7 @@ chk_leader_report(struct chk_report_unit *cru, uint64_t *seq, int *decision)
goto out;
}

if (!ins->ci_sched_running || cpr->cpr_exiting) {
if (!ins->ci_sched_running || ins->ci_sched_exiting || cpr->cpr_exiting) {
rc = 1;
ABT_mutex_unlock(cpr->cpr_mutex);
goto out;
Expand Down
2 changes: 1 addition & 1 deletion src/include/daos_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ static inline bool
daos_is_valid_uuid_string(const char *uuid)
{
const char *p;
int len = DAOS_UUID_STR_SIZE - 1; /* Not include the ternimated '\0' */
int len = DAOS_UUID_STR_SIZE - 1; /* Not include the terminated '\0' */
int i;

if (strnlen(uuid, len) != len)
Expand Down
2 changes: 1 addition & 1 deletion src/rdb/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ rdb_get_use_leases(void)
* \a clue->bcl_replicas with d_rank_list_free.
*
* \param[in] storage database storage
* \parma[out] clue database clue
* \param[out] clue database clue
*/
int
rdb_glance(struct rdb_storage *storage, struct rdb_clue *clue)
Expand Down
2 changes: 1 addition & 1 deletion src/tests/ftest/daos_test/suite.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ timeout: 600
timeouts:
test_daos_degraded_mode: 450
test_daos_management: 110
test_daos_cat_recovery: 1800
test_daos_cat_recovery: 3000
test_daos_pool: 180
test_daos_container: 450
test_daos_epoch: 125
Expand Down
33 changes: 11 additions & 22 deletions src/tests/ftest/util/dmg_utils_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,7 @@ class ConfigSubCommand(CommandWithSubCommand):

def __init__(self):
"""Create a dmg config subcommand object."""
super(DmgCommandBase.ConfigSubCommand, self).__init__(
"run/dmg/config/*", "config")
super(DmgCommandBase.ConfigSubCommand, self).__init__("run/dmg/config/*", "config")

def get_sub_command_class(self):
# pylint: disable=redefined-variable-type
Expand All @@ -234,10 +233,8 @@ def __init__(self):
"""Create a dmg config generate object."""
super(
DmgCommandBase.ConfigSubCommand.GenerateSubCommand,
self).__init__(
"/run/dmg/config/generate/*", "generate")
self.access_points = FormattedParameter(
"--access-points={}", None)
self).__init__("/run/dmg/config/generate/*", "generate")
self.access_points = FormattedParameter("--access-points={}", None)
self.num_engines = FormattedParameter("--num-engines={}", None)
self.scm_only = FormattedParameter("--scm-only", False)
self.net_class = FormattedParameter("--net-class={}", None)
Expand Down Expand Up @@ -277,8 +274,7 @@ class FaultsSubCommand(CommandWithSubCommand):

def __init__(self):
"""Create a dmg faults subcommand object."""
super(DmgCommandBase.FaultsSubCommand, self).__init__(
"run/dmg/faults/*", "faults")
super(DmgCommandBase.FaultsSubCommand, self).__init__("run/dmg/faults/*", "faults")

def get_sub_command_class(self):
# pylint: disable=redefined-variable-type
Expand All @@ -299,8 +295,7 @@ def __init__(self):
"""Create a dmg faults add-checker-report object."""
super(
DmgCommandBase.FaultsSubCommand.AddCheckerReportSubCommand,
self).__init__(
"/run/dmg/faults/add-checker-report/*", "add-checker-report")
self).__init__("/run/dmg/faults/add-checker-report/*", "add-checker-report")
self.file = FormattedParameter("--file={}", None)
self.checker_report_class = FormattedParameter("--class={}", None)

Expand Down Expand Up @@ -525,8 +520,7 @@ class OverwriteAclSubCommand(CommandWithParameters):

def __init__(self):
"""Create a dmg pool overwrite-acl command object."""
super().__init__(
"/run/dmg/pool/overwrite-acl/*", "overwrite-acl")
super().__init__("/run/dmg/pool/overwrite-acl/*", "overwrite-acl")
self.pool = BasicParameter(None, position=1)
self.acl_file = FormattedParameter("-a {}", None)

Expand Down Expand Up @@ -880,8 +874,7 @@ class EraseSubCommand(CommandWithParameters):

def __init__(self):
"""Create a dmg system erase command object."""
super().__init__(
"/run/dmg/system/erase/*", "erase")
super().__init__("/run/dmg/system/erase/*", "erase")

class ExcludeSubCommand(CommandWithParameters):
"""Defines an object for the dmg system exclude command."""
Expand All @@ -897,8 +890,7 @@ class LeaderQuerySubCommand(CommandWithParameters):

def __init__(self):
"""Create a dmg system leader-query command object."""
super().__init__(
"/run/dmg/system/leader-query/*", "leader-query")
super().__init__("/run/dmg/system/leader-query/*", "leader-query")

class ListPoolsSubCommand(CommandWithParameters):
"""Defines an object for the dmg system list-pools command."""
Expand Down Expand Up @@ -971,8 +963,7 @@ class ListSubCommand(CommandWithParameters):

def __init__(self):
"""Create a dmg telemetry metrics list object."""
super().__init__(
"/run/dmg/telemetry/metrics/list/*", "list")
super().__init__("/run/dmg/telemetry/metrics/list/*", "list")
self.host = FormattedParameter("--host-list={}", None)
self.port = FormattedParameter("--port={}", None)

Expand All @@ -981,8 +972,7 @@ class QuerySubCommand(CommandWithParameters):

def __init__(self):
"""Create a dmg telemetry metrics query object."""
super().__init__(
"/run/dmg/telemetry/metrics/query/*", "query")
super().__init__("/run/dmg/telemetry/metrics/query/*", "query")
self.host = FormattedParameter("--host-list={}", None)
self.port = FormattedParameter("--port={}", None)
self.metrics = FormattedParameter("--metrics={}", None)
Expand All @@ -992,8 +982,7 @@ class VersionSubCommand(CommandWithSubCommand):

def __init__(self):
"""Create a dmg version subcommand object."""
super(DmgCommandBase.VersionSubCommand, self).__init__(
"/run/dmg/version/*", "version")
super(DmgCommandBase.VersionSubCommand, self).__init__("/run/dmg/version/*", "version")

def _get_new(self):
"""Get a new object based upon this one.
Expand Down

0 comments on commit ca86895

Please sign in to comment.