From 88e58e1e50408ba59210639913062887931ee515 Mon Sep 17 00:00:00 2001 From: Fan Yong Date: Thu, 12 Dec 2024 22:53:24 +0800 Subject: [PATCH] DAOS-16809 vos: container based stable epoch For the purpose of efficient calculating container based local stable epoch, we will maintain some kind of sorted list for active DTX entries with epoch order. But consider related overhead, it is not easy to maintain a strictly sorted list for all active DTX entries. For the DTX which leader resides on current target, its epoch is already sorted when generate on current engine. So the main difficulty is for those DTX entries which leaders are on remote targets. On the other hand, the local stable epoch is mainly used to generate global stable epoch that is for incremental reintegration. In fact, we do not need a very accurate global stable epoch for incremental reintegration. It means that it is no matter (or non-fatal) if the calculated stable epoch is a bit smaller than the real case. For example, seconds error for the stable epoch almost can be ignored if we compare such overhead with rebuilding the whole target from scratch. So for the DTX entry which leader is on remote target, we will maintain it in the list with relative incremental trend based on the epoch instead of strict sorting the epoch. We introduce an O(1) algorithm to handle such unsorted DTX entries list for calculating local stable epoch. Main VOS APIs for the stable epoch: /* Calculate current locally known stable epoch for the given container. */ daos_epoch_t vos_cont_get_local_stable_epoch(daos_handle_t coh); /* Get global stable epoch for the given container. */ daos_epoch_t vos_cont_get_global_stable_epoch(daos_handle_t coh); /* Set global stable epoch for the given container. */ int vos_cont_set_global_stable_epoch(daos_handle_t coh, daos_epoch_t epoch); Another important enhancement in the patch is about handling potential conflict between EC/VOS aggregation and delayed modification with very old epoch. For standalone transaction, when it is started on the DTX leader, its epoch is generated by the leader, then the modification RPC will be forwarded to other related non-leader(s). If the forwarded RPC is delayed for some reason, such as network congestion or system busy on the non-leader, as to the epoch for such transaction becomes very old (exceed related threshold), as to VOS aggregation may has already aggregated related epoch rang. Under such case, the non-leader will reject such modification to avoid data lost/corruption. For distributed transaction, if there is no read (fetch, query, enumerate, and so on) before client commit_tx, then related DTX leader will generate epoch for the transaction after client commit_tx. Then it will be the same as above standalone transaction for epoch handling. If the distributed transaction involves some read before client commit_tx, its epoch will be generated by the first accessed engine for read. If the transaction takes too long time after that, then when client commit_tx, its epoch may become very old as to related DTX leader will have to reject the transaction to avoid above mentioned conflict. And even if the DTX leader did not reject the transaction, some non-leader may also reject it because of the very old epoch. So it means that under such framework, the life for a distributed transaction cannot be too long. That can be adjusted via the server side environment variable DAOS_VOS_AGG_GAP. The default value is 60 seconds. NOTE: EC/VOS aggregation should avoid aggregating in the epoch range where lots of data records are pending to commit, so the aggregation epoch upper bound is 'current HLC - vos_agg_gap'. Signed-off-by: Fan Yong --- src/container/srv_target.c | 8 +- src/dtx/dtx_common.c | 3 + src/dtx/tests/dts_structs.c | 3 +- src/engine/sched.c | 26 +-- src/engine/srv_internal.h | 1 + src/include/daos/dtx.h | 13 +- src/include/daos_srv/container.h | 6 + src/include/daos_srv/dtx_srv.h | 6 +- src/include/daos_srv/vos.h | 50 +++++ src/include/daos_srv/vos_types.h | 2 + src/object/srv_obj.c | 13 +- src/tests/ftest/util/server_utils_params.py | 1 + src/utils/ddb/tests/ddb_test_driver.c | 6 +- src/vos/tests/vts_dtx.c | 5 +- src/vos/tests/vts_io.c | 20 +- src/vos/tests/vts_mvcc.c | 4 +- src/vos/tests/vts_pm.c | 77 +++----- src/vos/vos_common.c | 45 +++++ src/vos/vos_container.c | 204 ++++++++++++++++++++ src/vos/vos_dtx.c | 132 ++++++++++++- src/vos/vos_internal.h | 33 ++++ src/vos/vos_layout.h | 7 +- 22 files changed, 568 insertions(+), 97 deletions(-) diff --git a/src/container/srv_target.c b/src/container/srv_target.c index 653cb5f7d37..71d13f637bd 100644 --- a/src/container/srv_target.c +++ b/src/container/srv_target.c @@ -321,7 +321,7 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb, DAOS_FAIL_CHECK(DAOS_FORCE_EC_AGG_PEER_FAIL))) interval = 0; else - interval = d_sec2hlc(DAOS_AGG_THRESHOLD); + interval = cont->sc_agg_eph_gap; D_ASSERT(hlc > (interval * 2)); /* @@ -409,6 +409,9 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb, DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), tgt_id, epoch_range.epr_lo, epoch_range.epr_hi); + if (!param->ap_vos_agg) + vos_cont_set_mod_bound(cont->sc_hdl, epoch_range.epr_hi); + flags |= VOS_AGG_FL_FORCE_MERGE; rc = agg_cb(cont, &epoch_range, flags, param); if (rc) @@ -425,6 +428,9 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb, DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), tgt_id, epoch_range.epr_lo, epoch_range.epr_hi); + if (!param->ap_vos_agg) + vos_cont_set_mod_bound(cont->sc_hdl, epoch_range.epr_hi); + if (dss_xstream_is_busy()) flags &= ~VOS_AGG_FL_FORCE_MERGE; rc = agg_cb(cont, &epoch_range, flags, param); diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c index 1ee74ae11a4..96e8119d4ce 100644 --- a/src/dtx/dtx_common.c +++ b/src/dtx/dtx_common.c @@ -922,6 +922,7 @@ dtx_handle_init(struct dtx_id *dti, daos_handle_t xoh, struct dtx_epoch *epoch, dth->dth_for_migration = (flags & DTX_FOR_MIGRATION) ? 1 : 0; dth->dth_ignore_uncommitted = (flags & DTX_IGNORE_UNCOMMITTED) ? 1 : 0; dth->dth_prepared = (flags & DTX_PREPARED) ? 1 : 0; + dth->dth_epoch_owner = (flags & DTX_EPOCH_OWNER) ? 1 : 0; dth->dth_aborted = 0; dth->dth_already = 0; dth->dth_need_validation = 0; @@ -1853,6 +1854,8 @@ dtx_cont_register(struct ds_cont_child *cont) D_GOTO(out, rc = -DER_NOMEM); } + cont->sc_agg_eph_gap = d_sec2hlc(vos_get_agg_gap()); + ds_cont_child_get(cont); dbca->dbca_refs = 0; dbca->dbca_cont = cont; diff --git a/src/dtx/tests/dts_structs.c b/src/dtx/tests/dts_structs.c index dc4347fed7c..a763546f824 100644 --- a/src/dtx/tests/dts_structs.c +++ b/src/dtx/tests/dts_structs.c @@ -70,8 +70,9 @@ struct_dtx_handle(void **state) SET_BITFIELD_1(dummy, dth_need_validation); SET_BITFIELD_1(dummy, dth_ignore_uncommitted); SET_BITFIELD_1(dummy, dth_local); + SET_BITFIELD_1(dummy, dth_epoch_owner); SET_BITFIELD_1(dummy, dth_local_complete); - SET_BITFIELD(dummy, padding1, 13); + SET_BITFIELD(dummy, padding1, 12); SET_FIELD(dummy, dth_dti_cos_count); SET_FIELD(dummy, dth_dti_cos); diff --git a/src/engine/sched.c b/src/engine/sched.c index 49a46ca3618..807f150839e 100644 --- a/src/engine/sched.c +++ b/src/engine/sched.c @@ -197,17 +197,6 @@ enum { static int sched_policy; -/* - * Time threshold for giving IO up throttling. If space pressure stays in the - * highest level for enough long time, we assume that no more space can be - * reclaimed and choose to give up IO throttling, so that ENOSPACE error could - * be returned to client earlier. - * - * To make time for aggregation reclaiming overwriteen space, this threshold - * should be longer than the DAOS_AGG_THRESHOLD. - */ -#define SCHED_DELAY_THRESH 40000 /* msecs */ - struct pressure_ratio { unsigned int pr_free; /* free space ratio */ unsigned int pr_gc_ratio; /* CPU percentage for GC & Aggregation */ @@ -943,12 +932,21 @@ is_gc_pending(struct sched_pool_info *spi) return spi->spi_gc_ults && (spi->spi_gc_ults > spi->spi_gc_sleeping); } -/* Just run into this space pressure situation recently? */ +/* + * Just run into this space pressure situation recently? + * + * If space pressure stays in the highest level for enough long time, we assume + * that no more space can be reclaimed and choose to give up IO throttling, so + * that ENOSPACE error could be returned to client earlier. + * + * To make time for aggregation reclaiming overwriteen space, this threshold + * should be longer than VOS aggregation epoch gap with current HLC. + */ static inline bool is_pressure_recent(struct sched_info *info, struct sched_pool_info *spi) { D_ASSERT(info->si_cur_ts >= spi->spi_pressure_ts); - return (info->si_cur_ts - spi->spi_pressure_ts) < SCHED_DELAY_THRESH; + return (info->si_cur_ts - spi->spi_pressure_ts) < info->si_agg_gap; } static inline uint64_t @@ -2256,6 +2254,8 @@ sched_run(ABT_sched sched) return; } + dx->dx_sched_info.si_agg_gap = (vos_get_agg_gap() + 10) * 1000; /* msecs */ + while (1) { /* Try to pick network poll ULT */ pool = pools[DSS_POOL_NET_POLL]; diff --git a/src/engine/srv_internal.h b/src/engine/srv_internal.h index 222f07e4906..d1f240270fb 100644 --- a/src/engine/srv_internal.h +++ b/src/engine/srv_internal.h @@ -61,6 +61,7 @@ struct sched_info { /* Number of kicked requests for each type in current cycle */ uint32_t si_kicked_req_cnt[SCHED_REQ_MAX]; unsigned int si_stop:1; + uint64_t si_agg_gap; }; struct mem_stats { diff --git a/src/include/daos/dtx.h b/src/include/daos/dtx.h index ca719077a14..f3aa2546850 100644 --- a/src/include/daos/dtx.h +++ b/src/include/daos/dtx.h @@ -1,5 +1,5 @@ /** - * (C) Copyright 2019-2023 Intel Corporation. + * (C) Copyright 2019-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -27,17 +27,6 @@ /* The time (in second) threshold for batched DTX commit. */ #define DTX_COMMIT_THRESHOLD_AGE 10 -/* - * VOS aggregation should try to avoid aggregating in the epoch range where - * lots of data records are pending to commit, so the aggregation epoch upper - * bound is: current HLC - (DTX batched commit threshold + buffer period) - * - * To avoid conflicting of aggregation vs. transactions, any transactional - * update/fetch with epoch lower than the aggregation upper bound should be - * rejected and restarted. - */ -#define DAOS_AGG_THRESHOLD (DTX_COMMIT_THRESHOLD_AGE + 10) /* seconds */ - enum dtx_target_flags { /* The target only contains read-only operations for the DTX. */ DTF_RDONLY = (1 << 0), diff --git a/src/include/daos_srv/container.h b/src/include/daos_srv/container.h index 9fc615c2a8b..14953a1972a 100644 --- a/src/include/daos_srv/container.h +++ b/src/include/daos_srv/container.h @@ -129,6 +129,12 @@ struct ds_cont_child { */ uint64_t sc_ec_update_timestamp; + /* + * The gap between the max allowed aggregation epoch and current HLC. The modification + * with older epoch out of range may cause conflict with aggregation as to be rejected. + */ + uint64_t sc_agg_eph_gap; + /* The objects with committable DTXs in DRAM. */ daos_handle_t sc_dtx_cos_hdl; /* The DTX COS-btree. */ diff --git a/src/include/daos_srv/dtx_srv.h b/src/include/daos_srv/dtx_srv.h index 7c60d2deaa0..f648b42d7be 100644 --- a/src/include/daos_srv/dtx_srv.h +++ b/src/include/daos_srv/dtx_srv.h @@ -113,8 +113,10 @@ struct dtx_handle { dth_ignore_uncommitted : 1, /* Local transaction */ dth_local : 1, + /* Locally generate the epoch. */ + dth_epoch_owner : 1, /* Flag to commit the local transaction */ - dth_local_complete : 1, padding1 : 13; + dth_local_complete : 1, padding1 : 12; /* The count the DTXs in the dth_dti_cos array. */ uint32_t dth_dti_cos_count; @@ -287,6 +289,8 @@ enum dtx_flags { DTX_RELAY = (1 << 10), /** Local transaction */ DTX_LOCAL = (1 << 11), + /** Locally generate the epoch. */ + DTX_EPOCH_OWNER = (1 << 12), }; void diff --git a/src/include/daos_srv/vos.h b/src/include/daos_srv/vos.h index b6287c2986e..e34ade044dd 100644 --- a/src/include/daos_srv/vos.h +++ b/src/include/daos_srv/vos.h @@ -938,6 +938,56 @@ vos_update_renew_epoch(daos_handle_t ioh, struct dtx_handle *dth); void vos_dtx_renew_epoch(struct dtx_handle *dth); +/** + * Calculate current locally known stable epoch for the given container. + * + * \param coh [IN] Container open handle + * + * \return The epoch on success, negative value if error. + */ +daos_epoch_t +vos_cont_get_local_stable_epoch(daos_handle_t coh); + +/** + * Get global stable epoch for the given container. + * + * \param coh [IN] Container open handle + * + * \return The epoch on success, negative value if error. + */ +daos_epoch_t +vos_cont_get_global_stable_epoch(daos_handle_t coh); + +/** + * Set global stable epoch for the given container. + * + * \param coh [IN] Container open handle + * \param epoch [IN] The epoch to be used as the new global stable epoch. + * + * \return Zero on success, negative value if error. + */ +int +vos_cont_set_global_stable_epoch(daos_handle_t coh, daos_epoch_t epoch); + +/** + * Set the lowest allowed modification epoch for the given container. + * + * \param coh [IN] Container open handle + * \param epoch [IN] The lowest allowed epoch for modification. + * + * \return Zero on success, negative value if error. + */ +int +vos_cont_set_mod_bound(daos_handle_t coh, uint64_t epoch); + +/** + * Query the gap between the max allowed aggregation epoch and current HLC. + * + * \return The gap value in seconds. + */ +uint32_t +vos_get_agg_gap(void); + /** * Get the recx/epoch list. * diff --git a/src/include/daos_srv/vos_types.h b/src/include/daos_srv/vos_types.h index 0a52851c390..fa173cf1f63 100644 --- a/src/include/daos_srv/vos_types.h +++ b/src/include/daos_srv/vos_types.h @@ -58,6 +58,8 @@ enum dtx_entry_flags { * on all yet, need to be re-committed. */ DTE_PARTIAL_COMMITTED = (1 << 5), + /* The DTX epoch is sorted locally. */ + DTE_EPOCH_SORTED = (1 << 6), }; struct dtx_entry { diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index a79cec03f6f..2654df4d5f9 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -2896,8 +2896,10 @@ ds_obj_rw_handler(crt_rpc_t *rpc) rc = process_epoch(&orw->orw_epoch, &orw->orw_epoch_first, &orw->orw_flags); - if (rc == PE_OK_LOCAL) + if (rc == PE_OK_LOCAL) { orw->orw_flags &= ~ORF_EPOCH_UNCERTAIN; + dtx_flags |= DTX_EPOCH_OWNER; + } if (obj_rpc_is_fetch(rpc)) { struct dtx_handle *dth; @@ -3856,8 +3858,10 @@ ds_obj_punch_handler(crt_rpc_t *rpc) rc = process_epoch(&opi->opi_epoch, NULL /* epoch_first */, &opi->opi_flags); - if (rc == PE_OK_LOCAL) + if (rc == PE_OK_LOCAL) { opi->opi_flags &= ~ORF_EPOCH_UNCERTAIN; + dtx_flags |= DTX_EPOCH_OWNER; + } version = opi->opi_map_ver; max_ver = opi->opi_map_ver; @@ -5110,6 +5114,7 @@ ds_obj_dtx_leader(struct daos_cpd_args *dca) &dcsh->dcsh_epoch.oe_first, &dcsh->dcsh_epoch.oe_rpc_flags); if (rc == PE_OK_LOCAL) { + dtx_flags |= DTX_EPOCH_OWNER; /* * In this case, writes to local RDGs can use the chosen epoch * without any uncertainty. This optimization is left to future @@ -5701,8 +5706,10 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) if (ocpi->ocpi_flags & ORF_LEADER) { rc = process_epoch(&ocpi->ocpi_epoch, NULL /* epoch_first */, &ocpi->ocpi_flags); - if (rc == PE_OK_LOCAL) + if (rc == PE_OK_LOCAL) { ocpi->ocpi_flags &= ~ORF_EPOCH_UNCERTAIN; + dtx_flags |= DTX_EPOCH_OWNER; + } } else if (dct_nr == 1) { rc = obj_coll_local(rpc, dcts[0].dct_shards, dce, &version, &ioc, NULL, odm->odm_mbs, obj_coll_tgt_punch); diff --git a/src/tests/ftest/util/server_utils_params.py b/src/tests/ftest/util/server_utils_params.py index 46db4891220..b02e640ca25 100644 --- a/src/tests/ftest/util/server_utils_params.py +++ b/src/tests/ftest/util/server_utils_params.py @@ -436,6 +436,7 @@ class EngineYamlParameters(YamlParameters): "D_LOG_FILE_APPEND_PID=1", "DAOS_POOL_RF=4", "CRT_EVENT_DELAY=1", + "DAOS_VOS_AGG_GAP=25", "COVFILE=/tmp/test.cov"], "ofi+tcp": [], "ofi+tcp;ofi_rxm": [], diff --git a/src/utils/ddb/tests/ddb_test_driver.c b/src/utils/ddb/tests/ddb_test_driver.c index 15f6f9289c3..f5dbda34078 100644 --- a/src/utils/ddb/tests/ddb_test_driver.c +++ b/src/utils/ddb/tests/ddb_test_driver.c @@ -410,6 +410,7 @@ dvt_dtx_begin_helper(daos_handle_t coh, const daos_unit_oid_t *oid, daos_epoch_t struct dtx_handle *dth; struct dtx_memberships *mbs; size_t size; + int rc; D_ALLOC_PTR(dth); assert_non_null(dth); @@ -449,7 +450,8 @@ dvt_dtx_begin_helper(daos_handle_t coh, const daos_unit_oid_t *oid, daos_epoch_t dth->dth_shares_inited = 1; vos_dtx_rsrvd_init(dth); - vos_dtx_attach(dth, false, false); + rc = vos_dtx_attach(dth, false, false); + assert_rc_equal(rc, 0); *dthp = dth; } @@ -478,7 +480,7 @@ dvt_vos_insert_dtx_records(daos_handle_t coh, uint32_t nr, uint32_t committed_nr daos_recx_t recxs[recxs_nr]; daos_iod_t iod = {0}; d_sg_list_t sgl = {0}; - daos_epoch_t epoch = 1; + daos_epoch_t epoch = d_hlc_get(); uint64_t dkey_hash = 0x123; int i; diff --git a/src/vos/tests/vts_dtx.c b/src/vos/tests/vts_dtx.c index bd54dd52838..402e265a938 100644 --- a/src/vos/tests/vts_dtx.c +++ b/src/vos/tests/vts_dtx.c @@ -43,6 +43,7 @@ vts_dtx_begin(const daos_unit_oid_t *oid, daos_handle_t coh, daos_epoch_t epoch, uint64_t dkey_hash, struct dtx_handle **dthp) { struct dtx_handle *dth; + int rc; D_ALLOC_PTR(dth); assert_non_null(dth); @@ -66,6 +67,7 @@ vts_dtx_begin(const daos_unit_oid_t *oid, daos_handle_t coh, daos_epoch_t epoch, dth->dth_for_migration = 0; dth->dth_ignore_uncommitted = 0; dth->dth_prepared = 0; + dth->dth_epoch_owner = 0; dth->dth_aborted = 0; dth->dth_already = 0; dth->dth_need_validation = 0; @@ -91,7 +93,8 @@ vts_dtx_begin(const daos_unit_oid_t *oid, daos_handle_t coh, daos_epoch_t epoch, dth->dth_shares_inited = 1; vos_dtx_rsrvd_init(dth); - vos_dtx_attach(dth, false, false); + rc = vos_dtx_attach(dth, false, false); + assert_rc_equal(rc, 0); *dthp = dth; } diff --git a/src/vos/tests/vts_io.c b/src/vos/tests/vts_io.c index ff02abaf1e2..00d2a4172d0 100644 --- a/src/vos/tests/vts_io.c +++ b/src/vos/tests/vts_io.c @@ -175,7 +175,7 @@ test_args_init(struct io_test_args *args, memset(args, 0, sizeof(*args)); memset(&vts_cntr, 0, sizeof(vts_cntr)); - vts_epoch_gen = 1; + vts_epoch_gen = d_hlc_get(); rc = vts_ctx_init(&args->ctx, pool_size); if (rc != 0) @@ -1590,7 +1590,7 @@ vos_iterate_test(void **state) struct all_info info = {0}; vos_iter_param_t param = {0}; struct vos_iter_anchors anchors = {0}; - daos_epoch_t epoch = 1; + daos_epoch_t epoch = d_hlc_get(); int rc = 0; unsigned long old_flags = arg->ta_flags; @@ -2038,7 +2038,7 @@ io_simple_one_key_cross_container(void **state) d_sg_list_t sgl; daos_key_t dkey; daos_key_t akey; - daos_epoch_t epoch = gen_rand_epoch(); + daos_epoch_t epoch; daos_unit_oid_t l_oid; /* Creating an additional container */ @@ -2087,6 +2087,7 @@ io_simple_one_key_cross_container(void **state) iod.iod_type = DAOS_IOD_ARRAY; l_oid = gen_oid(arg->otype); + epoch = gen_rand_epoch(); rc = vos_obj_update(arg->ctx.tc_co_hdl, arg->oid, epoch, 0, 0, &dkey, 1, &iod, NULL, &sgl); if (rc) { @@ -2526,7 +2527,7 @@ oid_iter_test_with_anchor(void **state) #define KEY_INC 127 #define MAX_INT_KEY (NUM_KEYS * KEY_INC) -static void gen_query_tree(struct io_test_args *arg, daos_unit_oid_t oid) +static void gen_query_tree(struct io_test_args *arg, daos_unit_oid_t oid, daos_epoch_t epoch) { daos_iod_t iod = {0}; d_sg_list_t sgl = {0}; @@ -2534,7 +2535,6 @@ static void gen_query_tree(struct io_test_args *arg, daos_unit_oid_t oid) daos_key_t akey; d_iov_t val_iov; daos_recx_t recx; - daos_epoch_t epoch = 1; uint64_t dkey_value; uint64_t akey_value; int i, j; @@ -2608,7 +2608,7 @@ io_query_key(void **state) int i, j; struct dtx_handle *dth; struct dtx_id xid; - daos_epoch_t epoch = 1; + daos_epoch_t epoch = d_hlc_get(); daos_key_t dkey; daos_key_t akey; daos_key_t dkey_read; @@ -2623,7 +2623,7 @@ io_query_key(void **state) oid = gen_oid(arg->otype); - gen_query_tree(arg, oid); + gen_query_tree(arg, oid, epoch); for (i = 1; i <= NUM_KEYS; i++) { for (j = 1; j <= NUM_KEYS; j++) { @@ -2873,7 +2873,7 @@ io_query_key_punch_update(void **state) { struct io_test_args *arg = *state; int rc = 0; - daos_epoch_t epoch = 1; + daos_epoch_t epoch = d_hlc_get(); daos_key_t dkey = { 0 }; daos_key_t akey; daos_recx_t recx_read; @@ -2949,7 +2949,7 @@ io_query_key_negative(void **state) &recx_read, NULL, 0, 0, NULL); assert_rc_equal(rc, -DER_NONEXIST); - gen_query_tree(arg, oid); + gen_query_tree(arg, oid, d_hlc_get()); rc = vos_obj_query_key(arg->ctx.tc_co_hdl, arg->oid, DAOS_GET_DKEY | DAOS_GET_MAX, 4, @@ -3006,7 +3006,7 @@ gang_sv_test(void **state) char dkey_buf[UPDATE_DKEY_SIZE], akey_buf[UPDATE_AKEY_SIZE]; char *update_buf, *fetch_buf; daos_size_t rsize = (27UL << 20); /* 27MB */ - daos_epoch_t epoch = 1; + daos_epoch_t epoch = d_hlc_get(); int rc; D_ALLOC(update_buf, rsize); diff --git a/src/vos/tests/vts_mvcc.c b/src/vos/tests/vts_mvcc.c index 6c625d2051a..d951b8b3bc9 100644 --- a/src/vos/tests/vts_mvcc.c +++ b/src/vos/tests/vts_mvcc.c @@ -1,5 +1,5 @@ /* - * (C) Copyright 2020-2023 Intel Corporation. + * (C) Copyright 2020-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -1720,7 +1720,7 @@ setup_mvcc(void **state) D_ASSERT(arg->custom == NULL); D_ALLOC_PTR(mvcc_arg); D_ASSERT(mvcc_arg != NULL); - mvcc_arg->epoch = 500; + mvcc_arg->epoch = d_hlc_get() + 500; d_getenv_bool("CMOCKA_TEST_ABORT", &mvcc_arg->fail_fast); arg->custom = mvcc_arg; return 0; diff --git a/src/vos/tests/vts_pm.c b/src/vos/tests/vts_pm.c index 99e53fc71b6..471806963bb 100644 --- a/src/vos/tests/vts_pm.c +++ b/src/vos/tests/vts_pm.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2019-2022 Intel Corporation. + * (C) Copyright 2019-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -25,7 +25,6 @@ #endif -static int start_epoch = 5; #define BUF_SIZE 2000 static int buf_size = BUF_SIZE; struct pm_info { @@ -1271,11 +1270,11 @@ cond_test(void **state) daos_unit_oid_t oid; d_sg_list_t sgl[MAX_SGL] = {0}; d_iov_t iov[MAX_SGL]; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; int i; test_args_reset(arg, VPOOL_SIZE); - + epoch = d_hlc_get() + 1000; oid = gen_oid(0); for (i = 0; i < MAX_SGL; i++) { @@ -1369,8 +1368,6 @@ cond_test(void **state) 0, -DER_NO_PERM, sgl, 5, "new", "foo", "f", "bar", "d", "val", "e", "flag", "new", "temp"); - - start_epoch = epoch + 1; } /** Making the oid generation deterministic, I get to 18201 before I hit a false @@ -1386,12 +1383,11 @@ multiple_oid_cond_test(void **state) daos_unit_oid_t oid; d_sg_list_t sgl = {0}; d_iov_t iov = {0}; - daos_epoch_t epoch = start_epoch + NUM_OIDS * 3; + daos_epoch_t epoch; int i; - start_epoch = epoch + 1; - test_args_reset(arg, VPOOL_SIZE); + epoch = d_hlc_get() + NUM_OIDS * 3; sgl.sg_iovs = &iov; sgl.sg_nr = 1; sgl.sg_nr_out = 1; @@ -1495,13 +1491,13 @@ remove_test(void **state) d_sg_list_t sgl; daos_recx_t recx[SM_BUF_LEN]; daos_unit_oid_t oid; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; int rc = 0; char key1 = 'a'; char key2 = 'b'; test_args_reset(arg, VPOOL_SIZE); - + epoch = d_hlc_get(); oid = gen_oid(0); d_iov_set(&dkey, &key1, sizeof(key1)); @@ -1594,8 +1590,6 @@ remove_test(void **state) FETCH_DATA, 1, &REM_VAL1[0], FETCH_HOLE, sizeof(REM_VAL1) + sizeof(REM_VAL2) + sizeof(REM_VAL3) - 5, FETCH_DATA, 1, &REM_VAL3[sizeof(REM_VAL3) - 2], FETCH_END); - - start_epoch = epoch + 1; } static void @@ -1670,7 +1664,7 @@ minor_epoch_punch_sv(void **state) daos_recx_t rex; daos_iod_t iod; d_sg_list_t sgl; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; struct dtx_handle *dth; struct dtx_id xid; const char *expected = "xxxxx"; @@ -1681,7 +1675,7 @@ minor_epoch_punch_sv(void **state) daos_unit_oid_t oid; test_args_reset(arg, VPOOL_SIZE); - + epoch = d_hlc_get(); memset(&rex, 0, sizeof(rex)); memset(&iod, 0, sizeof(iod)); @@ -1741,7 +1735,6 @@ minor_epoch_punch_sv(void **state) assert_memory_equal(buf, expected, strlen(expected)); d_sgl_fini(&sgl, false); - start_epoch = epoch + 1; } static void @@ -1754,7 +1747,7 @@ minor_epoch_punch_array(void **state) daos_recx_t rex; daos_iod_t iod; d_sg_list_t sgl; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; struct dtx_handle *dth; struct dtx_id xid; const char *expected = "xxxxxLonelyWorld"; @@ -1766,7 +1759,7 @@ minor_epoch_punch_array(void **state) daos_unit_oid_t oid; test_args_reset(arg, VPOOL_SIZE); - + epoch = d_hlc_get(); memset(&rex, 0, sizeof(rex)); memset(&iod, 0, sizeof(iod)); @@ -1837,7 +1830,6 @@ minor_epoch_punch_array(void **state) assert_memory_equal(buf, expected, strlen(expected)); d_sgl_fini(&sgl, false); - start_epoch = epoch + 1; } static void @@ -1850,7 +1842,7 @@ minor_epoch_punch_rebuild(void **state) daos_recx_t rex; daos_iod_t iod; d_sg_list_t sgl; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; const char *expected = "xxxxxlonelyworld"; const char *first = "hello"; const char *second = "lonelyworld"; @@ -1860,7 +1852,7 @@ minor_epoch_punch_rebuild(void **state) daos_unit_oid_t oid; test_args_reset(arg, VPOOL_SIZE); - + epoch = d_hlc_get(); memset(&rex, 0, sizeof(rex)); memset(&iod, 0, sizeof(iod)); @@ -1930,8 +1922,6 @@ minor_epoch_punch_rebuild(void **state) epoch += 2; d_sgl_fini(&sgl, false); - - start_epoch = epoch + 1; } #define NUM_RANKS 100 @@ -1948,7 +1938,7 @@ many_keys(void **state) daos_recx_t rex; daos_iod_t iod; d_sg_list_t sgl; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch = d_hlc_get(); const char *w = "x"; char *dkey_buf = DKEY_NAME; char akey_buf[UPDATE_DKEY_SIZE]; @@ -1995,8 +1985,6 @@ many_keys(void **state) } d_sgl_fini(&sgl, false); - - start_epoch = epoch + 1; } #define CELL_SZ 2 @@ -2102,7 +2090,7 @@ ec_size(void **state) struct io_test_args *arg = *state; int rc = 0; d_sg_list_t sgl; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; const char w[] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"; daos_unit_oid_t oid; uint64_t size; @@ -2110,7 +2098,7 @@ ec_size(void **state) int i; test_args_reset(arg, VPOOL_1G); - + epoch = d_hlc_get(); rc = d_sgl_init(&sgl, 1); assert_rc_equal(rc, 0); @@ -2201,8 +2189,6 @@ ec_size(void **state) assert_int_equal(size, 201 * STRIPE_SZ); d_sgl_fini(&sgl, false); - - start_epoch = epoch + 1; } static void @@ -2219,7 +2205,7 @@ test_inprogress_parent_punch(void **state) d_sg_list_t sgl; struct dtx_handle *dth1; struct dtx_handle *dth2; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; struct dtx_id xid1; struct dtx_id xid2; const char *expected = "xxxxx"; @@ -2232,7 +2218,7 @@ test_inprogress_parent_punch(void **state) daos_unit_oid_t oid; test_args_reset(arg, VPOOL_SIZE); - + epoch = d_hlc_get(); memset(&rex, 0, sizeof(rex)); memset(&iod, 0, sizeof(iod)); @@ -2336,8 +2322,6 @@ test_inprogress_parent_punch(void **state) assert_memory_equal(buf, expected, strlen(expected)); d_sgl_fini(&sgl, false); - - start_epoch = epoch + 1; } #define NR_OBJ 10 @@ -2458,9 +2442,9 @@ many_tx(void **state) d_sg_list_t sgl; d_sg_list_t fetch_sgl; char buf[32]; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; daos_handle_t coh; - daos_epoch_range_t epr = {epoch, epoch}; + daos_epoch_range_t epr; struct vos_ioreq req[NR_TX] = {0}; const char *first = "Hello"; char dkey_buf[NR_DKEY][UPDATE_DKEY_SIZE]; @@ -2482,9 +2466,12 @@ many_tx(void **state) test_args_reset(arg, VPOOL_SIZE); coh = arg->ctx.tc_co_hdl; - memset(&iod, 0, sizeof(iod)); + epoch = d_hlc_get() + 1000; + epr.epr_lo = epoch; + epr.epr_hi = epoch; + rc = d_sgl_init(&sgl, 1); assert_rc_equal(rc, 0); rc = d_sgl_init(&fetch_sgl, 1); @@ -2624,7 +2611,6 @@ many_tx(void **state) d_sgl_fini(&sgl, false); d_sgl_fini(&fetch_sgl, false); - start_epoch = epoch + 1; } static struct dtx_id @@ -2686,7 +2672,7 @@ uncommitted_parent(void **state) daos_iod_t iod; d_sg_list_t sgl; char buf[32]; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; daos_handle_t coh; char *first = "Hello"; char dkey_buf[UPDATE_DKEY_SIZE]; @@ -2696,7 +2682,7 @@ uncommitted_parent(void **state) test_args_reset(arg, VPOOL_SIZE); coh = arg->ctx.tc_co_hdl; - + epoch = d_hlc_get(); memset(&iod, 0, sizeof(iod)); rc = d_sgl_init(&sgl, 1); @@ -2736,7 +2722,6 @@ uncommitted_parent(void **state) assert_memory_equal(buf, first, 5); d_sgl_fini(&sgl, false); - start_epoch = epoch + 1; } static void @@ -2750,7 +2735,7 @@ test_uncommitted_key(void **state) daos_iod_t iod; d_sg_list_t sgl; char buf[32]; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; daos_handle_t coh; char *first = "Hello"; char dkey_buf[UPDATE_DKEY_SIZE]; @@ -2760,7 +2745,7 @@ test_uncommitted_key(void **state) test_args_reset(arg, VPOOL_SIZE); coh = arg->ctx.tc_co_hdl; - + epoch = d_hlc_get(); memset(&iod, 0, sizeof(iod)); rc = d_sgl_init(&sgl, 1); @@ -2798,7 +2783,6 @@ test_uncommitted_key(void **state) assert_memory_equal(buf, "Hello", 5); d_sgl_fini(&sgl, false); - start_epoch = epoch + 1; } static void @@ -2812,7 +2796,7 @@ test_multiple_key_conditionals_common(void **state, bool with_dtx) daos_recx_t rex[2] = {0}; daos_iod_t iod[2] = {0}; d_sg_list_t sgl[2] = {0}; - daos_epoch_t epoch = start_epoch; + daos_epoch_t epoch; struct dtx_handle *dth = NULL; struct dtx_id xid; const char *expected = "xxxxx"; @@ -2826,7 +2810,7 @@ test_multiple_key_conditionals_common(void **state, bool with_dtx) daos_unit_oid_t oid; test_args_reset(arg, VPOOL_SIZE); - + epoch = d_hlc_get(); memset(rex, 0, sizeof(rex)); memset(iod, 0, sizeof(iod)); @@ -3011,7 +2995,6 @@ test_multiple_key_conditionals_common(void **state, bool with_dtx) if (with_dtx) vts_dtx_end(dth); - start_epoch = epoch + 1; d_sgl_fini(&sgl[0], false); d_sgl_fini(&sgl[1], false); } diff --git a/src/vos/vos_common.c b/src/vos/vos_common.c index e19768d4c03..c57802a4f16 100644 --- a/src/vos/vos_common.c +++ b/src/vos/vos_common.c @@ -620,6 +620,42 @@ struct dss_module_key vos_module_key = { daos_epoch_t vos_start_epoch = DAOS_EPOCH_MAX; +/* + * For standalone transaction, when it is started on the DTX leader, its epoch + * is generated by the leader, then the modification RPC will be forwarded to + * other related non-leader(s). If the forwarded RPC is delayed for some reason, + * such as network congestion or system busy on the non-leader, as to the epoch + * for such transaction becomes very old (exceed related threshold), as to VOS + * aggregation may has already aggregated related epoch rang. Under such case, + * the non-leader will reject such modification to avoid data lost/corruption. + * + * For distributed transaction, if there is no read (fetch, query, enumerate, + * and so on) before client commit_tx, then related DTX leader will generate + * epoch for the transaction after client commit_tx. Then it will be the same + * as above standalone transaction for epoch handling. + * + * If the distributed transaction involves some read before client commit_tx, + * its epoch will be generated by the first accessed engine for read. If the + * transaction takes too long time after that, then when client commit_tx, its + * epoch may become very old as to related DTX leader will have to reject the + * transaction to avoid above mentioned conflict. And even if the DTX leader + * did not reject the transaction, some non-leader may also reject it because + * of the very old epoch. So it means that under such framework, the life for + * a distributed transaction cannot be too long. That can be adjusted via the + * server side environment variable DAOS_VOS_AGG_GAP. + * + * NOTE: EC/VOS aggregation should avoid aggregating in the epoch range where + * lots of data records are pending to commit, so the aggregation epoch + * upper bound is 'current HLC - vos_agg_gap'. + */ +uint32_t vos_agg_gap; + +uint32_t +vos_get_agg_gap(void) +{ + return vos_agg_gap; +} + static int vos_mod_init(void) { @@ -679,6 +715,15 @@ vos_mod_init(void) d_getenv_bool("DAOS_DKEY_PUNCH_PROPAGATE", &vos_dkey_punch_propagate); D_INFO("DKEY punch propagation is %s\n", vos_dkey_punch_propagate ? "enabled" : "disabled"); + vos_agg_gap = VOS_AGG_GAP_DEF; + d_getenv_uint("DAOS_VOS_AGG_GAP", &vos_agg_gap); + if (vos_agg_gap < VOS_AGG_GAP_MIN || vos_agg_gap > VOS_AGG_GAP_MAX) { + D_WARN("Invalid DAOS_VOS_AGG_GAP value, " + "valid range [%u, %u], set it as default %u (second)\n", + VOS_AGG_GAP_MIN, VOS_AGG_GAP_MAX, VOS_AGG_GAP_DEF); + vos_agg_gap = VOS_AGG_GAP_DEF; + } + D_INFO("Set DAOS VOS aggregation gap as %u (second)\n", vos_agg_gap); return rc; } diff --git a/src/vos/vos_container.c b/src/vos/vos_container.c index a5a55a902b9..5d0b656b7ff 100644 --- a/src/vos/vos_container.c +++ b/src/vos/vos_container.c @@ -198,6 +198,9 @@ cont_free_internal(struct vos_container *cont) lrua_array_free(cont->vc_dtx_array); D_ASSERT(d_list_empty(&cont->vc_dtx_act_list)); + D_ASSERT(d_list_empty(&cont->vc_dtx_sorted_list)); + D_ASSERT(d_list_empty(&cont->vc_dtx_unsorted_list)); + D_ASSERT(d_list_empty(&cont->vc_dtx_reindex_list)); dbtree_close(cont->vc_btr_hdl); @@ -395,6 +398,9 @@ vos_cont_open(daos_handle_t poh, uuid_t co_uuid, daos_handle_t *coh) cont->vc_cmt_dtx_indexed = 0; cont->vc_cmt_dtx_reindex_pos = cont->vc_cont_df->cd_dtx_committed_head; D_INIT_LIST_HEAD(&cont->vc_dtx_act_list); + D_INIT_LIST_HEAD(&cont->vc_dtx_sorted_list); + D_INIT_LIST_HEAD(&cont->vc_dtx_unsorted_list); + D_INIT_LIST_HEAD(&cont->vc_dtx_reindex_list); cont->vc_dtx_committed_count = 0; cont->vc_solo_dtx_epoch = d_hlc_get(); rc = gc_open_cont(cont); @@ -461,6 +467,21 @@ vos_cont_open(daos_handle_t poh, uuid_t co_uuid, daos_handle_t *coh) } } + /* + * Assign vc_mod_epoch_bound with current HLC, then all former reported local stable + * epoch (without persistently stored) before re-opening the container will be older + * than vc_mod_epoch_bound. It is possible that some modification was started before + * current container reopen (such as for engine restart without related pool service + * down), but related RPC was not forwarded to current engine in time. After current + * engine re-opening the container (shard), it will reject such old modification and + * ask related DTX leader to restart the transaction. It only may affect inflight IO + * during re-opening container without restarting pool service. + * + * With the assignment, we also do not need to consider former EC/VOS aggregation up + * boundary when reopen the container. + */ + cont->vc_mod_epoch_bound = d_hlc_get(); + rc = vos_dtx_act_reindex(cont); if (rc != 0) { D_ERROR("Fail to reindex active DTX entries: %d\n", rc); @@ -815,3 +836,186 @@ struct vos_iter_ops vos_cont_iter_ops = { .iop_fetch = cont_iter_fetch, .iop_process = cont_iter_process, }; + +/* + * The local stable epoch can be used to calculate global stable epoch: all the container + * shards report each own local stable epoch to some leader who will find out the smallest + * one as the global stable epoch and dispatch it to all related container shards. + */ +daos_epoch_t +vos_cont_get_local_stable_epoch(daos_handle_t coh) +{ + struct vos_container *cont; + struct vos_dtx_act_ent *dae; + uint64_t gap = d_sec2hlc(vos_agg_gap); + daos_epoch_t epoch = d_hlc_get() - gap; + + cont = vos_hdl2cont(coh); + D_ASSERT(cont != NULL); + + /* + * If the oldest (that is at the head of the sorted list) sorted DTX's + * epoch is out of the boundary, then use it as the local stable epoch. + */ + if (!d_list_empty(&cont->vc_dtx_sorted_list)) { + dae = d_list_entry(cont->vc_dtx_sorted_list.next, + struct vos_dtx_act_ent, dae_order_link); + if (epoch >= DAE_EPOCH(dae)) + epoch = DAE_EPOCH(dae) - 1; + } + + /* + * It is not easy to know which DTX is the oldest one in the unsorted list. + * The one after the header in the list maybe older than the header. But the + * epoch difference will NOT exceed 'vos_agg_gap' since any DTX with older + * epoch will be rejected (and restart with newer epoch). + * + * So "DAE_EPOCH(header) - vos_agg_gap" can be used to estimate the local + * stable epoch for unsorted DTX entries. + */ + if (!d_list_empty(&cont->vc_dtx_unsorted_list)) { + dae = d_list_entry(cont->vc_dtx_unsorted_list.next, + struct vos_dtx_act_ent, dae_order_link); + if (epoch > DAE_EPOCH(dae) - gap) + epoch = DAE_EPOCH(dae) - gap; + } + + /* + * The historical vos_agg_gap for the DTX entries in the reindex list is unknown. + * We use cont->vc_dtx_reindex_eph_diff to estimate the local stable epoch. That + * may be over-estimated. Usually, the count of re-indexed DTX entries is quite + * limited, and will be purged soon after the container opened (via DTX resync). + * So it will not much affect the local stable epoch calculation. + */ + if (unlikely(!d_list_empty(&cont->vc_dtx_reindex_list))) { + dae = d_list_entry(cont->vc_dtx_reindex_list.next, + struct vos_dtx_act_ent, dae_order_link); + if (epoch > DAE_EPOCH(dae) - cont->vc_dtx_reindex_eph_diff) + epoch = DAE_EPOCH(dae) - cont->vc_dtx_reindex_eph_diff; + } + + /* + * vc_mod_epoch_bound guarantee that no modification with older epoch after last + * reporting local stable epoch can be accepted. So if the new calculated result + * is older, then reuse the former one. + */ + if (unlikely(epoch < cont->vc_local_stable_epoch)) + epoch = cont->vc_local_stable_epoch; + else + cont->vc_local_stable_epoch = epoch; + + /* + * Update vc_mod_epoch_bound to guarantee that on update with older epoch can be + * acceptable after reporting the new local stable epoch. The semantics maybe so + * strict as to a lot of DTX restart. + */ + if (cont->vc_mod_epoch_bound < epoch) { + D_DEBUG(DB_TRACE, "Increase acceptable modification boundary from " + DF_X64 " to " DF_X64 " for container " DF_UUID "\n", + cont->vc_mod_epoch_bound, epoch, DP_UUID(cont->vc_id)); + cont->vc_mod_epoch_bound = epoch; + } + + return epoch; +} + +/* + * The global stable epoch can be used for incremental reintegration: all the modifications + * involved in current target (container shard) under the global stable epoch have already + * been persistently stored globally, only need to care about the modification with newer + * epoch when reintegrate into the system. + */ +daos_epoch_t +vos_cont_get_globla_stable_epoch(daos_handle_t coh) +{ + struct vos_container *cont; + struct vos_cont_ext_df *cont_ext; + daos_epoch_t epoch = 0; + + cont = vos_hdl2cont(coh); + D_ASSERT(cont != NULL); + + cont_ext = umem_off2ptr(vos_cont2umm(cont), cont->vc_cont_df->cd_ext); + if (cont_ext != NULL) + epoch = cont_ext->ced_global_stable_epoch; + + return epoch; +} + +int +vos_cont_set_global_stable_epoch(daos_handle_t coh, daos_epoch_t epoch) +{ + struct umem_instance *umm; + struct vos_container *cont; + struct vos_cont_ext_df *cont_ext; + daos_epoch_t old = 0; + int rc = 0; + + cont = vos_hdl2cont(coh); + D_ASSERT(cont != NULL); + + umm = vos_cont2umm(cont); + cont_ext = umem_off2ptr(umm, cont->vc_cont_df->cd_ext); + + /* Do not allow to set global stable epoch against old container without extension. */ + if (cont_ext == NULL) + D_GOTO(out, rc = -DER_NOTSUPPORTED); + + /* + * Either the leader gives wrong global stable epoch or current target does not participant + * in the calculating new globle stable epoch. Then do not allow to set globle stable epoch. + */ + if (unlikely(cont->vc_local_stable_epoch < epoch)) { + D_WARN("Invalid global stable epoch: " DF_X64" vs " DF_X64 " for container " + DF_UUID "\n", cont->vc_local_stable_epoch, epoch, DP_UUID(cont->vc_id)); + D_GOTO(out, rc = -DER_NO_PERM); + } + + if (unlikely(cont_ext->ced_global_stable_epoch > epoch)) { + D_WARN("Do not allow to rollback global stable epoch from " + DF_X64" to " DF_X64 " for container " DF_UUID "\n", + cont_ext->ced_global_stable_epoch, epoch, DP_UUID(cont->vc_id)); + D_GOTO(out, rc = -DER_NO_PERM); + } + + if (cont_ext->ced_global_stable_epoch == epoch) + D_GOTO(out, rc = 0); + + old = cont_ext->ced_global_stable_epoch; + rc = umem_tx_begin(umm, NULL); + if (rc == 0) { + rc = umem_tx_add_ptr(umm, &cont_ext->ced_global_stable_epoch, + sizeof(cont_ext->ced_global_stable_epoch)); + if (rc == 0) { + cont_ext->ced_global_stable_epoch = epoch; + rc = umem_tx_commit(vos_cont2umm(cont)); + } else { + rc = umem_tx_abort(umm, rc); + } + } + + DL_CDEBUG(rc != 0, DLOG_ERR, DB_MGMT, rc, + "Set global stable epoch from "DF_X64" to " DF_X64 " for container " DF_UUID, + old , epoch, DP_UUID(cont->vc_id)); + +out: + return rc; +} + +int +vos_cont_set_mod_bound(daos_handle_t coh, uint64_t epoch) +{ + struct vos_container *cont; + + cont = vos_hdl2cont(coh); + D_ASSERT(cont != NULL); + + if (cont->vc_mod_epoch_bound < epoch) { + D_DEBUG(DB_TRACE, "Increase acceptable modification boundary from " + DF_X64 " to " DF_X64 " for container " DF_UUID "\n", + cont->vc_mod_epoch_bound, epoch, DP_UUID(cont->vc_id)); + cont->vc_mod_epoch_bound = epoch; + } + + return 0; +} diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c index 86c100f4739..1973d6fc025 100644 --- a/src/vos/vos_dtx.c +++ b/src/vos/vos_dtx.c @@ -261,8 +261,10 @@ dtx_act_ent_free(struct btr_instance *tins, struct btr_record *rec, dae = umem_off2ptr(&tins->ti_umm, rec->rec_off); rec->rec_off = UMOFF_NULL; - if (dae != NULL) + if (dae != NULL) { + d_list_del_init(&dae->dae_order_link); d_list_del_init(&dae->dae_link); + } if (args != NULL) { /* Return the record addreass (offset in DRAM). @@ -990,11 +992,76 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth) uint32_t idx; d_iov_t kiov; d_iov_t riov; + uint64_t now; int rc = 0; cont = vos_hdl2cont(dth->dth_coh); D_ASSERT(cont != NULL); + /* Do not allow the modification with too old epoch. */ + if (dth->dth_epoch <= cont->vc_mod_epoch_bound) { + now = daos_gettime_coarse(); + if (now - cont->vc_dtx_reject_ts > 10) { + D_WARN("Reject DTX (1) " DF_DTI " with epoch " DF_X64 + " vs bound " DF_X64 "\n", DP_DTI(&dth->dth_xid), + dth->dth_epoch, cont->vc_mod_epoch_bound); + cont->vc_dtx_reject_ts = now; + } + return -DER_TX_RESTART; + } + + /* + * NOTE: For the purpose of efficient calculating container based local stable epoch, + * we will maintain some kind of sorted list for active DTX entries with epoch + * order. But consider related overhead, it is not easy to maintain a strictly + * sorted list for all active DTX entries. For the DTX which leader resides on + * current target, its epoch is already sorted when generate on current engine. + * So the main difficulty is for those DTX entries which leaders are on remote + * targets. + * + * On the other hand, the local stable epoch is mainly used to generate global + * stable epoch that is for incremental reintegration. In fact, we do not need + * a very accurate global stable epoch for incremental reintegration. It means + * that it is no matter (or non-fatal) if the calculated stable epoch is a bit + * smaller than the real case. For example, seconds error for the stable epoch + * almost can be ignored if we compare such overhead with rebuilding the whole + * target from scratch. So for the DTX entry which leader is on remote target, + * we will maintain it in the list with relative incremental trend based on the + * epoch instead of strict sorting the epoch. We introduce an O(1) algorithm to + * handle such unsorted DTX entries list. + * + * For distributed transaction, its epoch may be generated on non-leader. + */ + + if (!dth->dth_epoch_owner && !d_list_empty(&cont->vc_dtx_unsorted_list)) { + dae = d_list_entry(cont->vc_dtx_unsorted_list.prev, struct vos_dtx_act_ent, + dae_order_link); + if (dth->dth_epoch < DAE_EPOCH(dae) && + cont->vc_mod_epoch_bound < DAE_EPOCH(dae) - d_sec2hlc(vos_agg_gap)) { + /* + * It guarantees that even if there was some older DTX to be added, + * the epoch difference between it and all former added ones cannot + * exceed vos_agg_gap. So we can easily calculate the local stable + * epoch. Please reference vos_cont_get_local_stable_epoch(). + */ + D_DEBUG(DB_TRACE, "Increase acceptable modification boundary from " + DF_X64 " to " DF_X64 " for container " DF_UUID "\n", + cont->vc_mod_epoch_bound, + DAE_EPOCH(dae) - d_sec2hlc(vos_agg_gap), DP_UUID(cont->vc_id)); + cont->vc_mod_epoch_bound = DAE_EPOCH(dae) - d_sec2hlc(vos_agg_gap); + if (dth->dth_epoch <= cont->vc_mod_epoch_bound) { + now = daos_gettime_coarse(); + if (now - cont->vc_dtx_reject_ts > 10) { + D_WARN("Reject DTX (2) " DF_DTI " with epoch " DF_X64 + " vs bound " DF_X64 "\n", DP_DTI(&dth->dth_xid), + dth->dth_epoch, cont->vc_mod_epoch_bound); + cont->vc_dtx_reject_ts = now; + } + return -DER_TX_RESTART; + } + } + } + rc = lrua_allocx(cont->vc_dtx_array, &idx, dth->dth_epoch, &dae, &dth->dth_local_stub); if (rc != 0) { /* The array is full, need to commit some transactions first */ @@ -1007,6 +1074,7 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth) } D_INIT_LIST_HEAD(&dae->dae_link); + D_INIT_LIST_HEAD(&dae->dae_order_link); DAE_LID(dae) = idx + DTX_LID_RESERVED; if (dth->dth_solo) DAE_LID(dae) |= DTX_LID_SOLO_FLAG; @@ -1015,6 +1083,8 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth) DAE_DKEY_HASH(dae) = dth->dth_dkey_hash; DAE_EPOCH(dae) = dth->dth_epoch; DAE_FLAGS(dae) = dth->dth_flags; + if (dth->dth_epoch_owner) + DAE_FLAGS(dae) |= DTE_EPOCH_SORTED; DAE_VER(dae) = dth->dth_ver; if (dth->dth_mbs != NULL) { @@ -1043,6 +1113,15 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth) if (rc == 0) { dae->dae_start_time = daos_gettime_coarse(); d_list_add_tail(&dae->dae_link, &cont->vc_dtx_act_list); + if (dth->dth_epoch_owner) + d_list_add_tail(&dae->dae_order_link, &cont->vc_dtx_sorted_list); + else + /* + * Add all the others, including non-leader(s), into unsorted list. + * Then even though the leader was evicted for some reason, related + * DTX still can be considered via the new leader on another target. + */ + d_list_add_tail(&dae->dae_order_link, &cont->vc_dtx_unsorted_list); dth->dth_ent = dae; } else { dtx_evict_lid(cont, dae); @@ -2938,6 +3017,13 @@ vos_dtx_act_reindex(struct vos_container *cont) umem_off_t dbd_off = cont_df->cd_dtx_active_head; d_iov_t kiov; d_iov_t riov; + struct vos_dtx_act_ent *prev = NULL; + /* The max epoch for all unsorted DTX entries to be re-indexed. */ + uint64_t max_eph = 0; + /* The min epoch which DTX entry is after the max_eph DTX. */ + uint64_t min_eph = 0; + /* The largest diff for above pairs 'max_eph - min_eph'. */ + uint64_t diff = 0; uint64_t start_time = daos_gettime_coarse(); int rc = 0; int i; @@ -3027,6 +3113,42 @@ vos_dtx_act_reindex(struct vos_container *cont) dae->dae_start_time = start_time; d_list_add_tail(&dae->dae_link, &cont->vc_dtx_act_list); + if (DAE_FLAGS(dae) & DTE_EPOCH_SORTED) { + d_list_add_tail(&dae->dae_order_link, &cont->vc_dtx_sorted_list); + } else { + /* + * The DXT entries in the active blob may be generated against + * different VOS AGG GAP configurations, or even upgraded from + * old system that did not support VOS AGG GAP logic yet. Link + * them into a reindex list. During the reindex scanning, we + * will find out the pairs with the largest epoch difference. + * Using such difference to estimate the local stable epoch. + * + * NOTE: The min_eph may be not the smallest one in all the DTX + * entries to be re-indexed, instead, it is after current + * known max_eph, and if max_eph is changed, min_eph will + * be reset. So there may be multiple max/min pairs. Each + * pairs has epoch own difference. We use the largest one. + * + * This is an O(N) algorithm. N is the count of DTX entries to be + * re-indexed. Please reference vos_cont_get_local_stable_epoch(). + */ + if (prev == NULL || DAE_EPOCH(dae) > DAE_EPOCH(prev)) { + if (max_eph < DAE_EPOCH(dae)) { + max_eph = DAE_EPOCH(dae); + min_eph = 0; + } + } else { + if (min_eph == 0 || min_eph > DAE_EPOCH(dae)) { + min_eph = DAE_EPOCH(dae); + if (diff < max_eph - min_eph) + diff = max_eph - min_eph; + } + } + + d_list_add_tail(&dae->dae_order_link, &cont->vc_dtx_reindex_list); + } + prev = dae; dbd_count++; } @@ -3044,6 +3166,8 @@ vos_dtx_act_reindex(struct vos_container *cont) dbd_off = dbd->dbd_next; } + cont->vc_dtx_reindex_eph_diff = diff; + out: return rc > 0 ? 0 : rc; } @@ -3320,8 +3444,10 @@ vos_dtx_attach(struct dtx_handle *dth, bool persistent, bool exist) vos_dtx_cleanup_internal(dth); } - D_ERROR("Failed to pin DTX entry for "DF_DTI": "DF_RC"\n", - DP_DTI(&dth->dth_xid), DP_RC(rc)); + if (rc != 0) + DL_CDEBUG(rc != -DER_TX_RESTART, DLOG_ERR, DB_TRACE, rc, + "Failed to pin DTX entry for "DF_DTI": "DF_RC, + DP_DTI(&dth->dth_xid), DP_RC(rc)); } return rc; diff --git a/src/vos/vos_internal.h b/src/vos/vos_internal.h index 30e92318299..0d86edf4b8e 100644 --- a/src/vos/vos_internal.h +++ b/src/vos/vos_internal.h @@ -140,6 +140,12 @@ enum { /* Throttle ENOSPACE error message */ #define VOS_NOSPC_ERROR_INTVL 60 /* seconds */ +extern uint32_t vos_agg_gap; + +#define VOS_AGG_GAP_MIN 20 /* seconds */ +#define VOS_AGG_GAP_DEF 60 +#define VOS_AGG_GAP_MAX 180 + extern unsigned int vos_agg_nvme_thresh; extern bool vos_dkey_punch_propagate; @@ -359,6 +365,31 @@ struct vos_container { struct btr_root vc_dtx_committed_btr; /* The list for active DTXs, roughly ordered in time. */ d_list_t vc_dtx_act_list; + /* The list for the active DTX entries with epoch sorted. */ + d_list_t vc_dtx_sorted_list; + /* The list for the active DTX entries (but not re-indexed) with epoch unsorted. */ + d_list_t vc_dtx_unsorted_list; + /* The list for the active DTX entries that are re-indexed when open the container. */ + d_list_t vc_dtx_reindex_list; + /* The largest epoch difference for re-indexed DTX entries max/min pairs. */ + uint64_t vc_dtx_reindex_eph_diff; + /* The latest calculated local stable epoch. */ + daos_epoch_t vc_local_stable_epoch; + /* + * The lowest epoch boundary for current acceptable modification. It cannot be lower than + * vc_local_stable_epoch, otherwise, it may break stable epoch semantics. Because current + * target reported local stable epoch may be used as global stable epoch. There is window + * between current target reporting the local stable epoch and related leader setting the + * global stable epoch. If the modification with older epoch arrives during such internal, + * we have to reject it to avoid potential conflict. + * + * On the other hand, it must be higher than EC/VOS aggregation up boundary. Under space + * pressure, the EC/VOS aggregation up boundary may be higher than vc_local_stable_epoch, + * then it will cause vc_mod_epoch_bound > vc_local_stable_epoch. + */ + daos_epoch_t vc_mod_epoch_bound; + /* Last timestamp when VOS reject DTX because of stale epoch. */ + uint64_t vc_dtx_reject_ts; /* The count of committed DTXs. */ uint32_t vc_dtx_committed_count; /** Index for timestamp lookup */ @@ -428,6 +459,8 @@ struct vos_dtx_act_ent { daos_unit_oid_t *dae_oids; /* The time (hlc) when the DTX entry is created. */ uint64_t dae_start_time; + /* Link into container::vc_dtx_{sorted,unsorted,reindex}_list. */ + d_list_t dae_order_link; /* Link into container::vc_dtx_act_list. */ d_list_t dae_link; /* Back pointer to the DTX handle. */ diff --git a/src/vos/vos_layout.h b/src/vos/vos_layout.h index 87d092bc882..40daa55da93 100644 --- a/src/vos/vos_layout.h +++ b/src/vos/vos_layout.h @@ -271,8 +271,13 @@ enum vos_io_stream { struct vos_cont_ext_df { /* GC bucket extension */ struct vos_gc_bkt_df ced_gc_bkt; + /* + * Any modification involved in current target (container shard) under the global + * stable epoch have already been persistently stored globally. + */ + uint64_t ced_global_stable_epoch; /* Reserved for potential new features */ - uint64_t ced_paddings[38]; + uint64_t ced_paddings[37]; /* Reserved for future extension */ uint64_t ced_reserve; };