diff --git a/src/control/system/raft/database.go b/src/control/system/raft/database.go index 363414443b7..788f675509c 100644 --- a/src/control/system/raft/database.go +++ b/src/control/system/raft/database.go @@ -944,11 +944,27 @@ func (db *Database) UpdatePoolService(ps *system.PoolService) error { db.Lock() defer db.Unlock() - _, err := db.FindPoolServiceByUUID(ps.PoolUUID) + p, err := db.FindPoolServiceByUUID(ps.PoolUUID) if err != nil { return errors.Wrapf(err, "failed to retrieve pool %s", ps.PoolUUID) } + // This is a workaround before we can handle the following race + // properly. + // + // mgmtSvc.PoolCreate() Database.handlePoolRepsUpdate() + // Write ps: Creating + // Read ps: Creating + // Write ps: Ready + // Write ps: Creating + // + // The pool remains in Creating state after PoolCreate completes, + // leading to DER_AGAINs during PoolDestroy. + if p.State == system.PoolServiceStateReady && ps.State == system.PoolServiceStateCreating { + db.log.Debugf("ignoring invalid pool service update: %+v -> %+v", p, ps) + return nil + } + if err := db.submitPoolUpdate(raftOpUpdatePoolService, ps); err != nil { return err } diff --git a/src/engine/drpc_ras.c b/src/engine/drpc_ras.c index 30e29ed3c51..89fea9b0d2b 100644 --- a/src/engine/drpc_ras.c +++ b/src/engine/drpc_ras.c @@ -1,5 +1,5 @@ /* - * (C) Copyright 2021 Intel Corporation. + * (C) Copyright 2021-2022 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -183,7 +183,7 @@ log_event(Shared__RASEvent *evt) out: fclose(stream); D_INFO("&&& RAS EVENT%s\n", buf); - D_FREE(buf); + free(buf); } static int @@ -300,7 +300,7 @@ ds_notify_ras_eventf(ras_event_t id, ras_type_t type, ras_sev_t sev, char *hwid, } int -ds_notify_pool_svc_update(uuid_t *pool, d_rank_list_t *svcl) +ds_notify_pool_svc_update(uuid_t *pool, d_rank_list_t *svcl, uint64_t version) { Shared__RASEvent evt = SHARED__RASEVENT__INIT; Shared__RASEvent__PoolSvcEventInfo info = \ @@ -323,6 +323,8 @@ ds_notify_pool_svc_update(uuid_t *pool, d_rank_list_t *svcl) return rc; } + info.version = version; + evt.extended_info_case = SHARED__RASEVENT__EXTENDED_INFO_POOL_SVC_INFO; evt.pool_svc_info = &info; diff --git a/src/engine/tests/drpc_client_tests.c b/src/engine/tests/drpc_client_tests.c index b7c5d70706f..69b83329e05 100644 --- a/src/engine/tests/drpc_client_tests.c +++ b/src/engine/tests/drpc_client_tests.c @@ -319,7 +319,7 @@ test_drpc_verify_notify_pool_svc_update(void **state) svc_ranks = uint32_array_to_rank_list(svc_reps, 4); assert_non_null(svc_ranks); - assert_rc_equal(ds_notify_pool_svc_update(&pool_uuid, svc_ranks), 0); + assert_rc_equal(ds_notify_pool_svc_update(&pool_uuid, svc_ranks, 1), 0); verify_notify_pool_svc_update(&pool_uuid, svc_ranks); d_rank_list_free(svc_ranks); @@ -338,7 +338,7 @@ test_drpc_verify_notify_pool_svc_update_noreps(void **state) assert_int_equal(uuid_parse("11111111-1111-1111-1111-111111111111", pool_uuid), 0); - assert_rc_equal(ds_notify_pool_svc_update(&pool_uuid, NULL), + assert_rc_equal(ds_notify_pool_svc_update(&pool_uuid, NULL, 1), -DER_INVAL); assert_int_equal(sendmsg_call_count, 0); @@ -357,7 +357,7 @@ test_drpc_verify_notify_pool_svc_update_nopool(void **state) svc_ranks = uint32_array_to_rank_list(svc_reps, 4); assert_non_null(svc_ranks); - assert_rc_equal(ds_notify_pool_svc_update(NULL, svc_ranks), + assert_rc_equal(ds_notify_pool_svc_update(NULL, svc_ranks, 1), -DER_INVAL); assert_int_equal(sendmsg_call_count, 0); diff --git a/src/include/daos_srv/ras.h b/src/include/daos_srv/ras.h index 4acbd484086..52d940beee5 100644 --- a/src/include/daos_srv/ras.h +++ b/src/include/daos_srv/ras.h @@ -155,11 +155,12 @@ ds_notify_ras_eventf(ras_event_t id, ras_type_t type, ras_sev_t sev, char *hwid, * * \param[in] pool UUID of DAOS pool with updated service replicas. * \param[in] svcl New list of pool service replica ranks. + * \param[in] version Version of \a svcl. * * \retval Zero on success, non-zero otherwise. */ int -ds_notify_pool_svc_update(uuid_t *pool, d_rank_list_t *svcl); +ds_notify_pool_svc_update(uuid_t *pool, d_rank_list_t *svcl, uint64_t version); /** * Notify control plane that swim has detected a dead rank. diff --git a/src/include/daos_srv/rdb.h b/src/include/daos_srv/rdb.h index 1d01f3d3226..c8c3b8f9fac 100644 --- a/src/include/daos_srv/rdb.h +++ b/src/include/daos_srv/rdb.h @@ -116,10 +116,11 @@ struct rdb_storage; struct rdb_cbs; /** Database storage methods */ -int rdb_create(const char *path, const uuid_t uuid, size_t size, const d_rank_list_t *replicas, - struct rdb_cbs *cbs, void *arg, struct rdb_storage **storagep); -int rdb_open(const char *path, const uuid_t uuid, struct rdb_cbs *cbs, void *arg, - struct rdb_storage **storagep); +int rdb_create(const char *path, const uuid_t uuid, uint64_t caller_term, size_t size, + const d_rank_list_t *replicas, struct rdb_cbs *cbs, void *arg, + struct rdb_storage **storagep); +int rdb_open(const char *path, const uuid_t uuid, uint64_t caller_term, struct rdb_cbs *cbs, + void *arg, struct rdb_storage **storagep); void rdb_close(struct rdb_storage *storage); int rdb_destroy(const char *path, const uuid_t uuid); @@ -165,6 +166,7 @@ int rdb_get_leader(struct rdb *db, uint64_t *term, d_rank_t *rank); int rdb_get_ranks(struct rdb *db, d_rank_list_t **ranksp); int rdb_add_replicas(struct rdb *db, d_rank_list_t *replicas); int rdb_remove_replicas(struct rdb *db, d_rank_list_t *replicas); +int rdb_ping(struct rdb *db, uint64_t caller_term); /** * Path (opaque) diff --git a/src/include/daos_srv/rsvc.h b/src/include/daos_srv/rsvc.h index 48ed1ae591d..781ba04b6a3 100644 --- a/src/include/daos_srv/rsvc.h +++ b/src/include/daos_srv/rsvc.h @@ -1,5 +1,5 @@ /* - * (C) Copyright 2019-2021 Intel Corporation. + * (C) Copyright 2019-2022 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -93,6 +93,8 @@ enum ds_rsvc_state { DS_RSVC_DOWN /**< down */ }; +char *ds_rsvc_state_str(enum ds_rsvc_state state); + /** Replicated service */ struct ds_rsvc { d_list_t s_entry; /* in rsvc_hash */ @@ -120,27 +122,26 @@ int ds_rsvc_start_nodb(enum ds_rsvc_class_id class, d_iov_t *id, uuid_t db_uuid); int ds_rsvc_stop_nodb(enum ds_rsvc_class_id class, d_iov_t *id); -int ds_rsvc_start(enum ds_rsvc_class_id class, d_iov_t *id, uuid_t db_uuid, +int ds_rsvc_start(enum ds_rsvc_class_id class, d_iov_t *id, uuid_t db_uuid, uint64_t caller_term, bool create, size_t size, d_rank_list_t *replicas, void *arg); -int ds_rsvc_stop(enum ds_rsvc_class_id class, d_iov_t *id, bool destroy); +int ds_rsvc_stop(enum ds_rsvc_class_id class, d_iov_t *id, uint64_t caller_term, bool destroy); int ds_rsvc_stop_all(enum ds_rsvc_class_id class); int ds_rsvc_stop_leader(enum ds_rsvc_class_id class, d_iov_t *id, struct rsvc_hint *hint); -int ds_rsvc_dist_start(enum ds_rsvc_class_id class, d_iov_t *id, - const uuid_t dbid, const d_rank_list_t *ranks, - bool create, bool bootstrap, size_t size); -int ds_rsvc_dist_stop(enum ds_rsvc_class_id class, d_iov_t *id, - const d_rank_list_t *ranks, d_rank_list_t *excluded, - bool destroy); +int ds_rsvc_dist_start(enum ds_rsvc_class_id class, d_iov_t *id, const uuid_t dbid, + const d_rank_list_t *ranks, uint64_t caller_term, bool create, + bool bootstrap, size_t size); +int ds_rsvc_dist_stop(enum ds_rsvc_class_id class, d_iov_t *id, const d_rank_list_t *ranks, + d_rank_list_t *excluded, uint64_t caller_term, bool destroy); +enum ds_rsvc_state ds_rsvc_get_state(struct ds_rsvc *svc); +void ds_rsvc_set_state(struct ds_rsvc *svc, enum ds_rsvc_state state); int ds_rsvc_add_replicas_s(struct ds_rsvc *svc, d_rank_list_t *ranks, size_t size); int ds_rsvc_add_replicas(enum ds_rsvc_class_id class, d_iov_t *id, d_rank_list_t *ranks, size_t size, struct rsvc_hint *hint); -int ds_rsvc_remove_replicas_s(struct ds_rsvc *svc, d_rank_list_t *ranks, - bool stop); -int ds_rsvc_remove_replicas(enum ds_rsvc_class_id class, d_iov_t *id, - d_rank_list_t *ranks, bool stop, +int ds_rsvc_remove_replicas_s(struct ds_rsvc *svc, d_rank_list_t *ranks); +int ds_rsvc_remove_replicas(enum ds_rsvc_class_id class, d_iov_t *id, d_rank_list_t *ranks, struct rsvc_hint *hint); int ds_rsvc_lookup(enum ds_rsvc_class_id class, d_iov_t *id, struct ds_rsvc **svc); diff --git a/src/mgmt/srv_pool.c b/src/mgmt/srv_pool.c index cb884690a93..d911541ec0c 100644 --- a/src/mgmt/srv_pool.c +++ b/src/mgmt/srv_pool.c @@ -232,7 +232,7 @@ ds_mgmt_create_pool(uuid_t pool_uuid, const char *group, char *tgt_dev, rc_cleanup = ds_mgmt_tgt_pool_destroy_ranks(pool_uuid, targets, true); if (rc_cleanup) D_ERROR(DF_UUID": failed to clean up failed pool: "DF_RC"\n", - DP_UUID(pool_uuid), DP_RC(rc)); + DP_UUID(pool_uuid), DP_RC(rc_cleanup)); } out: diff --git a/src/pool/srv_pool.c b/src/pool/srv_pool.c index dca22875e77..cfca4f9173f 100644 --- a/src/pool/srv_pool.c +++ b/src/pool/srv_pool.c @@ -61,6 +61,75 @@ struct pool_svc_events { bool pse_stop; }; +/* Pool service reconfiguration state */ +struct pool_svc_reconf { + int psc_svc_rf; + bool psc_force_notify; /* for pool_svc_step_up_cb */ + ABT_mutex psc_mutex; /* only for psc_cv */ + ABT_cond psc_cv; + bool psc_in_progress; + bool psc_canceled; +}; + +static int +reconf_init(struct pool_svc_reconf *reconf) +{ + int rc; + + rc = ABT_mutex_create(&reconf->psc_mutex); + if (rc != ABT_SUCCESS) { + return dss_abterr2der(rc); + } + + rc = ABT_cond_create(&reconf->psc_cv); + if (rc != ABT_SUCCESS) { + ABT_mutex_free(&reconf->psc_mutex); + return dss_abterr2der(rc); + } + + reconf->psc_svc_rf = -1; + reconf->psc_force_notify = false; + reconf->psc_in_progress = false; + reconf->psc_canceled = false; + return 0; +} + +static void +reconf_fini(struct pool_svc_reconf *reconf) +{ + ABT_cond_free(&reconf->psc_cv); + ABT_mutex_free(&reconf->psc_mutex); +} + +static void +reconf_begin(struct pool_svc_reconf *reconf) +{ + reconf->psc_in_progress = true; + reconf->psc_canceled = false; +} + +static void +reconf_end(struct pool_svc_reconf *reconf) +{ + reconf->psc_in_progress = false; + reconf->psc_canceled = false; +} + +static void +reconf_cancel_and_wait(struct pool_svc_reconf *reconf) +{ + /* + * The CV requires a mutex. We don't otherwise need it for ULTs within + * the same xstream. + */ + ABT_mutex_lock(reconf->psc_mutex); + if (reconf->psc_in_progress) + reconf->psc_canceled = true; + while (reconf->psc_in_progress) + ABT_cond_wait(reconf->psc_cv, reconf->psc_mutex); + ABT_mutex_unlock(reconf->psc_mutex); +} + /* Pool service */ struct pool_svc { struct ds_rsvc ps_rsvc; @@ -73,6 +142,7 @@ struct pool_svc { struct ds_pool *ps_pool; struct pool_svc_events ps_events; uint32_t ps_global_version; + struct pool_svc_reconf ps_reconf; }; /* Pool service failed to start */ @@ -625,7 +695,6 @@ select_svc_ranks(int svc_rf, const d_rank_list_t *target_addrs, int ndomains, const uint32_t *domains, d_rank_list_t **ranksp) { int nreplicas = ds_pool_svc_rf_to_nreplicas(svc_rf); - int i_rank_zero = -1; int selectable; d_rank_list_t *rnd_tgts; d_rank_list_t *ranks; @@ -645,15 +714,6 @@ select_svc_ranks(int svc_rf, const d_rank_list_t *target_addrs, int ndomains, /* Determine the number of selectable targets. */ selectable = rnd_tgts->rl_nr; - if (daos_rank_list_find((d_rank_list_t *)rnd_tgts, 0 /* rank */, - &i_rank_zero)) { - /* - * Unless it is the only target available, we don't select rank - * 0 for now to avoid losing orterun stdout. - */ - if (selectable > 1) - selectable -= 1 /* rank 0 */; - } if (nreplicas > selectable) nreplicas = selectable; @@ -666,9 +726,6 @@ select_svc_ranks(int svc_rf, const d_rank_list_t *target_addrs, int ndomains, for (i = 0; i < rnd_tgts->rl_nr; i++) { if (j == ranks->rl_nr) break; - if (i == i_rank_zero && selectable > 1) - /* This is rank 0 and it's not the only rank. */ - continue; D_DEBUG(DB_MD, "ranks[%d]: %u\n", j, rnd_tgts->rl_ranks[i]); ranks->rl_ranks[j] = rnd_tgts->rl_ranks[i]; j++; @@ -758,8 +815,8 @@ ds_pool_svc_dist_create(const uuid_t pool_uuid, int ntargets, const char *group, D_GOTO(out, rc); d_iov_set(&psid, (void *)pool_uuid, sizeof(uuid_t)); - rc = ds_rsvc_dist_start(DS_RSVC_CLASS_POOL, &psid, pool_uuid, ranks, true /* create */, - true /* bootstrap */, ds_rsvc_get_md_cap()); + rc = ds_rsvc_dist_start(DS_RSVC_CLASS_POOL, &psid, pool_uuid, ranks, RDB_NIL_TERM, + true /* create */, true /* bootstrap */, ds_rsvc_get_md_cap()); if (rc != 0) D_GOTO(out_ranks, rc); @@ -842,7 +899,7 @@ ds_pool_svc_stop(uuid_t pool_uuid) d_iov_set(&id, pool_uuid, sizeof(uuid_t)); - rc = ds_rsvc_stop(DS_RSVC_CLASS_POOL, &id, false /* destroy */); + rc = ds_rsvc_stop(DS_RSVC_CLASS_POOL, &id, RDB_NIL_TERM, false /* destroy */); if (rc == -DER_ALREADY) { D_DEBUG(DB_MD, DF_UUID": ds_rsvc_stop: "DF_RC"\n", DP_UUID(pool_uuid), DP_RC(rc)); rc = 0; @@ -944,14 +1001,20 @@ pool_svc_alloc_cb(d_iov_t *id, struct ds_rsvc **rsvc) goto err_events_mutex; } + rc = reconf_init(&svc->ps_reconf); + if (rc != 0) + goto err_events_cv; + rc = ds_cont_svc_init(&svc->ps_cont_svc, svc->ps_uuid, 0 /* id */, &svc->ps_rsvc); if (rc != 0) - goto err_events_cv; + goto err_reconf; *rsvc = &svc->ps_rsvc; return 0; +err_reconf: + reconf_fini(&svc->ps_reconf); err_events_cv: ABT_cond_free(&svc->ps_events.pse_cv); err_events_mutex: @@ -1116,6 +1179,12 @@ events_handler(void *arg) D_DEBUG(DB_MD, DF_UUID": stopping\n", DP_UUID(svc->ps_uuid)); } +static bool +events_pending(struct pool_svc *svc) +{ + return !d_list_empty(&svc->ps_events.pse_queue); +} + static void ds_pool_crt_event_cb(d_rank_t rank, uint64_t incarnation, enum crt_event_source src, enum crt_event_type type, void *arg) @@ -1202,6 +1271,7 @@ pool_svc_free_cb(struct ds_rsvc *rsvc) struct pool_svc *svc = pool_svc_obj(rsvc); ds_cont_svc_fini(&svc->ps_cont_svc); + reconf_fini(&svc->ps_reconf); ABT_cond_free(&svc->ps_events.pse_cv); ABT_mutex_free(&svc->ps_events.pse_mutex); rdb_path_fini(&svc->ps_user); @@ -1269,10 +1339,11 @@ static int read_db_for_stepping_up(struct pool_svc *svc, struct pool_buf **map_buf, uint32_t *map_version, daos_prop_t **prop) { - struct rdb_tx tx; - d_iov_t value; - bool version_exists = false; - int rc; + struct rdb_tx tx; + d_iov_t value; + bool version_exists = false; + struct daos_prop_entry *svc_rf_entry; + int rc; rc = rdb_tx_begin(svc->ps_rsvc.s_db, svc->ps_rsvc.s_term, &tx); if (rc != 0) @@ -1341,9 +1412,18 @@ read_db_for_stepping_up(struct pool_svc *svc, struct pool_buf **map_buf, D_DEBUG(DB_MD, DF_UUID": assuming 2.0\n", DP_UUID(svc->ps_uuid)); rc = pool_prop_read(&tx, svc, DAOS_PO_QUERY_PROP_ALL, prop); - if (rc != 0) + if (rc != 0) { D_ERROR(DF_UUID": cannot get properties: "DF_RC"\n", DP_UUID(svc->ps_uuid), DP_RC(rc)); + goto out_lock; + } + + svc_rf_entry = daos_prop_entry_get(*prop, DAOS_PROP_PO_SVC_REDUN_FAC); + D_ASSERT(svc_rf_entry != NULL); + if (daos_prop_is_set(svc_rf_entry)) + svc->ps_reconf.psc_svc_rf = svc_rf_entry->dpe_val; + else + svc->ps_reconf.psc_svc_rf = -1; out_lock: ABT_rwlock_unlock(svc->ps_lock); @@ -1442,6 +1522,9 @@ pool_svc_check_node_status(struct pool_svc *svc) D_PRINT(fmt, ## __VA_ARGS__); \ } while (0) +static void pool_svc_schedule_reconf(struct pool_svc *svc); +static void pool_svc_cancel_and_wait_reconf(struct pool_svc *svc); + static int pool_svc_step_up_cb(struct ds_rsvc *rsvc) { @@ -1453,6 +1536,7 @@ pool_svc_step_up_cb(struct ds_rsvc *rsvc) daos_prop_t *prop = NULL; bool cont_svc_up = false; bool events_initialized = false; + bool reconf_scheduled = false; d_rank_t rank = dss_self_rank(); int rc; @@ -1495,6 +1579,14 @@ pool_svc_step_up_cb(struct ds_rsvc *rsvc) goto out; events_initialized = true; + /* + * Just in case the previous leader didn't finish the last series of + * reconfigurations or the last MS notification. + */ + svc->ps_reconf.psc_force_notify = true; + pool_svc_schedule_reconf(svc); + reconf_scheduled = true; + rc = ds_pool_iv_prop_update(svc->ps_pool, prop); if (rc) { D_ERROR("ds_pool_iv_prop_update failed %d.\n", rc); @@ -1532,6 +1624,8 @@ pool_svc_step_up_cb(struct ds_rsvc *rsvc) if (rc != 0) { if (events_initialized) fini_events(svc); + if (reconf_scheduled) + pool_svc_cancel_and_wait_reconf(svc); if (cont_svc_up) ds_cont_svc_step_down(svc->ps_cont_svc); if (svc->ps_pool != NULL) @@ -1557,6 +1651,7 @@ pool_svc_step_down_cb(struct ds_rsvc *rsvc) ds_pool_iv_srv_hdl_invalidate(svc->ps_pool); fini_events(svc); + pool_svc_cancel_and_wait_reconf(svc); ds_cont_svc_step_down(svc->ps_cont_svc); fini_svc_pool(svc); @@ -1790,7 +1885,7 @@ start_one(uuid_t uuid, void *varg) } d_iov_set(&id, uuid, sizeof(uuid_t)); - ds_rsvc_start(DS_RSVC_CLASS_POOL, &id, uuid, false /* create */, 0 /* size */, + ds_rsvc_start(DS_RSVC_CLASS_POOL, &id, uuid, RDB_NIL_TERM, false /* create */, 0 /* size */, NULL /* replicas */, NULL /* arg */); return 0; } @@ -2212,6 +2307,7 @@ pool_prop_read(struct rdb_tx *tx, const struct pool_svc *svc, uint64_t bits, if (rc == -DER_NONEXIST && global_ver < 2) { rc = 0; val = DAOS_PROP_PO_SVC_REDUN_FAC_DEFAULT; + prop->dpp_entries[idx].dpe_flags |= DAOS_PROP_ENTRY_NOT_SET; } else if (rc != 0) { D_GOTO(out_prop, rc); } @@ -2332,7 +2428,7 @@ ds_pool_create_handler(crt_rpc_t *rpc) if (rc != 0) D_GOTO(out_mutex, rc); - if (svc->ps_rsvc.s_state == DS_RSVC_UP_EMPTY) { + if (ds_rsvc_get_state(&svc->ps_rsvc) == DS_RSVC_UP_EMPTY) { /* * The DB is no longer empty. Since the previous * pool_svc_step_up_cb() call didn't finish stepping up due to @@ -2348,8 +2444,7 @@ ds_pool_create_handler(crt_rpc_t *rpc) rdb_resign(svc->ps_rsvc.s_db, svc->ps_rsvc.s_term); D_GOTO(out_mutex, rc); } - svc->ps_rsvc.s_state = DS_RSVC_UP; - ABT_cond_broadcast(svc->ps_rsvc.s_state_cv); + ds_rsvc_set_state(&svc->ps_rsvc, DS_RSVC_UP); } out_mutex: @@ -5047,22 +5142,39 @@ ds_pool_svc_delete_acl(uuid_t pool_uuid, d_rank_list_t *ranks, } static void -pool_svc_reconfigure_replicas(struct pool_svc *svc, int svc_rf, struct pool_map *map) -{ - d_rank_list_t *current; - d_rank_list_t *to_add; - d_rank_list_t *to_remove; - d_rank_list_t *new; - int rc; +pool_svc_reconf_ult(void *arg) +{ + struct pool_svc *svc = arg; + struct pool_svc_reconf *reconf = &svc->ps_reconf; + d_rank_list_t *current; + d_rank_list_t *to_add; + d_rank_list_t *to_remove; + d_rank_list_t *new; + int rc; + + D_DEBUG(DB_MD, DF_UUID": begin\n", DP_UUID(svc->ps_uuid)); + + if (reconf->psc_canceled) + goto out; + + /* When there are pending events, the pool map may be unstable. */ + while (events_pending(svc)) { + dss_sleep(3000 /* ms */); + if (reconf->psc_canceled) + goto out; + } rc = rdb_get_ranks(svc->ps_rsvc.s_db, ¤t); if (rc != 0) { D_ERROR(DF_UUID": failed to get pool service replica ranks: "DF_RC"\n", DP_UUID(svc->ps_uuid), DP_RC(rc)); - return; + goto out; } - rc = ds_pool_plan_svc_reconfs(svc_rf, map, current, dss_self_rank(), &to_add, &to_remove); + ABT_rwlock_rdlock(svc->ps_pool->sp_lock); + rc = ds_pool_plan_svc_reconfs(reconf->psc_svc_rf, svc->ps_pool->sp_map, current, + dss_self_rank(), &to_add, &to_remove); + ABT_rwlock_unlock(svc->ps_pool->sp_lock); if (rc != 0) { D_ERROR(DF_UUID": cannot plan pool service reconfigurations: "DF_RC"\n", DP_UUID(svc->ps_uuid), DP_RC(rc)); @@ -5070,7 +5182,8 @@ pool_svc_reconfigure_replicas(struct pool_svc *svc, int svc_rf, struct pool_map } D_DEBUG(DB_MD, DF_UUID": svc_rf=%d current=%u to_add=%u to_remove=%u\n", - DP_UUID(svc->ps_uuid), svc_rf, current->rl_nr, to_add->rl_nr, to_remove->rl_nr); + DP_UUID(svc->ps_uuid), reconf->psc_svc_rf, current->rl_nr, to_add->rl_nr, + to_remove->rl_nr); /* * Ignore the return values from the "add" and "remove" calls here. If @@ -5082,64 +5195,116 @@ pool_svc_reconfigure_replicas(struct pool_svc *svc, int svc_rf, struct pool_map */ if (to_add->rl_nr > 0) ds_rsvc_add_replicas_s(&svc->ps_rsvc, to_add, ds_rsvc_get_md_cap()); + if (reconf->psc_canceled) + goto out_to_add_remove; if (to_add->rl_nr > to_remove->rl_nr) to_remove->rl_nr = 0; else to_remove->rl_nr -= to_add->rl_nr; - if (to_remove->rl_nr > 0) - ds_rsvc_remove_replicas_s(&svc->ps_rsvc, to_remove, false /* stop */); + if (to_remove->rl_nr > 0) { + d_rank_list_t *tmp; + + /* + * Since the ds_rsvc_dist_stop part is likely to hit RPC + * timeouts, after removing the replicas from the membership, + * we notify the MS first, and then come back to + * ds_rsvc_dist_stop. + */ + rc = d_rank_list_dup(&tmp, to_remove); + if (rc != 0) { + D_ERROR(DF_UUID": failed to duplicate to_remove: "DF_RC"\n", + DP_UUID(svc->ps_uuid), DP_RC(rc)); + goto out_to_add_remove; + } + rc = rdb_remove_replicas(svc->ps_rsvc.s_db, tmp); + /* Delete from to_remove ranks that are not removed. */ + d_rank_list_filter(tmp, to_remove, true /* exclude */); + d_rank_list_free(tmp); + } if (rdb_get_ranks(svc->ps_rsvc.s_db, &new) == 0) { d_rank_list_sort(current); d_rank_list_sort(new); - if (!d_rank_list_identical(new, current)) { + if (reconf->psc_force_notify || !d_rank_list_identical(new, current)) { /* * Send RAS event to control-plane over dRPC to indicate * change in pool service replicas. */ - rc = ds_notify_pool_svc_update(&svc->ps_uuid, new); - if (rc != 0) + rc = ds_notify_pool_svc_update(&svc->ps_uuid, new, svc->ps_rsvc.s_term); + if (rc == 0) + reconf->psc_force_notify = false; + else D_ERROR(DF_UUID": replica update notify failure: "DF_RC"\n", DP_UUID(svc->ps_uuid), DP_RC(rc)); } d_rank_list_free(new); } + if (reconf->psc_canceled) + goto out_to_add_remove; + + /* Ignore the return value of this ds_rsvc_dist_stop call. */ + if (to_remove->rl_nr > 0) + ds_rsvc_dist_stop(svc->ps_rsvc.s_class, &svc->ps_rsvc.s_id, to_remove, + NULL /* excluded */, svc->ps_rsvc.s_term, true /* destroy */); +out_to_add_remove: d_rank_list_free(to_remove); d_rank_list_free(to_add); out_cur: d_rank_list_free(current); +out: + reconf_end(reconf); + ABT_cond_broadcast(reconf->psc_cv); + D_DEBUG(DB_MD, DF_UUID": end\n", DP_UUID(svc->ps_uuid)); } -static int -get_svc_rf(struct rdb_tx *tx, struct pool_svc *svc) +static void +pool_svc_schedule_reconf(struct pool_svc *svc) { - uint32_t global_version; - d_iov_t value; - daos_prop_t *svc_rf_prop = NULL; - struct daos_prop_entry *svc_rf_entry; + struct pool_svc_reconf *reconf = &svc->ps_reconf; + enum ds_rsvc_state state; int rc; - d_iov_set(&value, &global_version, sizeof(global_version)); - rc = rdb_tx_lookup(tx, &svc->ps_root, &ds_pool_prop_global_version, &value); - if (rc == -DER_NONEXIST) - global_version = 0; - else if (rc != 0) - return rc; - if (global_version < 2) - return -DER_NONEXIST; + D_DEBUG(DB_MD, DF_UUID": begin\n", DP_UUID(svc->ps_uuid)); - rc = pool_prop_read(tx, svc, DAOS_PO_QUERY_PROP_SVC_REDUN_FAC, &svc_rf_prop); - if (rc != 0) - return rc; - svc_rf_entry = daos_prop_entry_get(svc_rf_prop, DAOS_PROP_PO_SVC_REDUN_FAC); - D_ASSERT(svc_rf_entry != NULL && daos_prop_is_set(svc_rf_entry)); - rc = svc_rf_entry->dpe_val; - daos_prop_free(svc_rf_prop); + /* + * Avoid scheduling new reconfigurations when the PS is stepping down + * and has already called pool_svc_cancel_and_wait_reconf. + */ + state = ds_rsvc_get_state(&svc->ps_rsvc); + if (state == DS_RSVC_DRAINING) { + D_DEBUG(DB_MD, DF_UUID": end: service %s\n", DP_UUID(svc->ps_uuid), + ds_rsvc_state_str(state)); + return; + } - return rc; + reconf_cancel_and_wait(reconf); + + /* Ended by pool_svc_reconf_ult. */ + reconf_begin(reconf); + + /* + * An extra svc leader reference is not required, because + * pool_svc_step_down_cb waits for this ULT to terminate. + * + * ULT tracking is achieved through svc->ps_reconf, not a ULT handle. + */ + rc = dss_ult_create(pool_svc_reconf_ult, svc, DSS_XS_SELF, 0, 0, NULL /* ult */); + if (rc != 0) { + D_ERROR(DF_UUID": failed to create reconfiguration ULT: "DF_RC"\n", + DP_UUID(svc->ps_uuid), DP_RC(rc)); + reconf_end(reconf); + } + + D_DEBUG(DB_MD, DF_UUID": end: "DF_RC"\n", DP_UUID(svc->ps_uuid), DP_RC(rc)); +} + +static void +pool_svc_cancel_and_wait_reconf(struct pool_svc *svc) +{ + reconf_cancel_and_wait(&svc->ps_reconf); } static int pool_find_all_targets_by_addr(struct pool_map *map, @@ -5178,7 +5343,6 @@ pool_svc_update_map_internal(struct pool_svc *svc, unsigned int opc, struct pool_target_addr_list *inval_tgt_addrs) { struct rdb_tx tx; - uint64_t svc_rf; struct pool_map *map; uint32_t map_version_before; uint32_t map_version; @@ -5195,14 +5359,6 @@ pool_svc_update_map_internal(struct pool_svc *svc, unsigned int opc, goto out; ABT_rwlock_wrlock(svc->ps_lock); - rc = get_svc_rf(&tx, svc); - if (rc == -DER_NONEXIST) - svc_rf = -1; - else if (rc >= 0) - svc_rf = rc; - else - goto out_lock; - /* Create a temporary pool map based on the last committed version. */ rc = read_map(&tx, &svc->ps_root, &map); if (rc != 0) @@ -5282,7 +5438,7 @@ pool_svc_update_map_internal(struct pool_svc *svc, unsigned int opc, ds_rsvc_request_map_dist(&svc->ps_rsvc); - pool_svc_reconfigure_replicas(svc, svc_rf, map); + pool_svc_schedule_reconf(svc); out_map_buf: pool_buf_free(map_buf); @@ -5505,7 +5661,6 @@ pool_extend_map(struct rdb_tx *tx, struct pool_svc *svc, uint32_t nnodes, uint32_t *domains, bool *updated_p, uint32_t *map_version_p, struct rsvc_hint *hint) { - uint64_t svc_rf; struct pool_buf *map_buf = NULL; struct pool_map *map = NULL; uint32_t map_version; @@ -5515,14 +5670,6 @@ pool_extend_map(struct rdb_tx *tx, struct pool_svc *svc, uint32_t nnodes, ntargets = nnodes * dss_tgt_nr; - rc = get_svc_rf(tx, svc); - if (rc == -DER_NONEXIST) - svc_rf = -1; - else if (rc >= 0) - svc_rf = rc; - else - return rc; - /* Create a temporary pool map based on the last committed version. */ rc = read_map(tx, &svc->ps_root, &map); if (rc != 0) @@ -5571,7 +5718,7 @@ pool_extend_map(struct rdb_tx *tx, struct pool_svc *svc, uint32_t nnodes, ds_rsvc_request_map_dist(&svc->ps_rsvc); - pool_svc_reconfigure_replicas(svc, svc_rf, map); + pool_svc_schedule_reconf(svc); out_map: if (map_version_p != NULL) { @@ -6698,8 +6845,7 @@ ds_pool_replicas_update_handler(crt_rpc_t *rpc) break; case POOL_REPLICAS_REMOVE: - rc = ds_rsvc_remove_replicas(DS_RSVC_CLASS_POOL, &id, ranks, - true /* stop */, &out->pmo_hint); + rc = ds_rsvc_remove_replicas(DS_RSVC_CLASS_POOL, &id, ranks, &out->pmo_hint); break; default: diff --git a/src/rdb/rdb.c b/src/rdb/rdb.c index b322cd79a6b..c3a72984a8b 100644 --- a/src/rdb/rdb.c +++ b/src/rdb/rdb.c @@ -18,7 +18,8 @@ #include "rdb_layout.h" static int rdb_open_internal(daos_handle_t pool, daos_handle_t mc, const uuid_t uuid, - struct rdb_cbs *cbs, void *arg, struct rdb **dbp); + uint64_t caller_term, struct rdb_cbs *cbs, void *arg, + struct rdb **dbp); /** * Create an RDB replica at \a path with \a uuid, \a size, and \a replicas, and @@ -26,6 +27,7 @@ static int rdb_open_internal(daos_handle_t pool, daos_handle_t mc, const uuid_t * * \param[in] path replica path * \param[in] uuid database UUID + * \param[in] caller_term caller term if not RDB_NIL_TERM (see rdb_open) * \param[in] size replica size in bytes * \param[in] replicas list of replica ranks * \param[in] cbs callbacks (not copied) @@ -33,8 +35,9 @@ static int rdb_open_internal(daos_handle_t pool, daos_handle_t mc, const uuid_t * \param[out] storagep database storage */ int -rdb_create(const char *path, const uuid_t uuid, size_t size, const d_rank_list_t *replicas, - struct rdb_cbs *cbs, void *arg, struct rdb_storage **storagep) +rdb_create(const char *path, const uuid_t uuid, uint64_t caller_term, size_t size, + const d_rank_list_t *replicas, struct rdb_cbs *cbs, void *arg, + struct rdb_storage **storagep) { daos_handle_t pool; daos_handle_t mc; @@ -43,8 +46,8 @@ rdb_create(const char *path, const uuid_t uuid, size_t size, const d_rank_list_t struct rdb *db; int rc; - D_DEBUG(DB_MD, DF_UUID": creating db %s with %u replicas\n", - DP_UUID(uuid), path, replicas == NULL ? 0 : replicas->rl_nr); + D_DEBUG(DB_MD, DF_UUID": creating db %s with %u replicas: caller_term="DF_X64"\n", + DP_UUID(uuid), path, replicas == NULL ? 0 : replicas->rl_nr, caller_term); /* * Create and open a VOS pool. RDB pools specify VOS_POF_SMALL for @@ -86,7 +89,7 @@ rdb_create(const char *path, const uuid_t uuid, size_t size, const d_rank_list_t if (rc != 0) goto out_mc_hdl; - rc = rdb_open_internal(pool, mc, uuid, cbs, arg, &db); + rc = rdb_open_internal(pool, mc, uuid, caller_term, cbs, arg, &db); if (rc != 0) goto out_mc_hdl; @@ -224,8 +227,8 @@ rdb_lookup(const uuid_t uuid) * the caller shall not close in this case. */ static int -rdb_open_internal(daos_handle_t pool, daos_handle_t mc, const uuid_t uuid, struct rdb_cbs *cbs, - void *arg, struct rdb **dbp) +rdb_open_internal(daos_handle_t pool, daos_handle_t mc, const uuid_t uuid, uint64_t caller_term, + struct rdb_cbs *cbs, void *arg, struct rdb **dbp) { struct rdb *db; int rc; @@ -309,7 +312,7 @@ rdb_open_internal(daos_handle_t pool, daos_handle_t mc, const uuid_t uuid, struc SCM_TOTAL(&vps), SCM_FREE(&vps), SCM_SYS(&vps), rdb_extra_sys[DAOS_MEDIA_SCM]); - rc = rdb_raft_open(db); + rc = rdb_raft_open(db, caller_term); if (rc != 0) goto err_kvss; @@ -333,14 +336,22 @@ rdb_open_internal(daos_handle_t pool, daos_handle_t mc, const uuid_t uuid, struc /** * Open an RDB replica at \a path. * + * If \a caller_term is not RDB_NIL_TERM, it shall be the term of the leader + * (of the same RDB) who is calling this function (usually via an RPC). This is + * used to perform the Raft term check/update so that an older leader doesn't + * interrupt with a newer leader. + * * \param[in] path replica path * \param[in] uuid database UUID + * \param[in] caller_term caller term if not RDB_NIL_TERM * \param[in] cbs callbacks (not copied) * \param[in] arg argument for cbs * \param[out] storagep database storage + * + * \retval -DER_STALE \a caller_term < the current term */ int -rdb_open(const char *path, const uuid_t uuid, struct rdb_cbs *cbs, void *arg, +rdb_open(const char *path, const uuid_t uuid, uint64_t caller_term, struct rdb_cbs *cbs, void *arg, struct rdb_storage **storagep) { daos_handle_t pool; @@ -351,7 +362,8 @@ rdb_open(const char *path, const uuid_t uuid, struct rdb_cbs *cbs, void *arg, struct rdb *db; int rc; - D_DEBUG(DB_MD, DF_UUID": opening db %s\n", DP_UUID(uuid), path); + D_DEBUG(DB_MD, DF_UUID": opening db %s: caller_term="DF_X64"\n", DP_UUID(uuid), path, + caller_term); /* * RDB pools specify VOS_POF_SMALL for basic system memory reservation @@ -422,7 +434,7 @@ rdb_open(const char *path, const uuid_t uuid, struct rdb_cbs *cbs, void *arg, goto err_mc; } - rc = rdb_open_internal(pool, mc, uuid, cbs, arg, &db); + rc = rdb_open_internal(pool, mc, uuid, caller_term, cbs, arg, &db); if (rc != 0) goto err_mc; @@ -652,6 +664,22 @@ rdb_campaign(struct rdb *db) return rdb_raft_campaign(db); } +/** + * Simulate a ping (i.e., an empty AE) from the leader of \a caller_term to \a + * db. This essentially checks if \a caller_term is stale, and if not, update + * the current term. See also rdb_open. + * + * \param[in] db database + * \param[in] caller_term caller term + * + * \retval -DER_STALE \a caller_term < the current term + */ +int +rdb_ping(struct rdb *db, uint64_t caller_term) +{ + return rdb_raft_ping(db, caller_term); +} + /** * Is this replica in the leader state? True does not guarantee a _current_ * leadership. diff --git a/src/rdb/rdb_internal.h b/src/rdb/rdb_internal.h index ec064067996..7aee95f8fa4 100644 --- a/src/rdb/rdb_internal.h +++ b/src/rdb/rdb_internal.h @@ -150,14 +150,14 @@ struct rdb_raft_node { struct rdb_raft_is dn_is; }; -int rdb_raft_init(daos_handle_t pool, daos_handle_t mc, - const d_rank_list_t *replicas); -int rdb_raft_open(struct rdb *db); +int rdb_raft_init(daos_handle_t pool, daos_handle_t mc, const d_rank_list_t *replicas); +int rdb_raft_open(struct rdb *db, uint64_t caller_term); int rdb_raft_start(struct rdb *db); void rdb_raft_stop(struct rdb *db); void rdb_raft_close(struct rdb *db); void rdb_raft_resign(struct rdb *db, uint64_t term); int rdb_raft_campaign(struct rdb *db); +int rdb_raft_ping(struct rdb *db, uint64_t caller_term); int rdb_raft_verify_leadership(struct rdb *db); int rdb_raft_add_replica(struct rdb *db, d_rank_t rank); int rdb_raft_remove_replica(struct rdb *db, d_rank_t rank); diff --git a/src/rdb/rdb_raft.c b/src/rdb/rdb_raft.c index c8c342206c9..bcd0692e4a3 100644 --- a/src/rdb/rdb_raft.c +++ b/src/rdb/rdb_raft.c @@ -2175,8 +2175,7 @@ rdb_raft_destroy_lc(daos_handle_t pool, daos_handle_t mc, d_iov_t *key, * error. */ int -rdb_raft_init(daos_handle_t pool, daos_handle_t mc, - const d_rank_list_t *replicas) +rdb_raft_init(daos_handle_t pool, daos_handle_t mc, const d_rank_list_t *replicas) { daos_handle_t lc; struct rdb_lc_record record; @@ -2453,7 +2452,7 @@ rdb_raft_get_ae_max_size(void) } int -rdb_raft_open(struct rdb *db) +rdb_raft_open(struct rdb *db, uint64_t caller_term) { int rc; @@ -2502,6 +2501,32 @@ rdb_raft_open(struct rdb *db) goto err_replies_cv; } + if (caller_term != RDB_NIL_TERM) { + uint64_t term; + d_iov_t value; + + d_iov_set(&value, &term, sizeof(term)); + rc = rdb_mc_lookup(db->d_mc, RDB_MC_ATTRS, &rdb_mc_term, &value); + if (rc == -DER_NONEXIST) + term = 0; + else if (rc != 0) + goto err_compact_cv; + + if (caller_term < term) { + D_DEBUG(DB_MD, DF_DB": stale caller term: "DF_X64" < "DF_X64"\n", DP_DB(db), + caller_term, term); + rc = -DER_STALE; + goto err_compact_cv; + } else if (caller_term > term) { + D_DEBUG(DB_MD, DF_DB": updating term: "DF_X64" -> "DF_X64"\n", DP_DB(db), + term, caller_term); + d_iov_set(&value, &caller_term, sizeof(caller_term)); + rc = rdb_mc_update(db->d_mc, RDB_MC_ATTRS, 1 /* n */, &rdb_mc_term, &value); + if (rc != 0) + goto err_compact_cv; + } + } + rc = rdb_raft_open_lc(db); if (rc != 0) goto err_compact_cv; @@ -2774,6 +2799,31 @@ rdb_raft_campaign(struct rdb *db) return rc; } +int +rdb_raft_ping(struct rdb *db, uint64_t caller_term) +{ + msg_appendentries_t ae = {.term = caller_term}; + msg_appendentries_response_t ae_resp; + struct rdb_raft_state state; + int rc; + + ABT_mutex_lock(db->d_raft_mutex); + rdb_raft_save_state(db, &state); + rc = raft_recv_appendentries(db->d_raft, NULL /* node */, &ae, &ae_resp); + rc = rdb_raft_check_state(db, &state, rc); + ABT_mutex_unlock(db->d_raft_mutex); + if (rc != 0) + return rc; + + if (caller_term < ae_resp.term) { + D_DEBUG(DB_MD, DF_DB": stale caller term: "DF_X64" < "DF_X64"\n", DP_DB(db), + caller_term, ae_resp.term); + return -DER_STALE; + } + + return 0; +} + /* Wait for index to be applied in term. For leaders only. * Caller initially holds d_raft_mutex. */ diff --git a/src/rdb/tests/README b/src/rdb/tests/README index 090abbb37c1..4b8a9210752 100644 --- a/src/rdb/tests/README +++ b/src/rdb/tests/README @@ -3,10 +3,11 @@ Manual execution instructions: From the command line the tests are run with: ==== server: start on S server ranks: +# specify all regular modules plus rdbt orterun -np --hostfile --map-by-node --enable-recovery -x LD_LIBRARY_PATH daos_server -o /utils/config/examples/daos_server_unittests.yml - start -d ./ -t 1 -m vos,rdb,rsvc,mgmt,rdbt + start -d ./ -t 1 -m vos,rdb,rsvc,security,mgmt,dtx,pool,cont,obj,rebuild,rdbt ==== agent: daos_agent --config-path=/utils/config/daos_agent.yml @@ -23,7 +24,7 @@ rdbt init --group=daos_server --uuid --replicas= # wait a short number of seconds for RDB initialization, then: # create KV stores in the initialized RDB on N replicas -rdbt create --group=daos_server -replicas= --nranks= +rdbt create --group=daos_server --replicas= --nranks= # run multi-replica tests rdbt test-multi --group=daos_server --replicas= --nranks= diff --git a/src/rdb/tests/rdb_test.c b/src/rdb/tests/rdb_test.c index 3722c978df5..ee4742d4aec 100644 --- a/src/rdb/tests/rdb_test.c +++ b/src/rdb/tests/rdb_test.c @@ -34,8 +34,6 @@ rdbt_svc_obj(struct ds_rsvc *rsvc) return container_of(rsvc, struct rdbt_svc, rt_rsvc); } -#define ID_OK(id) ioveq((id), &test_svc_id) - #define MUST(call) \ do { \ int _rc = call; \ @@ -62,8 +60,7 @@ ioveq(const d_iov_t *iov1, const d_iov_t *iov2) static int test_svc_name_cb(d_iov_t *id, char **name) { - ID_OK(id); - D_STRNDUP(*name, test_svc_name, strlen(test_svc_name)); + D_STRNDUP(*name, id->iov_buf, id->iov_len - 1); D_ASSERT(*name != NULL); return 0; } @@ -73,8 +70,7 @@ test_svc_locate_cb(d_iov_t *id, char **path) { int rc; - ID_OK(id); - rc = asprintf(path, "%s/rdbt-%s", dss_storage_path, test_svc_name); + rc = asprintf(path, "%s/rdbt-%s", dss_storage_path, (char *)id->iov_buf); D_ASSERTF(rc > 0, "%d\n", rc); D_ASSERT(*path != NULL); return 0; @@ -85,10 +81,14 @@ test_svc_alloc_cb(d_iov_t *id, struct ds_rsvc **svcp) { struct rdbt_svc *svc; - ID_OK(id); D_ALLOC_PTR(svc); D_ASSERT(svc != NULL); - svc->rt_rsvc.s_id = test_svc_id; + + D_ALLOC(svc->rt_rsvc.s_id.iov_buf, id->iov_len); + D_ASSERT(svc->rt_rsvc.s_id.iov_buf != NULL); + memcpy(svc->rt_rsvc.s_id.iov_buf, id->iov_buf, id->iov_len); + svc->rt_rsvc.s_id.iov_buf_len = id->iov_len; + svc->rt_rsvc.s_id.iov_len = id->iov_len; MUST(rdb_path_init(&svc->rt_root_kvs_path)); MUST(rdb_path_push(&svc->rt_root_kvs_path, &rdb_path_root_key)); @@ -108,6 +108,7 @@ test_svc_free_cb(struct ds_rsvc *rsvc) svc = rdbt_svc_obj(rsvc); rdb_path_fini(&svc->rt_kvs1_path); rdb_path_fini(&svc->rt_root_kvs_path); + D_FREE(svc->rt_rsvc.s_id.iov_buf); D_FREE(svc); } @@ -261,6 +262,39 @@ rdbt_test_path(void) rdb_path_fini(&path); } +static void +rdbt_test_rsvc(void) +{ + char *svc_name = "tmp"; + d_iov_t svc_id; + uuid_t uuid; + int rc; + + d_iov_set(&svc_id, svc_name, strlen(svc_name) + 1); + uuid_generate(uuid); + + /* + * A leader of an older term can't destroy a replica created by a + * leader with a newer term. + */ + MUST(ds_rsvc_start(DS_RSVC_CLASS_TEST, &svc_id, uuid, 2 /* term */, true /* create */, + DB_CAP, NULL /* replicas */, NULL /* arg */)); + rc = ds_rsvc_stop(DS_RSVC_CLASS_TEST, &svc_id, 1 /* term */, true /* destroy */); + D_ASSERTF(rc == -DER_STALE, DF_RC"\n", DP_RC(rc)); + + /* + * A leader of an older term can't destroy a replica touched by a + * leader with a newer term. + */ + rc = ds_rsvc_start(DS_RSVC_CLASS_TEST, &svc_id, uuid, 3 /* term */, true /* create */, + DB_CAP, NULL /* replicas */, NULL /* arg */); + D_ASSERTF(rc == -DER_ALREADY, DF_RC"\n", DP_RC(rc)); + rc = ds_rsvc_stop(DS_RSVC_CLASS_TEST, &svc_id, 2 /* term */, true /* destroy */); + D_ASSERTF(rc == -DER_STALE, DF_RC"\n", DP_RC(rc)); + + MUST(ds_rsvc_stop(DS_RSVC_CLASS_TEST, &svc_id, 3 /* term */, true /* destroy */)); +} + struct iterate_cb_arg { uint64_t *keys; int nkeys; @@ -606,9 +640,8 @@ rdbt_init_handler(crt_rpc_t *rpc) for (ri = 0; ri < ranks->rl_nr; ri++) D_WARN("ranks[%u]=%u\n", ri, ranks->rl_ranks[ri]); - MUST(ds_rsvc_dist_start(DS_RSVC_CLASS_TEST, &test_svc_id, in->tii_uuid, - ranks, true /* create */, true /* bootstrap */, - DB_CAP)); + MUST(ds_rsvc_dist_start(DS_RSVC_CLASS_TEST, &test_svc_id, in->tii_uuid, ranks, RDB_NIL_TERM, + true /* create */, true /* bootstrap */, DB_CAP)); crt_reply_send(rpc); } @@ -629,8 +662,7 @@ rdbt_fini_handler(crt_rpc_t *rpc) for (ri = 0; ri < ranks->rl_nr; ri++) D_WARN("ranks[%u]=%u\n", ri, ranks->rl_ranks[ri]); - MUST(ds_rsvc_dist_stop(DS_RSVC_CLASS_TEST, &test_svc_id, ranks, NULL, - true)); + MUST(ds_rsvc_dist_stop(DS_RSVC_CLASS_TEST, &test_svc_id, ranks, NULL, RDB_NIL_TERM, true)); crt_reply_send(rpc); } @@ -698,6 +730,7 @@ rdbt_test_handler(crt_rpc_t *rpc) rdbt_membership_opname(in->tti_memb_op)); rdbt_test_util(); rdbt_test_path(); + rdbt_test_rsvc(); rc = rdbt_test_tx(in->tti_update, in->tti_memb_op, in->tti_key, in->tti_val, &out->tto_val, &out->tto_hint); out->tto_rc = rc; @@ -760,8 +793,7 @@ rdbt_replicas_remove_handler(crt_rpc_t *rpc) if (rc != 0) goto out; - rc = ds_rsvc_remove_replicas(DS_RSVC_CLASS_TEST, &test_svc_id, ranks, - true /* stop */, &out->rtmo_hint); + rc = ds_rsvc_remove_replicas(DS_RSVC_CLASS_TEST, &test_svc_id, ranks, &out->rtmo_hint); out->rtmo_failed = ranks; out: diff --git a/src/rsvc/rpc.h b/src/rsvc/rpc.h index 21f36ffc179..8f8fd658a88 100644 --- a/src/rsvc/rpc.h +++ b/src/rsvc/rpc.h @@ -1,5 +1,5 @@ /* - * (C) Copyright 2016-2021 Intel Corporation. + * (C) Copyright 2016-2022 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -23,7 +23,7 @@ * These are for daos_rpc::dr_opc and DAOS_RPC_OPCODE(opc, ...) rather than * crt_req_create(..., opc, ...). See src/include/daos/rpc.h. */ -#define DAOS_RSVC_VERSION 2 +#define DAOS_RSVC_VERSION 3 /* LIST of internal RPCS in form of: * OPCODE, flags, FMT, handler, corpc_hdlr, */ @@ -54,6 +54,7 @@ extern struct crt_proto_format rsvc_proto_fmt; ((uint32_t) (sai_class) CRT_VAR) \ ((uint32_t) (sai_flags) CRT_VAR) \ ((uint64_t) (sai_size) CRT_VAR) \ + ((uint64_t) (sai_term) CRT_VAR) \ ((d_rank_list_t) (sai_ranks) CRT_PTR) #define DAOS_OSEQ_RSVC_START /* output fields (rc: err count) */ \ @@ -66,7 +67,8 @@ CRT_RPC_DECLARE(rsvc_start, DAOS_ISEQ_RSVC_START, DAOS_OSEQ_RSVC_START) #define DAOS_ISEQ_RSVC_STOP /* input fields */ \ ((d_iov_t) (soi_svc_id) CRT_VAR) \ ((uint32_t) (soi_class) CRT_VAR) \ - ((uint32_t) (soi_flags) CRT_VAR) + ((uint32_t) (soi_flags) CRT_VAR) \ + ((uint64_t) (soi_term) CRT_VAR) #define DAOS_OSEQ_RSVC_STOP /* output fields */ \ ((int32_t) (soo_rc) CRT_VAR) diff --git a/src/rsvc/srv.c b/src/rsvc/srv.c index ed48e1a58ab..934fe1fa3fe 100644 --- a/src/rsvc/srv.c +++ b/src/rsvc/srv.c @@ -42,8 +42,8 @@ rsvc_class(enum ds_rsvc_class_id id) return rsvc_classes[id]; } -static char * -state_str(enum ds_rsvc_state state) +char * +ds_rsvc_state_str(enum ds_rsvc_state state) { switch (state) { case DS_RSVC_UP_EMPTY: @@ -388,8 +388,8 @@ ds_rsvc_put_leader(struct ds_rsvc *svc) static void change_state(struct ds_rsvc *svc, enum ds_rsvc_state state) { - D_DEBUG(DB_MD, "%s: term "DF_U64" state %s to %s\n", svc->s_name, - svc->s_term, state_str(svc->s_state), state_str(state)); + D_DEBUG(DB_MD, "%s: term "DF_U64" state %s to %s\n", svc->s_name, svc->s_term, + ds_rsvc_state_str(svc->s_state), ds_rsvc_state_str(state)); svc->s_state = state; ABT_cond_broadcast(svc->s_state_cv); } @@ -686,7 +686,7 @@ self_only(d_rank_list_t *replicas) } static int -start(enum ds_rsvc_class_id class, d_iov_t *id, uuid_t db_uuid, bool create, +start(enum ds_rsvc_class_id class, d_iov_t *id, uuid_t db_uuid, uint64_t term, bool create, size_t size, d_rank_list_t *replicas, void *arg, struct ds_rsvc **svcp) { struct rdb_storage *storage; @@ -699,10 +699,10 @@ start(enum ds_rsvc_class_id class, d_iov_t *id, uuid_t db_uuid, bool create, svc->s_ref++; if (create) - rc = rdb_create(svc->s_db_path, svc->s_db_uuid, size, replicas, &rsvc_rdb_cbs, svc, - &storage); + rc = rdb_create(svc->s_db_path, svc->s_db_uuid, term, size, replicas, &rsvc_rdb_cbs, + svc, &storage); else - rc = rdb_open(svc->s_db_path, svc->s_db_uuid, &rsvc_rdb_cbs, svc, &storage); + rc = rdb_open(svc->s_db_path, svc->s_db_uuid, term, &rsvc_rdb_cbs, svc, &storage); if (rc != 0) goto err_svc; @@ -838,6 +838,7 @@ ds_rsvc_stop_nodb(enum ds_rsvc_class_id class, d_iov_t *id) * \param[in] class replicated service class * \param[in] id replicated service ID * \param[in] db_uuid DB UUID + * \param[in] caller_term caller term if not RDB_NIL_TERM (see rdb_open) * \param[in] create whether to create the replica before starting * \param[in] size replica size in bytes * \param[in] replicas optional initial membership @@ -845,9 +846,10 @@ ds_rsvc_stop_nodb(enum ds_rsvc_class_id class, d_iov_t *id) * * \retval -DER_ALREADY replicated service already started * \retval -DER_CANCELED replicated service stopping + * \retval -DER_STALE stale \a caller_term */ int -ds_rsvc_start(enum ds_rsvc_class_id class, d_iov_t *id, uuid_t db_uuid, +ds_rsvc_start(enum ds_rsvc_class_id class, d_iov_t *id, uuid_t db_uuid, uint64_t caller_term, bool create, size_t size, d_rank_list_t *replicas, void *arg) { struct ds_rsvc *svc = NULL; @@ -859,8 +861,16 @@ ds_rsvc_start(enum ds_rsvc_class_id class, d_iov_t *id, uuid_t db_uuid, entry = d_hash_rec_find(&rsvc_hash, id->iov_buf, id->iov_len); if (entry != NULL) { svc = rsvc_obj(entry); - D_DEBUG(DB_MD, "%s: found: stop=%d\n", svc->s_name, - svc->s_stop); + D_DEBUG(DB_MD, "%s: found: stop=%d\n", svc->s_name, svc->s_stop); + if (caller_term != RDB_NIL_TERM) { + rc = rdb_ping(svc->s_db, caller_term); + if (rc != 0) { + D_CDEBUG(rc == -DER_STALE, DB_MD, DLOG_ERR, + "%s: failed to ping local replica\n", svc->s_name); + ds_rsvc_put(svc); + goto out; + } + } if (svc->s_stop) rc = -DER_CANCELED; else @@ -869,7 +879,7 @@ ds_rsvc_start(enum ds_rsvc_class_id class, d_iov_t *id, uuid_t db_uuid, goto out; } - rc = start(class, id, db_uuid, create, size, replicas, arg, &svc); + rc = start(class, id, db_uuid, caller_term, create, size, replicas, arg, &svc); if (rc != 0) goto out; @@ -931,22 +941,37 @@ stop(struct ds_rsvc *svc, bool destroy) * * \param[in] class replicated service class * \param[in] id replicated service ID + * \param[in] caller_term caller term if not RDB_NIL_TERM (see rdb_open) * \param[in] destroy whether to destroy the replica after stopping * * \retval -DER_ALREADY replicated service already stopped * \retval -DER_CANCELED replicated service stopping + * \retval -DER_STALE stale \a caller_term */ int -ds_rsvc_stop(enum ds_rsvc_class_id class, d_iov_t *id, bool destroy) +ds_rsvc_stop(enum ds_rsvc_class_id class, d_iov_t *id, uint64_t caller_term, bool destroy) { struct ds_rsvc *svc; int rc; D_ASSERT(dss_get_module_info()->dmi_xs_id == 0); + rc = ds_rsvc_lookup(class, id, &svc); if (rc != 0) return -DER_ALREADY; + + if (caller_term != RDB_NIL_TERM) { + rc = rdb_ping(svc->s_db, caller_term); + if (rc != 0) { + D_CDEBUG(rc == -DER_STALE, DB_MD, DLOG_ERR, + "%s: failed to ping local replica\n", svc->s_name); + ds_rsvc_put(svc); + return rc; + } + } + d_hash_rec_delete_at(&rsvc_hash, &svc->s_entry); + return stop(svc, destroy); } @@ -1050,7 +1075,7 @@ ds_rsvc_add_replicas_s(struct ds_rsvc *svc, d_rank_list_t *ranks, size_t size) { int rc; - rc = ds_rsvc_dist_start(svc->s_class, &svc->s_id, svc->s_db_uuid, ranks, + rc = ds_rsvc_dist_start(svc->s_class, &svc->s_id, svc->s_db_uuid, ranks, svc->s_term, true /* create */, false /* bootstrap */, size); /* TODO: Attempt to only add replicas that were successfully started */ @@ -1061,12 +1086,24 @@ ds_rsvc_add_replicas_s(struct ds_rsvc *svc, d_rank_list_t *ranks, size_t size) /* Clean up ranks that were not added */ if (ranks->rl_nr > 0) { D_ASSERT(rc != 0); - ds_rsvc_dist_stop(svc->s_class, &svc->s_id, ranks, - NULL, true /* destroy */); + ds_rsvc_dist_stop(svc->s_class, &svc->s_id, ranks, NULL, svc->s_term, + true /* destroy */); } return rc; } +enum ds_rsvc_state +ds_rsvc_get_state(struct ds_rsvc *svc) +{ + return svc->s_state; +} + +void +ds_rsvc_set_state(struct ds_rsvc *svc, enum ds_rsvc_state state) +{ + change_state(svc, state); +} + int ds_rsvc_add_replicas(enum ds_rsvc_class_id class, d_iov_t *id, d_rank_list_t *ranks, size_t size, struct rsvc_hint *hint) @@ -1084,7 +1121,7 @@ ds_rsvc_add_replicas(enum ds_rsvc_class_id class, d_iov_t *id, } int -ds_rsvc_remove_replicas_s(struct ds_rsvc *svc, d_rank_list_t *ranks, bool stop) +ds_rsvc_remove_replicas_s(struct ds_rsvc *svc, d_rank_list_t *ranks) { d_rank_list_t *stop_ranks; int rc; @@ -1096,16 +1133,16 @@ ds_rsvc_remove_replicas_s(struct ds_rsvc *svc, d_rank_list_t *ranks, bool stop) /* filter out failed ranks */ daos_rank_list_filter(ranks, stop_ranks, true /* exclude */); - if (stop_ranks->rl_nr > 0 && stop) - ds_rsvc_dist_stop(svc->s_class, &svc->s_id, stop_ranks, - NULL, true /* destroy */); + if (stop_ranks->rl_nr > 0) + ds_rsvc_dist_stop(svc->s_class, &svc->s_id, stop_ranks, NULL, svc->s_term, + true /* destroy */); d_rank_list_free(stop_ranks); return rc; } int ds_rsvc_remove_replicas(enum ds_rsvc_class_id class, d_iov_t *id, - d_rank_list_t *ranks, bool stop, struct rsvc_hint *hint) + d_rank_list_t *ranks, struct rsvc_hint *hint) { struct ds_rsvc *svc; int rc; @@ -1113,7 +1150,7 @@ ds_rsvc_remove_replicas(enum ds_rsvc_class_id class, d_iov_t *id, rc = ds_rsvc_lookup_leader(class, id, &svc, hint); if (rc != 0) return rc; - rc = ds_rsvc_remove_replicas_s(svc, ranks, stop); + rc = ds_rsvc_remove_replicas_s(svc, ranks); ds_rsvc_set_hint(svc, hint); ds_rsvc_put_leader(svc); return rc; @@ -1161,13 +1198,14 @@ bcast_create(crt_opcode_t opc, bool filter_invert, d_rank_list_t *filter_ranks, * \param[in] id replicated service ID * \param[in] dbid database UUID * \param[in] ranks list of replica ranks + * \param[in] caller_term caller term if not RDB_NIL_TERM (see rdb_open) * \param[in] create create replicas first * \param[in] bootstrap start with an initial list of replicas * \param[in] size size of each replica in bytes if \a create */ int ds_rsvc_dist_start(enum ds_rsvc_class_id class, d_iov_t *id, const uuid_t dbid, - const d_rank_list_t *ranks, bool create, bool bootstrap, + const d_rank_list_t *ranks, uint64_t caller_term, bool create, bool bootstrap, size_t size) { crt_rpc_t *rpc; @@ -1194,6 +1232,7 @@ ds_rsvc_dist_start(enum ds_rsvc_class_id class, d_iov_t *id, const uuid_t dbid, if (bootstrap) in->sai_flags |= RDB_AF_BOOTSTRAP; in->sai_size = size; + in->sai_term = caller_term; in->sai_ranks = (d_rank_list_t *)ranks; rc = dss_rpc_send(rpc); @@ -1206,7 +1245,7 @@ ds_rsvc_dist_start(enum ds_rsvc_class_id class, d_iov_t *id, const uuid_t dbid, D_ERROR(DF_UUID": failed to start%s %d replicas: "DF_RC"\n", DP_UUID(dbid), create ? "/create" : "", rc, DP_RC(out->sao_rc_errval)); - ds_rsvc_dist_stop(class, id, ranks, NULL, create); + ds_rsvc_dist_stop(class, id, ranks, NULL, caller_term, create); rc = out->sao_rc_errval; } @@ -1232,9 +1271,8 @@ ds_rsvc_start_handler(crt_rpc_t *rpc) goto out; } - rc = ds_rsvc_start(in->sai_class, &in->sai_svc_id, in->sai_db_uuid, - create, in->sai_size, - bootstrap ? in->sai_ranks : NULL, NULL /* arg */); + rc = ds_rsvc_start(in->sai_class, &in->sai_svc_id, in->sai_db_uuid, in->sai_term, create, + in->sai_size, bootstrap ? in->sai_ranks : NULL, NULL /* arg */); if (rc == -DER_ALREADY) rc = 0; @@ -1277,12 +1315,12 @@ ds_rsvc_start_aggregator(crt_rpc_t *source, crt_rpc_t *result, void *priv) * \param[in] id replicated service ID * \param[in] ranks list of \a ranks->rl_nr replica ranks * \param[in] excluded excluded rank list. + * \param[in] caller_term caller term if not RDB_NIL_TERM (see rdb_open) * \param[in] destroy destroy after close */ int -ds_rsvc_dist_stop(enum ds_rsvc_class_id class, d_iov_t *id, - const d_rank_list_t *ranks, d_rank_list_t *excluded, - bool destroy) +ds_rsvc_dist_stop(enum ds_rsvc_class_id class, d_iov_t *id, const d_rank_list_t *ranks, + d_rank_list_t *excluded, uint64_t caller_term, bool destroy) { crt_rpc_t *rpc; struct rsvc_stop_in *in; @@ -1304,6 +1342,7 @@ ds_rsvc_dist_stop(enum ds_rsvc_class_id class, d_iov_t *id, goto out_rpc; if (destroy) in->soi_flags |= RDB_OF_DESTROY; + in->soi_term = caller_term; rc = dss_rpc_send(rpc); if (rc != 0) @@ -1332,10 +1371,11 @@ ds_rsvc_stop_handler(crt_rpc_t *rpc) struct rsvc_stop_out *out = crt_reply_get(rpc); int rc = 0; - rc = ds_rsvc_stop(in->soi_class, &in->soi_svc_id, + rc = ds_rsvc_stop(in->soi_class, &in->soi_svc_id, in->soi_term, in->soi_flags & RDB_OF_DESTROY); if (rc == -DER_ALREADY) rc = 0; + out->soo_rc = (rc == 0 ? 0 : 1); crt_reply_send(rpc); } diff --git a/src/tests/ftest/pool/svc.yaml b/src/tests/ftest/pool/svc.yaml index cc3cb11ca68..f43c08577a3 100644 --- a/src/tests/ftest/pool/svc.yaml +++ b/src/tests/ftest/pool/svc.yaml @@ -15,9 +15,9 @@ pool: pool_query_timeout: 30 svc_params_mux: !mux svc_params_none: - svc_params: [None, 4] + svc_params: [None, 5] svc_params_0: - svc_params: [0, 4] + svc_params: [0, 5] svc_params_1: svc_params: [1, 1] svc_params_2: @@ -27,6 +27,6 @@ svc_params_mux: !mux svc_params_4: svc_params: [4, 3] svc_params_5: - svc_params: [5, 4] + svc_params: [5, 5] svc_params_6: svc_params: [6, 0] diff --git a/src/tests/suite/daos_obj.c b/src/tests/suite/daos_obj.c index 4d3bda7746a..10201cc9958 100644 --- a/src/tests/suite/daos_obj.c +++ b/src/tests/suite/daos_obj.c @@ -4023,10 +4023,9 @@ io_capa_iv_fetch(void **state) skip(); test_get_leader(arg, &leader); - D_ASSERT(leader > 0); oid = daos_test_oid_gen(arg->coh, DAOS_OC_R1S_SPEC_RANK, 0, 0, arg->myrank); - oid = dts_oid_set_rank(oid, leader - 1); + oid = dts_oid_set_rank(oid, leader == 0 ? leader + 1 : leader - 1); if (arg->myrank == 0) daos_debug_set_params(arg->group, -1, DMG_KEY_FAIL_LOC,