DAOS-16809 vos: container based stable epoch
To efficiently calculate a container based local stable epoch, we want to
maintain some kind of epoch-ordered list of the active DTX entries. But
considering the related overhead, it is not practical to maintain a strictly
sorted list for all active DTX entries. For a DTX whose leader resides on the
current target, its epoch is already sorted when it is generated on the
current engine. So the main difficulty is with those DTX entries whose
leaders are on remote targets.

On the other hand, the local stable epoch is mainly used to generate the
global stable epoch that drives incremental reintegration. In fact, we do
not need a very accurate global stable epoch for incremental reintegration:
it is non-fatal if the calculated stable epoch is a bit smaller than the
real one. For example, an error of a few seconds in the stable epoch is
negligible compared with the overhead of rebuilding the whole target from
scratch. So for a DTX entry whose leader is on a remote target, we keep it
in a list that only trends upward by epoch instead of being strictly sorted.
We introduce an O(1) algorithm to handle such a loosely sorted DTX entry
list when calculating the local stable epoch.
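As a rough self-contained illustration of that idea (all structure and function names here are hypothetical, not the actual DAOS symbols), an O(1) lower bound can be taken from the heads of two queues, with a small tolerance covering how far a remote-led epoch may dip below the head of the loosely sorted queue:

```c
#include <stdint.h>

/*
 * Illustrative sketch only. Local-leader DTX epochs arrive already sorted;
 * remote-leader epochs only trend upward, so the head of that queue is not
 * a strict minimum. Subtracting a tolerance keeps the result a safe lower
 * bound, at the cost of being "a bit smaller than the real case", which the
 * commit message says is acceptable for incremental reintegration.
 */
#define EQ_CAP 64U

struct epoch_queue {
	uint64_t eq_epochs[EQ_CAP]; /* ring buffer of in-flight epochs */
	unsigned eq_head;
	unsigned eq_tail;
};

static int
eq_empty(const struct epoch_queue *q)
{
	return q->eq_head == q->eq_tail;
}

static void
eq_push(struct epoch_queue *q, uint64_t epoch)
{
	q->eq_epochs[q->eq_tail++ % EQ_CAP] = epoch;
}

static uint64_t
eq_peek(const struct epoch_queue *q)
{
	return q->eq_epochs[q->eq_head % EQ_CAP];
}

/* O(1): two head peeks, no scan of the active DTX entries. */
static uint64_t
local_stable_epoch(const struct epoch_queue *sorted,
		   const struct epoch_queue *unsorted,
		   uint64_t hlc_now, uint64_t tolerance)
{
	uint64_t stable = hlc_now;

	if (!eq_empty(sorted) && eq_peek(sorted) - 1 < stable)
		stable = eq_peek(sorted) - 1;
	if (!eq_empty(unsorted) && eq_peek(unsorted) - tolerance < stable)
		stable = eq_peek(unsorted) - tolerance;
	return stable;
}
```

Because the unsorted queue only trends upward, the tolerance-adjusted head stays a safe lower bound; the result may be a few seconds smaller than the true stable epoch, which the text above argues is acceptable.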

Main VOS APIs for the stable epoch:

/* Calculate current locally known stable epoch for the given container. */
daos_epoch_t vos_cont_get_local_stable_epoch(daos_handle_t coh);

/* Get global stable epoch for the given container. */
daos_epoch_t vos_cont_get_global_stable_epoch(daos_handle_t coh);

/* Set global stable epoch for the given container. */
int vos_cont_set_global_stable_epoch(daos_handle_t coh, daos_epoch_t epoch);

Another important enhancement in the patch is handling the potential
conflict between EC/VOS aggregation and delayed modifications with very
old epochs.

For a standalone transaction started on the DTX leader, the epoch is
generated by the leader, and the modification RPC is then forwarded to the
other related non-leader(s). If the forwarded RPC is delayed for some
reason, such as network congestion or a busy non-leader, the epoch of the
transaction may become very old (exceeding the related threshold), by which
time VOS aggregation may already have aggregated the related epoch range.
In that case, the non-leader will reject the modification to avoid data
loss or corruption.
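The rejection rule can be sketched as follows (the function names and HLC tick resolution are assumptions for illustration, not the exact DAOS code):

```c
#include <stdint.h>

/*
 * Sketch: a modification is refused when its epoch falls behind the
 * aggregation upper bound, i.e. current HLC minus the configured
 * aggregation gap, because that epoch range may already have been
 * aggregated on this target.
 */
#define HLC_NSEC_PER_SEC 1000000000ULL /* assumed HLC tick resolution */

static uint64_t
sec2hlc(uint64_t sec)
{
	return sec * HLC_NSEC_PER_SEC;
}

/* Nonzero: reject the delayed modification so the client restarts the
 * transaction with a fresh epoch instead of risking corruption. */
static int
mod_epoch_too_old(uint64_t mod_epoch, uint64_t hlc_now, uint32_t agg_gap_sec)
{
	return mod_epoch < hlc_now - sec2hlc(agg_gap_sec);
}
```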

For a distributed transaction, if there is no read (fetch, query, enumerate,
and so on) before client commit_tx, the related DTX leader will generate the
epoch for the transaction after client commit_tx. Epoch handling is then the
same as for the standalone transaction above.

If the distributed transaction involves some read before client commit_tx,
its epoch is generated by the first engine accessed for the read. If the
transaction then runs for too long, its epoch may be very old by the time of
client commit_tx, and the related DTX leader will have to reject the
transaction to avoid the conflict mentioned above. Even if the DTX leader
did not reject it, some non-leader might, because of the very old epoch. So
under this framework the lifetime of a distributed transaction cannot be too
long. It can be adjusted via the server side environment variable
DAOS_VOS_AGG_GAP; the default value is 60 seconds.
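For example, a server environment could widen the gap to tolerate longer-lived transactions (the variable name comes from this patch; the value here is only an example):

```shell
# Allow distributed transactions to live ~120s instead of the default 60s
# by widening the VOS aggregation gap on the server side.
export DAOS_VOS_AGG_GAP=120
echo "$DAOS_VOS_AGG_GAP"
```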

NOTE: EC/VOS aggregation should avoid aggregating in the epoch range where
      lots of data records are pending commit, so the aggregation epoch
      upper bound is 'current HLC - vos_agg_gap'.
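A minimal sketch of the aggregation-side bookkeeping, assuming a hypothetical container struct (the patch keeps the corresponding state in ds_cont_child and publishes the bound via vos_cont_set_mod_bound()):

```c
#include <stdint.h>

/*
 * Sketch only: each aggregation pass scans up to current HLC minus the
 * gap, then raises the container's lowest allowed modification epoch to
 * the reached upper bound so that later-arriving writes below it are
 * rejected rather than silently conflicting with aggregated data.
 */
struct cont_sketch {
	uint64_t cs_mod_bound; /* lowest allowed modification epoch */
};

static uint64_t
agg_upper_bound(uint64_t hlc_now, uint64_t agg_gap_hlc)
{
	return hlc_now - agg_gap_hlc;
}

static void
set_mod_bound(struct cont_sketch *cont, uint64_t epr_hi)
{
	/* Never lower an already published bound. */
	if (epr_hi > cont->cs_mod_bound)
		cont->cs_mod_bound = epr_hi;
}
```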

Signed-off-by: Fan Yong <[email protected]>
Nasf-Fan committed Dec 14, 2024
1 parent b1a16a8 commit 0062344
Showing 21 changed files with 544 additions and 89 deletions.
8 changes: 7 additions & 1 deletion src/container/srv_target.c
Original file line number Diff line number Diff line change
@@ -321,7 +321,7 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
DAOS_FAIL_CHECK(DAOS_FORCE_EC_AGG_PEER_FAIL)))
interval = 0;
else
interval = d_sec2hlc(DAOS_AGG_THRESHOLD);
interval = cont->sc_agg_eph_gap;

D_ASSERT(hlc > (interval * 2));
/*
@@ -409,6 +409,9 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
tgt_id, epoch_range.epr_lo, epoch_range.epr_hi);

if (!param->ap_vos_agg)
vos_cont_set_mod_bound(cont->sc_hdl, epoch_range.epr_hi);

flags |= VOS_AGG_FL_FORCE_MERGE;
rc = agg_cb(cont, &epoch_range, flags, param);
if (rc)
@@ -425,6 +428,9 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
tgt_id, epoch_range.epr_lo, epoch_range.epr_hi);

if (!param->ap_vos_agg)
vos_cont_set_mod_bound(cont->sc_hdl, epoch_range.epr_hi);

if (dss_xstream_is_busy())
flags &= ~VOS_AGG_FL_FORCE_MERGE;
rc = agg_cb(cont, &epoch_range, flags, param);
3 changes: 3 additions & 0 deletions src/dtx/dtx_common.c
@@ -922,6 +922,7 @@ dtx_handle_init(struct dtx_id *dti, daos_handle_t xoh, struct dtx_epoch *epoch,
dth->dth_for_migration = (flags & DTX_FOR_MIGRATION) ? 1 : 0;
dth->dth_ignore_uncommitted = (flags & DTX_IGNORE_UNCOMMITTED) ? 1 : 0;
dth->dth_prepared = (flags & DTX_PREPARED) ? 1 : 0;
dth->dth_epoch_owner = (flags & DTX_EPOCH_OWNER) ? 1 : 0;
dth->dth_aborted = 0;
dth->dth_already = 0;
dth->dth_need_validation = 0;
@@ -1853,6 +1854,8 @@ dtx_cont_register(struct ds_cont_child *cont)
D_GOTO(out, rc = -DER_NOMEM);
}

cont->sc_agg_eph_gap = d_sec2hlc(vos_get_agg_gap());

ds_cont_child_get(cont);
dbca->dbca_refs = 0;
dbca->dbca_cont = cont;
3 changes: 2 additions & 1 deletion src/dtx/tests/dts_structs.c
@@ -70,8 +70,9 @@ struct_dtx_handle(void **state)
SET_BITFIELD_1(dummy, dth_need_validation);
SET_BITFIELD_1(dummy, dth_ignore_uncommitted);
SET_BITFIELD_1(dummy, dth_local);
SET_BITFIELD_1(dummy, dth_epoch_owner);
SET_BITFIELD_1(dummy, dth_local_complete);
SET_BITFIELD(dummy, padding1, 13);
SET_BITFIELD(dummy, padding1, 12);

SET_FIELD(dummy, dth_dti_cos_count);
SET_FIELD(dummy, dth_dti_cos);
26 changes: 13 additions & 13 deletions src/engine/sched.c
@@ -197,17 +197,6 @@ enum {

static int sched_policy;

/*
* Time threshold for giving IO up throttling. If space pressure stays in the
* highest level for enough long time, we assume that no more space can be
* reclaimed and choose to give up IO throttling, so that ENOSPACE error could
* be returned to client earlier.
*
* To make time for aggregation reclaiming overwriteen space, this threshold
* should be longer than the DAOS_AGG_THRESHOLD.
*/
#define SCHED_DELAY_THRESH 40000 /* msecs */

struct pressure_ratio {
unsigned int pr_free; /* free space ratio */
unsigned int pr_gc_ratio; /* CPU percentage for GC & Aggregation */
@@ -943,12 +932,21 @@ is_gc_pending(struct sched_pool_info *spi)
return spi->spi_gc_ults && (spi->spi_gc_ults > spi->spi_gc_sleeping);
}

/* Just run into this space pressure situation recently? */
/*
* Just run into this space pressure situation recently?
*
 * If space pressure stays at the highest level for long enough, we assume
 * that no more space can be reclaimed and choose to give up IO throttling, so
 * that an ENOSPACE error can be returned to the client earlier.
 *
 * To give aggregation time to reclaim overwritten space, this threshold
 * should be longer than the gap between the VOS aggregation epoch and the
 * current HLC.
*/
static inline bool
is_pressure_recent(struct sched_info *info, struct sched_pool_info *spi)
{
D_ASSERT(info->si_cur_ts >= spi->spi_pressure_ts);
return (info->si_cur_ts - spi->spi_pressure_ts) < SCHED_DELAY_THRESH;
return (info->si_cur_ts - spi->spi_pressure_ts) < info->si_agg_gap;
}

static inline uint64_t
@@ -2256,6 +2254,8 @@ sched_run(ABT_sched sched)
return;
}

dx->dx_sched_info.si_agg_gap = (vos_get_agg_gap() + 10) * 1000; /* msecs */

while (1) {
/* Try to pick network poll ULT */
pool = pools[DSS_POOL_NET_POLL];
1 change: 1 addition & 0 deletions src/engine/srv_internal.h
@@ -61,6 +61,7 @@ struct sched_info {
/* Number of kicked requests for each type in current cycle */
uint32_t si_kicked_req_cnt[SCHED_REQ_MAX];
unsigned int si_stop:1;
uint64_t si_agg_gap;
};

struct mem_stats {
13 changes: 1 addition & 12 deletions src/include/daos/dtx.h
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2019-2023 Intel Corporation.
* (C) Copyright 2019-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
@@ -27,17 +27,6 @@
/* The time (in second) threshold for batched DTX commit. */
#define DTX_COMMIT_THRESHOLD_AGE 10

/*
* VOS aggregation should try to avoid aggregating in the epoch range where
* lots of data records are pending to commit, so the aggregation epoch upper
* bound is: current HLC - (DTX batched commit threshold + buffer period)
*
* To avoid conflicting of aggregation vs. transactions, any transactional
* update/fetch with epoch lower than the aggregation upper bound should be
* rejected and restarted.
*/
#define DAOS_AGG_THRESHOLD (DTX_COMMIT_THRESHOLD_AGE + 10) /* seconds */

enum dtx_target_flags {
/* The target only contains read-only operations for the DTX. */
DTF_RDONLY = (1 << 0),
6 changes: 6 additions & 0 deletions src/include/daos_srv/container.h
@@ -129,6 +129,12 @@ struct ds_cont_child {
*/
uint64_t sc_ec_update_timestamp;

/*
 * The gap between the max allowed aggregation epoch and the current HLC. A modification
 * with an epoch older than this range may conflict with aggregation and will be rejected.
*/
uint64_t sc_agg_eph_gap;

/* The objects with committable DTXs in DRAM. */
daos_handle_t sc_dtx_cos_hdl;
/* The DTX COS-btree. */
6 changes: 5 additions & 1 deletion src/include/daos_srv/dtx_srv.h
@@ -113,8 +113,10 @@ struct dtx_handle {
dth_ignore_uncommitted : 1,
/* Local transaction */
dth_local : 1,
/* Locally generate the epoch. */
dth_epoch_owner : 1,
/* Flag to commit the local transaction */
dth_local_complete : 1, padding1 : 13;
dth_local_complete : 1, padding1 : 12;

/* The count the DTXs in the dth_dti_cos array. */
uint32_t dth_dti_cos_count;
@@ -287,6 +289,8 @@ enum dtx_flags {
DTX_RELAY = (1 << 10),
/** Local transaction */
DTX_LOCAL = (1 << 11),
/** Locally generate the epoch. */
DTX_EPOCH_OWNER = (1 << 12),
};

void
50 changes: 50 additions & 0 deletions src/include/daos_srv/vos.h
@@ -938,6 +938,56 @@ vos_update_renew_epoch(daos_handle_t ioh, struct dtx_handle *dth);
void
vos_dtx_renew_epoch(struct dtx_handle *dth);

/**
* Calculate current locally known stable epoch for the given container.
*
* \param coh [IN] Container open handle
*
* \return The epoch on success, negative value if error.
*/
daos_epoch_t
vos_cont_get_local_stable_epoch(daos_handle_t coh);

/**
* Get global stable epoch for the given container.
*
* \param coh [IN] Container open handle
*
* \return The epoch on success, negative value if error.
*/
daos_epoch_t
vos_cont_get_global_stable_epoch(daos_handle_t coh);

/**
* Set global stable epoch for the given container.
*
* \param coh [IN] Container open handle
* \param epoch [IN] The epoch to be used as the new global stable epoch.
*
* \return Zero on success, negative value if error.
*/
int
vos_cont_set_global_stable_epoch(daos_handle_t coh, daos_epoch_t epoch);

/**
* Set the lowest allowed modification epoch for the given container.
*
* \param coh [IN] Container open handle
* \param epoch [IN] The lowest allowed epoch for modification.
*
* \return Zero on success, negative value if error.
*/
int
vos_cont_set_mod_bound(daos_handle_t coh, uint64_t epoch);

/**
* Query the gap between the max allowed aggregation epoch and current HLC.
*
* \return The gap value in seconds.
*/
uint32_t
vos_get_agg_gap(void);

/**
* Get the recx/epoch list.
*
2 changes: 2 additions & 0 deletions src/include/daos_srv/vos_types.h
@@ -58,6 +58,8 @@ enum dtx_entry_flags {
* on all yet, need to be re-committed.
*/
DTE_PARTIAL_COMMITTED = (1 << 5),
/* The DTX epoch is sorted locally. */
DTE_EPOCH_SORTED = (1 << 6),
};

struct dtx_entry {
13 changes: 10 additions & 3 deletions src/object/srv_obj.c
@@ -2896,8 +2896,10 @@ ds_obj_rw_handler(crt_rpc_t *rpc)

rc = process_epoch(&orw->orw_epoch, &orw->orw_epoch_first,
&orw->orw_flags);
if (rc == PE_OK_LOCAL)
if (rc == PE_OK_LOCAL) {
orw->orw_flags &= ~ORF_EPOCH_UNCERTAIN;
dtx_flags |= DTX_EPOCH_OWNER;
}

if (obj_rpc_is_fetch(rpc)) {
struct dtx_handle *dth;
@@ -3856,8 +3858,10 @@ ds_obj_punch_handler(crt_rpc_t *rpc)

rc = process_epoch(&opi->opi_epoch, NULL /* epoch_first */,
&opi->opi_flags);
if (rc == PE_OK_LOCAL)
if (rc == PE_OK_LOCAL) {
opi->opi_flags &= ~ORF_EPOCH_UNCERTAIN;
dtx_flags |= DTX_EPOCH_OWNER;
}

version = opi->opi_map_ver;
max_ver = opi->opi_map_ver;
@@ -5110,6 +5114,7 @@ ds_obj_dtx_leader(struct daos_cpd_args *dca)
&dcsh->dcsh_epoch.oe_first,
&dcsh->dcsh_epoch.oe_rpc_flags);
if (rc == PE_OK_LOCAL) {
dtx_flags |= DTX_EPOCH_OWNER;
/*
* In this case, writes to local RDGs can use the chosen epoch
* without any uncertainty. This optimization is left to future
@@ -5701,8 +5706,10 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc)

if (ocpi->ocpi_flags & ORF_LEADER) {
rc = process_epoch(&ocpi->ocpi_epoch, NULL /* epoch_first */, &ocpi->ocpi_flags);
if (rc == PE_OK_LOCAL)
if (rc == PE_OK_LOCAL) {
ocpi->ocpi_flags &= ~ORF_EPOCH_UNCERTAIN;
dtx_flags |= DTX_EPOCH_OWNER;
}
} else if (dct_nr == 1) {
rc = obj_coll_local(rpc, dcts[0].dct_shards, dce, &version, &ioc, NULL,
odm->odm_mbs, obj_coll_tgt_punch);
1 change: 1 addition & 0 deletions src/tests/ftest/util/server_utils_params.py
@@ -436,6 +436,7 @@ class EngineYamlParameters(YamlParameters):
"D_LOG_FILE_APPEND_PID=1",
"DAOS_POOL_RF=4",
"CRT_EVENT_DELAY=1",
"DAOS_VOS_AGG_GAP=20",
"COVFILE=/tmp/test.cov"],
"ofi+tcp": [],
"ofi+tcp;ofi_rxm": [],
5 changes: 4 additions & 1 deletion src/vos/tests/vts_dtx.c
@@ -43,6 +43,7 @@ vts_dtx_begin(const daos_unit_oid_t *oid, daos_handle_t coh, daos_epoch_t epoch,
uint64_t dkey_hash, struct dtx_handle **dthp)
{
struct dtx_handle *dth;
int rc;

D_ALLOC_PTR(dth);
assert_non_null(dth);
@@ -66,6 +67,7 @@ vts_dtx_begin(const daos_unit_oid_t *oid, daos_handle_t coh, daos_epoch_t epoch,
dth->dth_for_migration = 0;
dth->dth_ignore_uncommitted = 0;
dth->dth_prepared = 0;
dth->dth_epoch_owner = 0;
dth->dth_aborted = 0;
dth->dth_already = 0;
dth->dth_need_validation = 0;
@@ -91,7 +93,8 @@ vts_dtx_begin(const daos_unit_oid_t *oid, daos_handle_t coh, daos_epoch_t epoch,
dth->dth_shares_inited = 1;

vos_dtx_rsrvd_init(dth);
vos_dtx_attach(dth, false, false);
rc = vos_dtx_attach(dth, false, false);
assert_rc_equal(rc, 0);

*dthp = dth;
}
12 changes: 6 additions & 6 deletions src/vos/tests/vts_io.c
@@ -175,7 +175,7 @@ test_args_init(struct io_test_args *args,
memset(args, 0, sizeof(*args));
memset(&vts_cntr, 0, sizeof(vts_cntr));

vts_epoch_gen = 1;
vts_epoch_gen = d_hlc_get();

rc = vts_ctx_init(&args->ctx, pool_size);
if (rc != 0)
@@ -1590,7 +1590,7 @@ vos_iterate_test(void **state)
struct all_info info = {0};
vos_iter_param_t param = {0};
struct vos_iter_anchors anchors = {0};
daos_epoch_t epoch = 1;
daos_epoch_t epoch = d_hlc_get();
int rc = 0;
unsigned long old_flags = arg->ta_flags;

@@ -2534,7 +2534,7 @@ static void gen_query_tree(struct io_test_args *arg, daos_unit_oid_t oid)
daos_key_t akey;
d_iov_t val_iov;
daos_recx_t recx;
daos_epoch_t epoch = 1;
daos_epoch_t epoch = d_hlc_get();
uint64_t dkey_value;
uint64_t akey_value;
int i, j;
@@ -2608,7 +2608,7 @@ io_query_key(void **state)
int i, j;
struct dtx_handle *dth;
struct dtx_id xid;
daos_epoch_t epoch = 1;
daos_epoch_t epoch = d_hlc_get();
daos_key_t dkey;
daos_key_t akey;
daos_key_t dkey_read;
@@ -2873,7 +2873,7 @@ io_query_key_punch_update(void **state)
{
struct io_test_args *arg = *state;
int rc = 0;
daos_epoch_t epoch = 1;
daos_epoch_t epoch = d_hlc_get();
daos_key_t dkey = { 0 };
daos_key_t akey;
daos_recx_t recx_read;
@@ -3006,7 +3006,7 @@ gang_sv_test(void **state)
char dkey_buf[UPDATE_DKEY_SIZE], akey_buf[UPDATE_AKEY_SIZE];
char *update_buf, *fetch_buf;
daos_size_t rsize = (27UL << 20); /* 27MB */
daos_epoch_t epoch = 1;
daos_epoch_t epoch = d_hlc_get();
int rc;

D_ALLOC(update_buf, rsize);
4 changes: 2 additions & 2 deletions src/vos/tests/vts_mvcc.c
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2020-2023 Intel Corporation.
* (C) Copyright 2020-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
@@ -1720,7 +1720,7 @@ setup_mvcc(void **state)
D_ASSERT(arg->custom == NULL);
D_ALLOC_PTR(mvcc_arg);
D_ASSERT(mvcc_arg != NULL);
mvcc_arg->epoch = 500;
mvcc_arg->epoch = d_hlc_get() + 500;
d_getenv_bool("CMOCKA_TEST_ABORT", &mvcc_arg->fail_fast);
arg->custom = mvcc_arg;
return 0;
