diff --git a/src/container/srv_target.c b/src/container/srv_target.c index 653cb5f7d37..71d13f637bd 100644 --- a/src/container/srv_target.c +++ b/src/container/srv_target.c @@ -321,7 +321,7 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb, DAOS_FAIL_CHECK(DAOS_FORCE_EC_AGG_PEER_FAIL))) interval = 0; else - interval = d_sec2hlc(DAOS_AGG_THRESHOLD); + interval = cont->sc_agg_eph_gap; D_ASSERT(hlc > (interval * 2)); /* @@ -409,6 +409,9 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb, DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), tgt_id, epoch_range.epr_lo, epoch_range.epr_hi); + if (!param->ap_vos_agg) + vos_cont_set_mod_bound(cont->sc_hdl, epoch_range.epr_hi); + flags |= VOS_AGG_FL_FORCE_MERGE; rc = agg_cb(cont, &epoch_range, flags, param); if (rc) @@ -425,6 +428,9 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb, DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), tgt_id, epoch_range.epr_lo, epoch_range.epr_hi); + if (!param->ap_vos_agg) + vos_cont_set_mod_bound(cont->sc_hdl, epoch_range.epr_hi); + if (dss_xstream_is_busy()) flags &= ~VOS_AGG_FL_FORCE_MERGE; rc = agg_cb(cont, &epoch_range, flags, param); diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c index 1ee74ae11a4..96e8119d4ce 100644 --- a/src/dtx/dtx_common.c +++ b/src/dtx/dtx_common.c @@ -922,6 +922,7 @@ dtx_handle_init(struct dtx_id *dti, daos_handle_t xoh, struct dtx_epoch *epoch, dth->dth_for_migration = (flags & DTX_FOR_MIGRATION) ? 1 : 0; dth->dth_ignore_uncommitted = (flags & DTX_IGNORE_UNCOMMITTED) ? 1 : 0; dth->dth_prepared = (flags & DTX_PREPARED) ? 1 : 0; + dth->dth_epoch_owner = (flags & DTX_EPOCH_OWNER) ? 1 : 0; dth->dth_aborted = 0; dth->dth_already = 0; dth->dth_need_validation = 0; @@ -1853,6 +1854,8 @@ dtx_cont_register(struct ds_cont_child *cont) D_GOTO(out, rc = -DER_NOMEM); } + cont->sc_agg_eph_gap = d_sec2hlc(vos_get_agg_gap()); + ds_cont_child_get(cont); dbca->dbca_refs = 0; dbca->dbca_cont = cont; diff --git a/src/dtx/tests/dts_structs.c b/src/dtx/tests/dts_structs.c index dc4347fed7c..a763546f824 100644 --- a/src/dtx/tests/dts_structs.c +++ b/src/dtx/tests/dts_structs.c @@ -70,8 +70,9 @@ struct_dtx_handle(void **state) SET_BITFIELD_1(dummy, dth_need_validation); SET_BITFIELD_1(dummy, dth_ignore_uncommitted); SET_BITFIELD_1(dummy, dth_local); + SET_BITFIELD_1(dummy, dth_epoch_owner); SET_BITFIELD_1(dummy, dth_local_complete); - SET_BITFIELD(dummy, padding1, 13); + SET_BITFIELD(dummy, padding1, 12); SET_FIELD(dummy, dth_dti_cos_count); SET_FIELD(dummy, dth_dti_cos); diff --git a/src/engine/sched.c b/src/engine/sched.c index 49a46ca3618..807f150839e 100644 --- a/src/engine/sched.c +++ b/src/engine/sched.c @@ -197,17 +197,6 @@ enum { static int sched_policy; -/* - * Time threshold for giving IO up throttling. If space pressure stays in the - * highest level for enough long time, we assume that no more space can be - * reclaimed and choose to give up IO throttling, so that ENOSPACE error could - * be returned to client earlier. - * - * To make time for aggregation reclaiming overwriteen space, this threshold - * should be longer than the DAOS_AGG_THRESHOLD. 
- */ -#define SCHED_DELAY_THRESH 40000 /* msecs */ - struct pressure_ratio { unsigned int pr_free; /* free space ratio */ unsigned int pr_gc_ratio; /* CPU percentage for GC & Aggregation */ @@ -943,12 +932,21 @@ is_gc_pending(struct sched_pool_info *spi) return spi->spi_gc_ults && (spi->spi_gc_ults > spi->spi_gc_sleeping); } -/* Just run into this space pressure situation recently? */ +/* + * Just run into this space pressure situation recently? + * + * If space pressure stays at the highest level for a long enough time, we assume + * that no more space can be reclaimed and choose to give up IO throttling, so + * that the ENOSPACE error can be returned to the client earlier. + * + * To give aggregation time to reclaim overwritten space, this threshold + * should be longer than the VOS aggregation epoch gap from the current HLC. + */ static inline bool is_pressure_recent(struct sched_info *info, struct sched_pool_info *spi) { D_ASSERT(info->si_cur_ts >= spi->spi_pressure_ts); - return (info->si_cur_ts - spi->spi_pressure_ts) < SCHED_DELAY_THRESH; + return (info->si_cur_ts - spi->spi_pressure_ts) < info->si_agg_gap; } static inline uint64_t @@ -2256,6 +2254,8 @@ sched_run(ABT_sched sched) return; } + dx->dx_sched_info.si_agg_gap = (vos_get_agg_gap() + 10) * 1000; /* msecs */ + while (1) { /* Try to pick network poll ULT */ pool = pools[DSS_POOL_NET_POLL]; diff --git a/src/engine/srv_internal.h b/src/engine/srv_internal.h index 222f07e4906..d1f240270fb 100644 --- a/src/engine/srv_internal.h +++ b/src/engine/srv_internal.h @@ -61,6 +61,7 @@ struct sched_info { /* Number of kicked requests for each type in current cycle */ uint32_t si_kicked_req_cnt[SCHED_REQ_MAX]; unsigned int si_stop:1; + uint64_t si_agg_gap; }; struct mem_stats { diff --git a/src/include/daos/dtx.h b/src/include/daos/dtx.h index ca719077a14..f3aa2546850 100644 --- a/src/include/daos/dtx.h +++ b/src/include/daos/dtx.h @@ -1,5 +1,5 @@ /** - * (C) Copyright 2019-2023 Intel Corporation. + * (C) Copyright 2019-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -27,17 +27,6 @@ /* The time (in second) threshold for batched DTX commit. */ #define DTX_COMMIT_THRESHOLD_AGE 10 -/* - * VOS aggregation should try to avoid aggregating in the epoch range where - * lots of data records are pending to commit, so the aggregation epoch upper - * bound is: current HLC - (DTX batched commit threshold + buffer period) - * - * To avoid conflicting of aggregation vs. transactions, any transactional - * update/fetch with epoch lower than the aggregation upper bound should be - * rejected and restarted. - */ -#define DAOS_AGG_THRESHOLD (DTX_COMMIT_THRESHOLD_AGE + 10) /* seconds */ - enum dtx_target_flags { /* The target only contains read-only operations for the DTX. */ DTF_RDONLY = (1 << 0), diff --git a/src/include/daos_srv/container.h b/src/include/daos_srv/container.h index 9fc615c2a8b..14953a1972a 100644 --- a/src/include/daos_srv/container.h +++ b/src/include/daos_srv/container.h @@ -129,6 +129,12 @@ struct ds_cont_child { */ uint64_t sc_ec_update_timestamp; + /* + * The gap between the max allowed aggregation epoch and the current HLC. A modification + * with an older epoch outside this range may conflict with aggregation and thus be rejected. + */ + uint64_t sc_agg_eph_gap; + /* The objects with committable DTXs in DRAM. */ daos_handle_t sc_dtx_cos_hdl; /* The DTX COS-btree. 
*/ diff --git a/src/include/daos_srv/dtx_srv.h b/src/include/daos_srv/dtx_srv.h index 7c60d2deaa0..f648b42d7be 100644 --- a/src/include/daos_srv/dtx_srv.h +++ b/src/include/daos_srv/dtx_srv.h @@ -113,8 +113,10 @@ struct dtx_handle { dth_ignore_uncommitted : 1, /* Local transaction */ dth_local : 1, + /* Locally generate the epoch. */ + dth_epoch_owner : 1, /* Flag to commit the local transaction */ - dth_local_complete : 1, padding1 : 13; + dth_local_complete : 1, padding1 : 12; /* The count the DTXs in the dth_dti_cos array. */ uint32_t dth_dti_cos_count; @@ -287,6 +289,8 @@ enum dtx_flags { DTX_RELAY = (1 << 10), /** Local transaction */ DTX_LOCAL = (1 << 11), + /** Locally generate the epoch. */ + DTX_EPOCH_OWNER = (1 << 12), }; void diff --git a/src/include/daos_srv/vos.h b/src/include/daos_srv/vos.h index b6287c2986e..e34ade044dd 100644 --- a/src/include/daos_srv/vos.h +++ b/src/include/daos_srv/vos.h @@ -938,6 +938,56 @@ vos_update_renew_epoch(daos_handle_t ioh, struct dtx_handle *dth); void vos_dtx_renew_epoch(struct dtx_handle *dth); +/** + * Calculate current locally known stable epoch for the given container. + * + * \param coh [IN] Container open handle + * + * \return The epoch on success, negative value if error. + */ +daos_epoch_t +vos_cont_get_local_stable_epoch(daos_handle_t coh); + +/** + * Get global stable epoch for the given container. + * + * \param coh [IN] Container open handle + * + * \return The epoch on success, negative value if error. + */ +daos_epoch_t +vos_cont_get_global_stable_epoch(daos_handle_t coh); + +/** + * Set global stable epoch for the given container. + * + * \param coh [IN] Container open handle + * \param epoch [IN] The epoch to be used as the new global stable epoch. + * + * \return Zero on success, negative value if error. + */ +int +vos_cont_set_global_stable_epoch(daos_handle_t coh, daos_epoch_t epoch); + +/** + * Set the lowest allowed modification epoch for the given container. + * + * \param coh [IN] Container open handle + * \param epoch [IN] The lowest allowed epoch for modification. + * + * \return Zero on success, negative value if error. + */ +int +vos_cont_set_mod_bound(daos_handle_t coh, uint64_t epoch); + +/** + * Query the gap between the max allowed aggregation epoch and current HLC. + * + * \return The gap value in seconds. + */ +uint32_t +vos_get_agg_gap(void); + /** * Get the recx/epoch list. * diff --git a/src/include/daos_srv/vos_types.h b/src/include/daos_srv/vos_types.h index 0a52851c390..fa173cf1f63 100644 --- a/src/include/daos_srv/vos_types.h +++ b/src/include/daos_srv/vos_types.h @@ -58,6 +58,8 @@ enum dtx_entry_flags { * on all yet, need to be re-committed. */ DTE_PARTIAL_COMMITTED = (1 << 5), + /* The DTX epoch is sorted locally. 
*/ + DTE_EPOCH_SORTED = (1 << 6), }; struct dtx_entry { diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index a79cec03f6f..2654df4d5f9 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -2896,8 +2896,10 @@ ds_obj_rw_handler(crt_rpc_t *rpc) rc = process_epoch(&orw->orw_epoch, &orw->orw_epoch_first, &orw->orw_flags); - if (rc == PE_OK_LOCAL) + if (rc == PE_OK_LOCAL) { orw->orw_flags &= ~ORF_EPOCH_UNCERTAIN; + dtx_flags |= DTX_EPOCH_OWNER; + } if (obj_rpc_is_fetch(rpc)) { struct dtx_handle *dth; @@ -3856,8 +3858,10 @@ ds_obj_punch_handler(crt_rpc_t *rpc) rc = process_epoch(&opi->opi_epoch, NULL /* epoch_first */, &opi->opi_flags); - if (rc == PE_OK_LOCAL) + if (rc == PE_OK_LOCAL) { opi->opi_flags &= ~ORF_EPOCH_UNCERTAIN; + dtx_flags |= DTX_EPOCH_OWNER; + } version = opi->opi_map_ver; max_ver = opi->opi_map_ver; @@ -5110,6 +5114,7 @@ ds_obj_dtx_leader(struct daos_cpd_args *dca) &dcsh->dcsh_epoch.oe_first, &dcsh->dcsh_epoch.oe_rpc_flags); if (rc == PE_OK_LOCAL) { + dtx_flags |= DTX_EPOCH_OWNER; /* * In this case, writes to local RDGs can use the chosen epoch * without any uncertainty. This optimization is left to future @@ -5701,8 +5706,10 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) if (ocpi->ocpi_flags & ORF_LEADER) { rc = process_epoch(&ocpi->ocpi_epoch, NULL /* epoch_first */, &ocpi->ocpi_flags); - if (rc == PE_OK_LOCAL) + if (rc == PE_OK_LOCAL) { ocpi->ocpi_flags &= ~ORF_EPOCH_UNCERTAIN; + dtx_flags |= DTX_EPOCH_OWNER; + } } else if (dct_nr == 1) { rc = obj_coll_local(rpc, dcts[0].dct_shards, dce, &version, &ioc, NULL, odm->odm_mbs, obj_coll_tgt_punch); diff --git a/src/tests/ftest/util/server_utils_params.py b/src/tests/ftest/util/server_utils_params.py index 46db4891220..b02e640ca25 100644 --- a/src/tests/ftest/util/server_utils_params.py +++ b/src/tests/ftest/util/server_utils_params.py @@ -436,6 +436,7 @@ class EngineYamlParameters(YamlParameters): "D_LOG_FILE_APPEND_PID=1", "DAOS_POOL_RF=4", "CRT_EVENT_DELAY=1", + "DAOS_VOS_AGG_GAP=25", "COVFILE=/tmp/test.cov"], "ofi+tcp": [], "ofi+tcp;ofi_rxm": [], diff --git a/src/utils/ddb/tests/ddb_test_driver.c b/src/utils/ddb/tests/ddb_test_driver.c index 15f6f9289c3..f5dbda34078 100644 --- a/src/utils/ddb/tests/ddb_test_driver.c +++ b/src/utils/ddb/tests/ddb_test_driver.c @@ -410,6 +410,7 @@ dvt_dtx_begin_helper(daos_handle_t coh, const daos_unit_oid_t *oid, daos_epoch_t struct dtx_handle *dth; struct dtx_memberships *mbs; size_t size; + int rc; D_ALLOC_PTR(dth); assert_non_null(dth); @@ -449,7 +450,8 @@ dvt_dtx_begin_helper(daos_handle_t coh, const daos_unit_oid_t *oid, daos_epoch_t dth->dth_shares_inited = 1; vos_dtx_rsrvd_init(dth); - vos_dtx_attach(dth, false, false); + rc = vos_dtx_attach(dth, false, false); + assert_rc_equal(rc, 0); *dthp = dth; } @@ -478,7 +480,7 @@ dvt_vos_insert_dtx_records(daos_handle_t coh, uint32_t nr, uint32_t committed_nr daos_recx_t recxs[recxs_nr]; daos_iod_t iod = {0}; d_sg_list_t sgl = {0}; - daos_epoch_t epoch = 1; + daos_epoch_t epoch = d_hlc_get(); uint64_t dkey_hash = 0x123; int i; diff --git a/src/vos/tests/vts_dtx.c b/src/vos/tests/vts_dtx.c index bd54dd52838..402e265a938 100644 --- a/src/vos/tests/vts_dtx.c +++ b/src/vos/tests/vts_dtx.c @@ -43,6 +43,7 @@ vts_dtx_begin(const daos_unit_oid_t *oid, daos_handle_t coh, daos_epoch_t epoch, uint64_t dkey_hash, struct dtx_handle **dthp) { struct dtx_handle *dth; + int rc; D_ALLOC_PTR(dth); assert_non_null(dth); @@ -66,6 +67,7 @@ vts_dtx_begin(const daos_unit_oid_t *oid, daos_handle_t coh, daos_epoch_t epoch, dth->dth_for_migration = 0; 
dth->dth_ignore_uncommitted = 0; dth->dth_prepared = 0; + dth->dth_epoch_owner = 0; dth->dth_aborted = 0; dth->dth_already = 0; dth->dth_need_validation = 0; @@ -91,7 +93,8 @@ vts_dtx_begin(const daos_unit_oid_t *oid, daos_handle_t coh, daos_epoch_t epoch, dth->dth_shares_inited = 1; vos_dtx_rsrvd_init(dth); - vos_dtx_attach(dth, false, false); + rc = vos_dtx_attach(dth, false, false); + assert_rc_equal(rc, 0); *dthp = dth; } diff --git a/src/vos/tests/vts_io.c b/src/vos/tests/vts_io.c index ff02abaf1e2..00d2a4172d0 100644 --- a/src/vos/tests/vts_io.c +++ b/src/vos/tests/vts_io.c @@ -175,7 +175,7 @@ test_args_init(struct io_test_args *args, memset(args, 0, sizeof(*args)); memset(&vts_cntr, 0, sizeof(vts_cntr)); - vts_epoch_gen = 1; + vts_epoch_gen = d_hlc_get(); rc = vts_ctx_init(&args->ctx, pool_size); if (rc != 0) @@ -1590,7 +1590,7 @@ vos_iterate_test(void **state) struct all_info info = {0}; vos_iter_param_t param = {0}; struct vos_iter_anchors anchors = {0}; - daos_epoch_t epoch = 1; + daos_epoch_t epoch = d_hlc_get(); int rc = 0; unsigned long old_flags = arg->ta_flags; @@ -2038,7 +2038,7 @@ io_simple_one_key_cross_container(void **state) d_sg_list_t sgl; daos_key_t dkey; daos_key_t akey; - daos_epoch_t epoch = gen_rand_epoch(); + daos_epoch_t epoch; daos_unit_oid_t l_oid; /* Creating an additional container */ @@ -2087,6 +2087,7 @@ io_simple_one_key_cross_container(void **state) iod.iod_type = DAOS_IOD_ARRAY; l_oid = gen_oid(arg->otype); + epoch = gen_rand_epoch(); rc = vos_obj_update(arg->ctx.tc_co_hdl, arg->oid, epoch, 0, 0, &dkey, 1, &iod, NULL, &sgl); if (rc) { @@ -2526,7 +2527,7 @@ oid_iter_test_with_anchor(void **state) #define KEY_INC 127 #define MAX_INT_KEY (NUM_KEYS * KEY_INC) -static void gen_query_tree(struct io_test_args *arg, daos_unit_oid_t oid) +static void gen_query_tree(struct io_test_args *arg, daos_unit_oid_t oid, daos_epoch_t epoch) { daos_iod_t iod = {0}; d_sg_list_t sgl = {0}; @@ -2534,7 +2535,6 @@ static void gen_query_tree(struct io_test_args *arg, daos_unit_oid_t oid) daos_key_t akey; d_iov_t val_iov; daos_recx_t recx; - daos_epoch_t epoch = 1; uint64_t dkey_value; uint64_t akey_value; int i, j; @@ -2608,7 +2608,7 @@ io_query_key(void **state) int i, j; struct dtx_handle *dth; struct dtx_id xid; - daos_epoch_t epoch = 1; + daos_epoch_t epoch = d_hlc_get(); daos_key_t dkey; daos_key_t akey; daos_key_t dkey_read; @@ -2623,7 +2623,7 @@ io_query_key(void **state) oid = gen_oid(arg->otype); - gen_query_tree(arg, oid); + gen_query_tree(arg, oid, epoch); for (i = 1; i <= NUM_KEYS; i++) { for (j = 1; j <= NUM_KEYS; j++) { @@ -2873,7 +2873,7 @@ io_query_key_punch_update(void **state) { struct io_test_args *arg = *state; int rc = 0; - daos_epoch_t epoch = 1; + daos_epoch_t epoch = d_hlc_get(); daos_key_t dkey = { 0 }; daos_key_t akey; daos_recx_t recx_read; @@ -2949,7 +2949,7 @@ io_query_key_negative(void **state) &recx_read, NULL, 0, 0, NULL); assert_rc_equal(rc, -DER_NONEXIST); - gen_query_tree(arg, oid); + gen_query_tree(arg, oid, d_hlc_get()); rc = vos_obj_query_key(arg->ctx.tc_co_hdl, arg->oid, DAOS_GET_DKEY | DAOS_GET_MAX, 4, @@ -3006,7 +3006,7 @@ gang_sv_test(void **state) char dkey_buf[UPDATE_DKEY_SIZE], akey_buf[UPDATE_AKEY_SIZE]; char *update_buf, *fetch_buf; daos_size_t rsize = (27UL << 20); /* 27MB */ - daos_epoch_t epoch = 1; + daos_epoch_t epoch = d_hlc_get(); int rc; D_ALLOC(update_buf, rsize); diff --git a/src/vos/tests/vts_mvcc.c b/src/vos/tests/vts_mvcc.c index 6c625d2051a..d951b8b3bc9 100644 --- a/src/vos/tests/vts_mvcc.c +++ 
b/src/vos/tests/vts_mvcc.c @@ -1,5 +1,5 @@ /* - * (C) Copyright 2020-2023 Intel Corporation. + * (C) Copyright 2020-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -1720,7 +1720,7 @@ setup_mvcc(void **state) D_ASSERT(arg->custom == NULL); D_ALLOC_PTR(mvcc_arg); D_ASSERT(mvcc_arg != NULL); - mvcc_arg->epoch = 500; + mvcc_arg->epoch = d_hlc_get() + 500; d_getenv_bool("CMOCKA_TEST_ABORT", &mvcc_arg->fail_fast); arg->custom = mvcc_arg; return 0; diff --git a/src/vos/tests/vts_pm.c b/src/vos/tests/vts_pm.c index 99e53fc71b6..471806963bb 100644 --- a/src/vos/tests/vts_pm.c +++ b/src/vos/tests/vts_pm.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2019-2022 Intel Corporation. + * (C) Copyright 2019-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -25,7 +25,6 @@ #endif -static int start_epoch = 5; #define BUF_SIZE 2000 static int buf_size = BUF_SIZE; struct pm_info { @@ -1271,11 +1270,11 @@ cond_test(void **state) daos_unit_oid_t oid; d_sg_list_t sgl[MAX_SGL] = {0}; d_iov_t iov[MAX_SGL]; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; int i; test_args_reset(arg, VPOOL_SIZE); - + epoch = d_hlc_get() + 1000; oid = gen_oid(0); for (i = 0; i < MAX_SGL; i++) { @@ -1369,8 +1368,6 @@ cond_test(void **state) 0, -DER_NO_PERM, sgl, 5, "new", "foo", "f", "bar", "d", "val", "e", "flag", "new", "temp"); - - start_epoch = epoch + 1; } /** Making the oid generation deterministic, I get to 18201 before I hit a false @@ -1386,12 +1383,11 @@ multiple_oid_cond_test(void **state) daos_unit_oid_t oid; d_sg_list_t sgl = {0}; d_iov_t iov = {0}; - daos_epoch_t epoch = start_epoch + NUM_OIDS * 3; + daos_epoch_t epoch; int i; - start_epoch = epoch + 1; - test_args_reset(arg, VPOOL_SIZE); + epoch = d_hlc_get() + NUM_OIDS * 3; sgl.sg_iovs = &iov; sgl.sg_nr = 1; sgl.sg_nr_out = 1; @@ -1495,13 +1491,13 @@ remove_test(void **state) d_sg_list_t sgl; daos_recx_t recx[SM_BUF_LEN]; daos_unit_oid_t oid; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; int rc = 0; char key1 = 'a'; char key2 = 'b'; test_args_reset(arg, VPOOL_SIZE); - + epoch = d_hlc_get(); oid = gen_oid(0); d_iov_set(&dkey, &key1, sizeof(key1)); @@ -1594,8 +1590,6 @@ remove_test(void **state) FETCH_DATA, 1, &REM_VAL1[0], FETCH_HOLE, sizeof(REM_VAL1) + sizeof(REM_VAL2) + sizeof(REM_VAL3) - 5, FETCH_DATA, 1, &REM_VAL3[sizeof(REM_VAL3) - 2], FETCH_END); - - start_epoch = epoch + 1; } static void @@ -1670,7 +1664,7 @@ minor_epoch_punch_sv(void **state) daos_recx_t rex; daos_iod_t iod; d_sg_list_t sgl; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; struct dtx_handle *dth; struct dtx_id xid; const char *expected = "xxxxx"; @@ -1681,7 +1675,7 @@ minor_epoch_punch_sv(void **state) daos_unit_oid_t oid; test_args_reset(arg, VPOOL_SIZE); - + epoch = d_hlc_get(); memset(&rex, 0, sizeof(rex)); memset(&iod, 0, sizeof(iod)); @@ -1741,7 +1735,6 @@ minor_epoch_punch_sv(void **state) assert_memory_equal(buf, expected, strlen(expected)); d_sgl_fini(&sgl, false); - start_epoch = epoch + 1; } static void @@ -1754,7 +1747,7 @@ minor_epoch_punch_array(void **state) daos_recx_t rex; daos_iod_t iod; d_sg_list_t sgl; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; struct dtx_handle *dth; struct dtx_id xid; const char *expected = "xxxxxLonelyWorld"; @@ -1766,7 +1759,7 @@ minor_epoch_punch_array(void **state) daos_unit_oid_t oid; test_args_reset(arg, VPOOL_SIZE); - + epoch = d_hlc_get(); memset(&rex, 0, sizeof(rex)); memset(&iod, 0, sizeof(iod)); @@ -1837,7 +1830,6 @@ minor_epoch_punch_array(void 
**state) assert_memory_equal(buf, expected, strlen(expected)); d_sgl_fini(&sgl, false); - start_epoch = epoch + 1; } static void @@ -1850,7 +1842,7 @@ minor_epoch_punch_rebuild(void **state) daos_recx_t rex; daos_iod_t iod; d_sg_list_t sgl; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; const char *expected = "xxxxxlonelyworld"; const char *first = "hello"; const char *second = "lonelyworld"; @@ -1860,7 +1852,7 @@ minor_epoch_punch_rebuild(void **state) daos_unit_oid_t oid; test_args_reset(arg, VPOOL_SIZE); - + epoch = d_hlc_get(); memset(&rex, 0, sizeof(rex)); memset(&iod, 0, sizeof(iod)); @@ -1930,8 +1922,6 @@ minor_epoch_punch_rebuild(void **state) epoch += 2; d_sgl_fini(&sgl, false); - - start_epoch = epoch + 1; } #define NUM_RANKS 100 @@ -1948,7 +1938,7 @@ many_keys(void **state) daos_recx_t rex; daos_iod_t iod; d_sg_list_t sgl; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch = d_hlc_get(); const char *w = "x"; char *dkey_buf = DKEY_NAME; char akey_buf[UPDATE_DKEY_SIZE]; @@ -1995,8 +1985,6 @@ many_keys(void **state) } d_sgl_fini(&sgl, false); - - start_epoch = epoch + 1; } #define CELL_SZ 2 @@ -2102,7 +2090,7 @@ ec_size(void **state) struct io_test_args *arg = *state; int rc = 0; d_sg_list_t sgl; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; const char w[] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"; daos_unit_oid_t oid; uint64_t size; @@ -2110,7 +2098,7 @@ ec_size(void **state) int i; test_args_reset(arg, VPOOL_1G); - + epoch = d_hlc_get(); rc = d_sgl_init(&sgl, 1); assert_rc_equal(rc, 0); @@ -2201,8 +2189,6 @@ ec_size(void **state) assert_int_equal(size, 201 * STRIPE_SZ); d_sgl_fini(&sgl, false); - - start_epoch = epoch + 1; } static void @@ -2219,7 +2205,7 @@ test_inprogress_parent_punch(void **state) d_sg_list_t sgl; struct dtx_handle *dth1; struct dtx_handle *dth2; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; struct dtx_id xid1; struct dtx_id xid2; const char *expected = "xxxxx"; @@ -2232,7 +2218,7 @@ test_inprogress_parent_punch(void **state) daos_unit_oid_t oid; test_args_reset(arg, VPOOL_SIZE); - + epoch = d_hlc_get(); memset(&rex, 0, sizeof(rex)); memset(&iod, 0, sizeof(iod)); @@ -2336,8 +2322,6 @@ test_inprogress_parent_punch(void **state) assert_memory_equal(buf, expected, strlen(expected)); d_sgl_fini(&sgl, false); - - start_epoch = epoch + 1; } #define NR_OBJ 10 @@ -2458,9 +2442,9 @@ many_tx(void **state) d_sg_list_t sgl; d_sg_list_t fetch_sgl; char buf[32]; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; daos_handle_t coh; - daos_epoch_range_t epr = {epoch, epoch}; + daos_epoch_range_t epr; struct vos_ioreq req[NR_TX] = {0}; const char *first = "Hello"; char dkey_buf[NR_DKEY][UPDATE_DKEY_SIZE]; @@ -2482,9 +2466,12 @@ many_tx(void **state) test_args_reset(arg, VPOOL_SIZE); coh = arg->ctx.tc_co_hdl; - memset(&iod, 0, sizeof(iod)); + epoch = d_hlc_get() + 1000; + epr.epr_lo = epoch; + epr.epr_hi = epoch; + rc = d_sgl_init(&sgl, 1); assert_rc_equal(rc, 0); rc = d_sgl_init(&fetch_sgl, 1); @@ -2624,7 +2611,6 @@ many_tx(void **state) d_sgl_fini(&sgl, false); d_sgl_fini(&fetch_sgl, false); - start_epoch = epoch + 1; } static struct dtx_id @@ -2686,7 +2672,7 @@ uncommitted_parent(void **state) daos_iod_t iod; d_sg_list_t sgl; char buf[32]; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; daos_handle_t coh; char *first = "Hello"; char dkey_buf[UPDATE_DKEY_SIZE]; @@ -2696,7 +2682,7 @@ uncommitted_parent(void **state) test_args_reset(arg, VPOOL_SIZE); coh = arg->ctx.tc_co_hdl; - + epoch = 
d_hlc_get(); memset(&iod, 0, sizeof(iod)); rc = d_sgl_init(&sgl, 1); @@ -2736,7 +2722,6 @@ uncommitted_parent(void **state) assert_memory_equal(buf, first, 5); d_sgl_fini(&sgl, false); - start_epoch = epoch + 1; } static void @@ -2750,7 +2735,7 @@ test_uncommitted_key(void **state) daos_iod_t iod; d_sg_list_t sgl; char buf[32]; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; daos_handle_t coh; char *first = "Hello"; char dkey_buf[UPDATE_DKEY_SIZE]; @@ -2760,7 +2745,7 @@ test_uncommitted_key(void **state) coh = arg->ctx.tc_co_hdl; - + epoch = d_hlc_get(); memset(&iod, 0, sizeof(iod)); rc = d_sgl_init(&sgl, 1); @@ -2798,7 +2783,6 @@ test_uncommitted_key(void **state) assert_memory_equal(buf, "Hello", 5); d_sgl_fini(&sgl, false); - start_epoch = epoch + 1; } static void @@ -2812,7 +2796,7 @@ test_multiple_key_conditionals_common(void **state, bool with_dtx) daos_recx_t rex[2] = {0}; daos_iod_t iod[2] = {0}; d_sg_list_t sgl[2] = {0}; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; struct dtx_handle *dth = NULL; struct dtx_id xid; const char *expected = "xxxxx"; @@ -2826,7 +2810,7 @@ test_multiple_key_conditionals_common(void **state, bool with_dtx) daos_unit_oid_t oid; test_args_reset(arg, VPOOL_SIZE); - + epoch = d_hlc_get(); memset(rex, 0, sizeof(rex)); memset(iod, 0, sizeof(iod)); @@ -3011,7 +2995,6 @@ test_multiple_key_conditionals_common(void **state, bool with_dtx) if (with_dtx) vts_dtx_end(dth); - start_epoch = epoch + 1; d_sgl_fini(&sgl[0], false); d_sgl_fini(&sgl[1], false); } diff --git a/src/vos/vos_common.c b/src/vos/vos_common.c index e19768d4c03..c57802a4f16 100644 --- a/src/vos/vos_common.c +++ b/src/vos/vos_common.c @@ -620,6 +620,42 @@ struct dss_module_key vos_module_key = { daos_epoch_t vos_start_epoch = DAOS_EPOCH_MAX; +/* + * For a standalone transaction started on the DTX leader, the epoch is generated + * by the leader, then the modification RPC is forwarded to the other related + * non-leader(s). If the forwarded RPC is delayed for some reason, such as network + * congestion or a busy non-leader, the epoch of such a transaction may become very + * old (exceeding the related threshold), and VOS aggregation may have already + * aggregated the related epoch range. In such a case, the non-leader will reject + * the modification to avoid data loss/corruption. + * + * For a distributed transaction, if there is no read (fetch, query, enumerate, + * and so on) before client commit_tx, then the related DTX leader will generate + * the epoch for the transaction after client commit_tx. Epoch handling is then + * the same as for the standalone transaction above. + * + * If the distributed transaction involves some read before client commit_tx, + * its epoch will be generated by the first engine accessed for read. If the + * transaction takes too long after that, then by the time of client commit_tx + * its epoch may be so old that the related DTX leader has to reject the + * transaction to avoid the above mentioned conflict. And even if the DTX leader + * does not reject the transaction, some non-leader may still reject it because + * of the very old epoch. So under such a framework, the lifetime of a distributed + * transaction cannot be too long. The limit can be adjusted via the server side + * environment variable DAOS_VOS_AGG_GAP. 
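 + * For example, with the default DAOS_VOS_AGG_GAP of 60 seconds, a distributed
 + * transaction whose epoch was fixed by an early read risks being rejected and
 + * restarted if it stays open much longer than a minute before commit_tx.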
+ * + * NOTE: EC/VOS aggregation should avoid aggregating in the epoch range where + * lots of data records are pending to commit, so the aggregation epoch + * upper bound is 'current HLC - vos_agg_gap'. + */ +uint32_t vos_agg_gap; + +uint32_t +vos_get_agg_gap(void) +{ + return vos_agg_gap; +} + static int vos_mod_init(void) { @@ -679,6 +715,15 @@ vos_mod_init(void) d_getenv_bool("DAOS_DKEY_PUNCH_PROPAGATE", &vos_dkey_punch_propagate); D_INFO("DKEY punch propagation is %s\n", vos_dkey_punch_propagate ? "enabled" : "disabled"); + vos_agg_gap = VOS_AGG_GAP_DEF; + d_getenv_uint("DAOS_VOS_AGG_GAP", &vos_agg_gap); + if (vos_agg_gap < VOS_AGG_GAP_MIN || vos_agg_gap > VOS_AGG_GAP_MAX) { + D_WARN("Invalid DAOS_VOS_AGG_GAP value, " "valid range [%u, %u], using the default %u (seconds)\n", + VOS_AGG_GAP_MIN, VOS_AGG_GAP_MAX, VOS_AGG_GAP_DEF); + vos_agg_gap = VOS_AGG_GAP_DEF; + } + D_INFO("Set DAOS VOS aggregation gap to %u (seconds)\n", vos_agg_gap); return rc; } diff --git a/src/vos/vos_container.c b/src/vos/vos_container.c index a5a55a902b9..5d0b656b7ff 100644 --- a/src/vos/vos_container.c +++ b/src/vos/vos_container.c @@ -198,6 +198,9 @@ cont_free_internal(struct vos_container *cont) lrua_array_free(cont->vc_dtx_array); D_ASSERT(d_list_empty(&cont->vc_dtx_act_list)); + D_ASSERT(d_list_empty(&cont->vc_dtx_sorted_list)); + D_ASSERT(d_list_empty(&cont->vc_dtx_unsorted_list)); + D_ASSERT(d_list_empty(&cont->vc_dtx_reindex_list)); dbtree_close(cont->vc_btr_hdl); @@ -395,6 +398,9 @@ vos_cont_open(daos_handle_t poh, uuid_t co_uuid, daos_handle_t *coh) cont->vc_cmt_dtx_indexed = 0; cont->vc_cmt_dtx_reindex_pos = cont->vc_cont_df->cd_dtx_committed_head; D_INIT_LIST_HEAD(&cont->vc_dtx_act_list); + D_INIT_LIST_HEAD(&cont->vc_dtx_sorted_list); + D_INIT_LIST_HEAD(&cont->vc_dtx_unsorted_list); + D_INIT_LIST_HEAD(&cont->vc_dtx_reindex_list); cont->vc_dtx_committed_count = 0; cont->vc_solo_dtx_epoch = d_hlc_get(); rc = gc_open_cont(cont); @@ -461,6 +467,21 @@ vos_cont_open(daos_handle_t poh, uuid_t co_uuid, daos_handle_t *coh) } } + /* + * Assign vc_mod_epoch_bound the current HLC, then all the local stable epochs + * reported (but not persistently stored) before re-opening the container will be + * older than vc_mod_epoch_bound. It is possible that some modification was started + * before the current container reopen (such as an engine restart without the + * related pool service going down), but the related RPC was not forwarded to the + * current engine in time. After re-opening the container (shard), the current + * engine will reject such old modifications and ask the related DTX leader to + * restart the transaction. This may only affect in-flight IO during a container + * re-open without a pool service restart. + * + * With this assignment, we also do not need to consider the former EC/VOS + * aggregation upper boundary when reopening the container. + */ + cont->vc_mod_epoch_bound = d_hlc_get(); + rc = vos_dtx_act_reindex(cont); if (rc != 0) { D_ERROR("Fail to reindex active DTX entries: %d\n", rc); @@ -815,3 +836,186 @@ struct vos_iter_ops vos_cont_iter_ops = { .iop_fetch = cont_iter_fetch, .iop_process = cont_iter_process, }; + +/* + * The local stable epoch can be used to calculate the global stable epoch: all the + * container shards report their own local stable epochs to some leader, which takes + * the smallest one as the global stable epoch and dispatches it to all related + * container shards. 
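 + * For example, if three shards report local stable epochs E1, E2 and E3, the
 + * leader takes min(E1, E2, E3) as the global stable epoch and dispatches it
 + * back to every shard.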
*/ +daos_epoch_t +vos_cont_get_local_stable_epoch(daos_handle_t coh) +{ + struct vos_container *cont; + struct vos_dtx_act_ent *dae; + uint64_t gap = d_sec2hlc(vos_agg_gap); + daos_epoch_t epoch = d_hlc_get() - gap; + + cont = vos_hdl2cont(coh); + D_ASSERT(cont != NULL); + + /* + * If the epoch of the oldest sorted DTX (the entry at the head of the sorted + * list) is within the default boundary, then clamp the local stable epoch to + * just below that DTX's epoch. + */ + if (!d_list_empty(&cont->vc_dtx_sorted_list)) { + dae = d_list_entry(cont->vc_dtx_sorted_list.next, + struct vos_dtx_act_ent, dae_order_link); + if (epoch >= DAE_EPOCH(dae)) + epoch = DAE_EPOCH(dae) - 1; + } + + /* + * It is not easy to know which DTX in the unsorted list is the oldest one. + * An entry after the head of the list may be older than the head. But the + * epoch difference will NOT exceed 'vos_agg_gap', since any DTX with an older + * epoch would have been rejected (and restarted with a newer epoch). + * + * So "DAE_EPOCH(head) - vos_agg_gap" can be used to estimate the local + * stable epoch for unsorted DTX entries. + */ + if (!d_list_empty(&cont->vc_dtx_unsorted_list)) { + dae = d_list_entry(cont->vc_dtx_unsorted_list.next, + struct vos_dtx_act_ent, dae_order_link); + if (epoch > DAE_EPOCH(dae) - gap) + epoch = DAE_EPOCH(dae) - gap; + } + + /* + * The historical vos_agg_gap for the DTX entries in the reindex list is unknown. + * We use cont->vc_dtx_reindex_eph_diff to estimate the local stable epoch, which + * may be an over-estimate. Usually the count of re-indexed DTX entries is quite + * limited, and they will be purged soon after the container is opened (via DTX + * resync), so they will not much affect the local stable epoch calculation. + */ + if (unlikely(!d_list_empty(&cont->vc_dtx_reindex_list))) { + dae = d_list_entry(cont->vc_dtx_reindex_list.next, + struct vos_dtx_act_ent, dae_order_link); + if (epoch > DAE_EPOCH(dae) - cont->vc_dtx_reindex_eph_diff) + epoch = DAE_EPOCH(dae) - cont->vc_dtx_reindex_eph_diff; + } + + /* + * vc_mod_epoch_bound guarantees that no modification with an older epoch can + * be accepted after the local stable epoch was last reported. So if the newly + * calculated result is older, reuse the former one. + */ + if (unlikely(epoch < cont->vc_local_stable_epoch)) + epoch = cont->vc_local_stable_epoch; + else + cont->vc_local_stable_epoch = epoch; + + /* + * Update vc_mod_epoch_bound to guarantee that no update with an older epoch + * can be accepted after reporting the new local stable epoch. The semantics + * may be strict enough to cause a lot of DTX restarts. + */ + if (cont->vc_mod_epoch_bound < epoch) { + D_DEBUG(DB_TRACE, "Increase acceptable modification boundary from " + DF_X64 " to " DF_X64 " for container " DF_UUID "\n", + cont->vc_mod_epoch_bound, epoch, DP_UUID(cont->vc_id)); + cont->vc_mod_epoch_bound = epoch; + } + + return epoch; +} + +/* + * The global stable epoch can be used for incremental reintegration: all the + * modifications involved in the current target (container shard) under the global + * stable epoch have already been persistently stored globally; only modifications + * with newer epochs need to be handled when reintegrating into the system. 
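 + * For example, when a shard reintegrates with a persisted global stable epoch E,
 + * everything at or below E is already persistent, so only the modifications with
 + * epochs newer than E need to be handled for that shard.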
*/ +daos_epoch_t +vos_cont_get_global_stable_epoch(daos_handle_t coh) +{ + struct vos_container *cont; + struct vos_cont_ext_df *cont_ext; + daos_epoch_t epoch = 0; + + cont = vos_hdl2cont(coh); + D_ASSERT(cont != NULL); + + cont_ext = umem_off2ptr(vos_cont2umm(cont), cont->vc_cont_df->cd_ext); + if (cont_ext != NULL) + epoch = cont_ext->ced_global_stable_epoch; + + return epoch; +} + +int +vos_cont_set_global_stable_epoch(daos_handle_t coh, daos_epoch_t epoch) +{ + struct umem_instance *umm; + struct vos_container *cont; + struct vos_cont_ext_df *cont_ext; + daos_epoch_t old = 0; + int rc = 0; + + cont = vos_hdl2cont(coh); + D_ASSERT(cont != NULL); + + umm = vos_cont2umm(cont); + cont_ext = umem_off2ptr(umm, cont->vc_cont_df->cd_ext); + + /* Do not allow setting the global stable epoch on an old container without the extension. */ + if (cont_ext == NULL) + D_GOTO(out, rc = -DER_NOTSUPPORTED); + + /* + * Either the leader gave a wrong global stable epoch or the current target did not + * participate in calculating the new global stable epoch. In either case, do not allow + * setting the global stable epoch. + */ + if (unlikely(cont->vc_local_stable_epoch < epoch)) { + D_WARN("Invalid global stable epoch: " DF_X64" vs " DF_X64 " for container " + DF_UUID "\n", cont->vc_local_stable_epoch, epoch, DP_UUID(cont->vc_id)); + D_GOTO(out, rc = -DER_NO_PERM); + } + + if (unlikely(cont_ext->ced_global_stable_epoch > epoch)) { + D_WARN("Do not allow rollback of the global stable epoch from " + DF_X64" to " DF_X64 " for container " DF_UUID "\n", + cont_ext->ced_global_stable_epoch, epoch, DP_UUID(cont->vc_id)); + D_GOTO(out, rc = -DER_NO_PERM); + } + + if (cont_ext->ced_global_stable_epoch == epoch) + D_GOTO(out, rc = 0); + + old = cont_ext->ced_global_stable_epoch; + rc = umem_tx_begin(umm, NULL); + if (rc == 0) { + rc = umem_tx_add_ptr(umm, &cont_ext->ced_global_stable_epoch, + sizeof(cont_ext->ced_global_stable_epoch)); + if (rc == 0) { + cont_ext->ced_global_stable_epoch = epoch; + rc = umem_tx_commit(vos_cont2umm(cont)); + } else { + rc = umem_tx_abort(umm, rc); + } + } + + DL_CDEBUG(rc != 0, DLOG_ERR, DB_MGMT, rc, + "Set global stable epoch from "DF_X64" to " DF_X64 " for container " DF_UUID, + old, epoch, DP_UUID(cont->vc_id)); + +out: + return rc; +} + +int +vos_cont_set_mod_bound(daos_handle_t coh, uint64_t epoch) +{ + struct vos_container *cont; + + cont = vos_hdl2cont(coh); + D_ASSERT(cont != NULL); + + if (cont->vc_mod_epoch_bound < epoch) { + D_DEBUG(DB_TRACE, "Increase acceptable modification boundary from " + DF_X64 " to " DF_X64 " for container " DF_UUID "\n", + cont->vc_mod_epoch_bound, epoch, DP_UUID(cont->vc_id)); + cont->vc_mod_epoch_bound = epoch; + } + + return 0; +} diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c index 86c100f4739..1973d6fc025 100644 --- a/src/vos/vos_dtx.c +++ b/src/vos/vos_dtx.c @@ -261,8 +261,10 @@ dtx_act_ent_free(struct btr_instance *tins, struct btr_record *rec, dae = umem_off2ptr(&tins->ti_umm, rec->rec_off); rec->rec_off = UMOFF_NULL; - if (dae != NULL) + if (dae != NULL) { + d_list_del_init(&dae->dae_order_link); d_list_del_init(&dae->dae_link); + } if (args != NULL) { /* Return the record addreass (offset in DRAM. @@ -990,11 +992,76 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth) uint32_t idx; d_iov_t kiov; d_iov_t riov; + uint64_t now; int rc = 0; cont = vos_hdl2cont(dth->dth_coh); D_ASSERT(cont != NULL); + /* Do not allow a modification with too old an epoch. 
*/ + if (dth->dth_epoch <= cont->vc_mod_epoch_bound) { + now = daos_gettime_coarse(); + if (now - cont->vc_dtx_reject_ts > 10) { + D_WARN("Reject DTX (1) " DF_DTI " with epoch " DF_X64 + " vs bound " DF_X64 "\n", DP_DTI(&dth->dth_xid), + dth->dth_epoch, cont->vc_mod_epoch_bound); + cont->vc_dtx_reject_ts = now; + } + return -DER_TX_RESTART; + } + + /* + * NOTE: To efficiently calculate the container based local stable epoch, we + * maintain some kind of epoch-ordered list for active DTX entries. But + * considering the related overhead, it is not easy to maintain a strictly + * sorted list for all active DTX entries. For a DTX whose leader resides + * on the current target, its epoch is already sorted when generated on + * the current engine. So the main difficulty is those DTX entries whose + * leaders are on remote targets. + * + * On the other hand, the local stable epoch is mainly used to generate the + * global stable epoch for incremental reintegration. In fact, we do not + * need a very accurate global stable epoch for incremental reintegration: + * it is non-fatal if the calculated stable epoch is a bit smaller than the + * real one. For example, an error of seconds in the stable epoch can almost + * be ignored compared with the overhead of rebuilding the whole target from + * scratch. So for a DTX entry whose leader is on a remote target, we keep + * it in a list with a roughly increasing epoch trend instead of strictly + * sorting by epoch, and introduce an O(1) algorithm to handle such an + * unsorted DTX entry list. + * + * For a distributed transaction, the epoch may be generated on a non-leader. + */ + + if (!dth->dth_epoch_owner && !d_list_empty(&cont->vc_dtx_unsorted_list)) { + dae = d_list_entry(cont->vc_dtx_unsorted_list.prev, struct vos_dtx_act_ent, + dae_order_link); + if (dth->dth_epoch < DAE_EPOCH(dae) && + cont->vc_mod_epoch_bound < DAE_EPOCH(dae) - d_sec2hlc(vos_agg_gap)) { + /* + * This guarantees that even if some older DTX is added later, the + * epoch difference between it and all formerly added ones cannot + * exceed vos_agg_gap. So we can easily calculate the local stable + * epoch; please reference vos_cont_get_local_stable_epoch(). 
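 + * For example, if the newest unsorted entry has epoch E, the bound is raised to
 + * E - vos_agg_gap (converted to HLC units), so any DTX accepted afterwards must
 + * carry an epoch newer than that, which keeps the head-based estimate valid.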
*/ + D_DEBUG(DB_TRACE, "Increase acceptable modification boundary from " + DF_X64 " to " DF_X64 " for container " DF_UUID "\n", + cont->vc_mod_epoch_bound, + DAE_EPOCH(dae) - d_sec2hlc(vos_agg_gap), DP_UUID(cont->vc_id)); + cont->vc_mod_epoch_bound = DAE_EPOCH(dae) - d_sec2hlc(vos_agg_gap); + if (dth->dth_epoch <= cont->vc_mod_epoch_bound) { + now = daos_gettime_coarse(); + if (now - cont->vc_dtx_reject_ts > 10) { + D_WARN("Reject DTX (2) " DF_DTI " with epoch " DF_X64 + " vs bound " DF_X64 "\n", DP_DTI(&dth->dth_xid), + dth->dth_epoch, cont->vc_mod_epoch_bound); + cont->vc_dtx_reject_ts = now; + } + return -DER_TX_RESTART; + } + } + } + rc = lrua_allocx(cont->vc_dtx_array, &idx, dth->dth_epoch, &dae, &dth->dth_local_stub); if (rc != 0) { /* The array is full, need to commit some transactions first */ @@ -1007,6 +1074,7 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth) } D_INIT_LIST_HEAD(&dae->dae_link); + D_INIT_LIST_HEAD(&dae->dae_order_link); DAE_LID(dae) = idx + DTX_LID_RESERVED; if (dth->dth_solo) DAE_LID(dae) |= DTX_LID_SOLO_FLAG; @@ -1015,6 +1083,8 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth) DAE_DKEY_HASH(dae) = dth->dth_dkey_hash; DAE_EPOCH(dae) = dth->dth_epoch; DAE_FLAGS(dae) = dth->dth_flags; + if (dth->dth_epoch_owner) + DAE_FLAGS(dae) |= DTE_EPOCH_SORTED; DAE_VER(dae) = dth->dth_ver; if (dth->dth_mbs != NULL) { @@ -1043,6 +1113,15 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth) if (rc == 0) { dae->dae_start_time = daos_gettime_coarse(); d_list_add_tail(&dae->dae_link, &cont->vc_dtx_act_list); + if (dth->dth_epoch_owner) + d_list_add_tail(&dae->dae_order_link, &cont->vc_dtx_sorted_list); + else + /* + * Add all the others, including those from non-leader(s), into the + * unsorted list. Then even if the leader is evicted for some reason, + * the related DTX can still be considered via the new leader on + * another target. + */ + d_list_add_tail(&dae->dae_order_link, &cont->vc_dtx_unsorted_list); dth->dth_ent = dae; } else { dtx_evict_lid(cont, dae); @@ -2938,6 +3017,13 @@ vos_dtx_act_reindex(struct vos_container *cont) umem_off_t dbd_off = cont_df->cd_dtx_active_head; d_iov_t kiov; d_iov_t riov; + struct vos_dtx_act_ent *prev = NULL; + /* The max epoch among all unsorted DTX entries to be re-indexed. */ + uint64_t max_eph = 0; + /* The min epoch among the DTX entries after the max_eph DTX. */ + uint64_t min_eph = 0; + /* The largest difference 'max_eph - min_eph' among the above pairs. */ + uint64_t diff = 0; uint64_t start_time = daos_gettime_coarse(); int rc = 0; int i; @@ -3027,6 +3113,42 @@ vos_dtx_act_reindex(struct vos_container *cont) dae->dae_start_time = start_time; d_list_add_tail(&dae->dae_link, &cont->vc_dtx_act_list); + if (DAE_FLAGS(dae) & DTE_EPOCH_SORTED) { + d_list_add_tail(&dae->dae_order_link, &cont->vc_dtx_sorted_list); + } else { + /* + * The DTX entries in the active blob may have been generated + * against different VOS AGG GAP configurations, or even upgraded + * from an old system that did not support the VOS AGG GAP logic + * yet. Link them into a reindex list. During the reindex scan, + * we find the max/min pairs with the largest epoch difference + * and use that difference to estimate the local stable epoch. + * + * NOTE: min_eph may not be the smallest epoch among all the DTX + * entries to be re-indexed; instead, it is the smallest + * after the currently known max_eph, and when max_eph + * changes, min_eph is reset. So there may be multiple + * max/min pairs, each with its own epoch difference. We + * use the largest one. 
+ * + * This is an O(N) algorithm, where N is the count of DTX entries to + * be re-indexed. Please reference vos_cont_get_local_stable_epoch(). + */ + if (prev == NULL || DAE_EPOCH(dae) > DAE_EPOCH(prev)) { + if (max_eph < DAE_EPOCH(dae)) { + max_eph = DAE_EPOCH(dae); + min_eph = 0; + } + } else { + if (min_eph == 0 || min_eph > DAE_EPOCH(dae)) { + min_eph = DAE_EPOCH(dae); + if (diff < max_eph - min_eph) + diff = max_eph - min_eph; + } + } + + d_list_add_tail(&dae->dae_order_link, &cont->vc_dtx_reindex_list); + } + prev = dae; dbd_count++; } @@ -3044,6 +3166,8 @@ vos_dtx_act_reindex(struct vos_container *cont) dbd_off = dbd->dbd_next; } + cont->vc_dtx_reindex_eph_diff = diff; + out: return rc > 0 ? 0 : rc; } @@ -3320,8 +3444,10 @@ vos_dtx_attach(struct dtx_handle *dth, bool persistent, bool exist) vos_dtx_cleanup_internal(dth); } - D_ERROR("Failed to pin DTX entry for "DF_DTI": "DF_RC"\n", - DP_DTI(&dth->dth_xid), DP_RC(rc)); + if (rc != 0) + DL_CDEBUG(rc != -DER_TX_RESTART, DLOG_ERR, DB_TRACE, rc, + "Failed to pin DTX entry for "DF_DTI": "DF_RC, + DP_DTI(&dth->dth_xid), DP_RC(rc)); } return rc; diff --git a/src/vos/vos_internal.h b/src/vos/vos_internal.h index 30e92318299..0d86edf4b8e 100644 --- a/src/vos/vos_internal.h +++ b/src/vos/vos_internal.h @@ -140,6 +140,12 @@ enum { /* Throttle ENOSPACE error message */ #define VOS_NOSPC_ERROR_INTVL 60 /* seconds */ +extern uint32_t vos_agg_gap; + +#define VOS_AGG_GAP_MIN 20 /* seconds */ +#define VOS_AGG_GAP_DEF 60 +#define VOS_AGG_GAP_MAX 180 + extern unsigned int vos_agg_nvme_thresh; extern bool vos_dkey_punch_propagate; @@ -359,6 +365,31 @@ struct vos_container { struct btr_root vc_dtx_committed_btr; /* The list for active DTXs, roughly ordered in time. */ d_list_t vc_dtx_act_list; + /* The list of active DTX entries with sorted epochs. */ + d_list_t vc_dtx_sorted_list; + /* The list of active DTX entries (not re-indexed) with unsorted epochs. */ + d_list_t vc_dtx_unsorted_list; + /* The list of active DTX entries that are re-indexed when opening the container. */ + d_list_t vc_dtx_reindex_list; + /* The largest epoch difference among the max/min pairs of re-indexed DTX entries. */ + uint64_t vc_dtx_reindex_eph_diff; + /* The latest calculated local stable epoch. */ + daos_epoch_t vc_local_stable_epoch; + /* + * The lowest epoch boundary for currently acceptable modifications. It cannot be lower + * than vc_local_stable_epoch; otherwise it may break the stable epoch semantics, since + * the local stable epoch reported by the current target may be used as the global + * stable epoch. There is a window between the current target reporting the local + * stable epoch and the related leader setting the global stable epoch. If a + * modification with an older epoch arrives during such an interval, we have to reject + * it to avoid potential conflict. + * + * On the other hand, it must be higher than the EC/VOS aggregation upper boundary. + * Under space pressure, the EC/VOS aggregation upper boundary may be higher than + * vc_local_stable_epoch, which will cause vc_mod_epoch_bound > vc_local_stable_epoch. + */ + daos_epoch_t vc_mod_epoch_bound; + /* Last timestamp when VOS rejected a DTX because of a stale epoch. */ + uint64_t vc_dtx_reject_ts; /* The count of committed DTXs. */ uint32_t vc_dtx_committed_count; /** Index for timestamp lookup */ @@ -428,6 +459,8 @@ struct vos_dtx_act_ent { daos_unit_oid_t *dae_oids; /* The time (hlc) when the DTX entry is created. */ uint64_t dae_start_time; + /* Link into container::vc_dtx_{sorted,unsorted,reindex}_list. 
*/ + d_list_t dae_order_link; /* Link into container::vc_dtx_act_list. */ d_list_t dae_link; /* Back pointer to the DTX handle. */ diff --git a/src/vos/vos_layout.h b/src/vos/vos_layout.h index 87d092bc882..40daa55da93 100644 --- a/src/vos/vos_layout.h +++ b/src/vos/vos_layout.h @@ -271,8 +271,13 @@ enum vos_io_stream { struct vos_cont_ext_df { /* GC bucket extension */ struct vos_gc_bkt_df ced_gc_bkt; + /* + * Any modification involved in the current target (container shard) under the global + * stable epoch has already been persistently stored globally. + */ + uint64_t ced_global_stable_epoch; /* Reserved for potential new features */ - uint64_t ced_paddings[38]; + uint64_t ced_paddings[37]; /* Reserved for future extension */ uint64_t ced_reserve; };
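
Reviewer note: the sketch below is illustration only and not part of the patch. It shows how the new per-shard stable-epoch APIs are intended to compose into a global stable epoch. In the real system the container shards live on different engines and the epochs travel via RPC; collect_shard_handles() is a hypothetical helper, and the negative-value error case of vos_cont_get_local_stable_epoch() is elided for brevity. Only the vos_cont_* calls come from this patch.

#include <daos_srv/vos.h>

/* Hypothetical helper: gather open handles for all shards of one container. */
extern int collect_shard_handles(uuid_t co_uuid, daos_handle_t **cohs, int *nr);

static int
refresh_global_stable_epoch(uuid_t co_uuid)
{
	daos_handle_t	*cohs;
	daos_epoch_t	 global = DAOS_EPOCH_MAX;
	int		 nr;
	int		 rc;
	int		 i;

	rc = collect_shard_handles(co_uuid, &cohs, &nr);
	if (rc != 0)
		return rc;

	/* The global stable epoch is the minimum of all local stable epochs. */
	for (i = 0; i < nr; i++) {
		daos_epoch_t local = vos_cont_get_local_stable_epoch(cohs[i]);

		if (local < global)
			global = local;
	}

	/* Persist the agreed epoch on every shard. */
	for (i = 0; i < nr; i++) {
		rc = vos_cont_set_global_stable_epoch(cohs[i], global);
		if (rc != 0)
			break;
	}

	return rc;
}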