Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-4881 sched: CPU relaxing on idle #4332

Merged
merged 17 commits into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/container/container_iv.c
Original file line number Diff line number Diff line change
Expand Up @@ -1249,8 +1249,9 @@ cont_iv_prop_fetch(struct ds_iv_ns *ns, uuid_t cont_uuid,
uuid_copy(arg.cont_uuid, cont_uuid);
arg.prop = cont_prop;
arg.eventual = eventual;
rc = dss_ult_create(cont_iv_prop_fetch_ult, &arg, DSS_XS_SYS,
0, DSS_DEEP_STACK_SZ, NULL);
/* XXX: EC aggregation periodically fetches cont prop */
rc = dss_ult_periodic(cont_iv_prop_fetch_ult, &arg, DSS_XS_SYS, 0,
DSS_DEEP_STACK_SZ, NULL);
if (rc)
D_GOTO(out, rc);

Expand Down
3 changes: 2 additions & 1 deletion src/container/srv_container.c
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,8 @@ ds_cont_tgt_refresh_agg_eph(uuid_t pool_uuid, uuid_t cont_uuid,
uuid_copy(arg.cont_uuid, cont_uuid);
arg.min_eph = eph;

rc = dss_task_collective(cont_refresh_vos_agg_eph_one, &arg, 0);
rc = dss_task_collective(cont_refresh_vos_agg_eph_one, &arg,
DSS_ULT_FL_PERIODIC);
return rc;
}

Expand Down
5 changes: 3 additions & 2 deletions src/container/srv_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ cont_start_agg_ult(struct ds_cont_child *cont)
return 0;

rc = dss_ult_create(cont_aggregate_ult, cont, DSS_XS_SELF,
0, 0, &agg_ult);
0, DSS_DEEP_STACK_SZ, &agg_ult);
if (rc) {
D_ERROR(DF_CONT"[%d]: Failed to create aggregation ULT. %d\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
Expand Down Expand Up @@ -2328,7 +2328,8 @@ ds_cont_tgt_ec_eph_query_ult(void *data)
coll_args.ca_aggregator = pool;
coll_args.ca_func_args = &coll_args.ca_stream_args;

rc = dss_thread_collective_reduce(&coll_ops, &coll_args, 0);
rc = dss_thread_collective_reduce(&coll_ops, &coll_args,
DSS_ULT_FL_PERIODIC);
if (rc) {
D_ERROR(DF_UUID": Can not collect min epoch: %d\n",
DP_UUID(pool->sp_uuid), rc);
Expand Down
40 changes: 31 additions & 9 deletions src/dtx/dtx_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,28 @@ dtx_batched_commit(void *arg)
{
struct dss_module_info *dmi = dss_get_module_info();
struct dtx_batched_commit_args *dbca;
struct sched_req_attr attr = { 0 };
uuid_t anonym_uuid;
struct sched_request *sched_req;
struct dtx_batched_commit_args *tmp;

uuid_clear(anonym_uuid);
sched_req_attr_init(&attr, SCHED_REQ_ANONYM, &anonym_uuid);
sched_req = sched_req_get(&attr, ABT_THREAD_NULL);
if (sched_req == NULL) {
D_ERROR("Failed to get sched request.\n");
return;
}

dmi->dmi_dtx_req = sched_req;
dmi->dmi_dtx_batched_started = 1;

while (1) {
struct dtx_entry **dtes = NULL;
struct ds_cont_child *cont;
struct dtx_stat stat = { 0 };
int cnt;
int rc;
struct dtx_entry **dtes = NULL;
struct ds_cont_child *cont;
struct dtx_stat stat = { 0 };
int cnt, rc;
int sleep_time = 10; /* ms */

if (d_list_empty(&dmi->dmi_dtx_batched_list))
goto check;
Expand All @@ -175,6 +187,7 @@ dtx_batched_commit(void *arg)
(stat.dtx_oldest_committable_time != 0 &&
dtx_hlc_age2sec(stat.dtx_oldest_committable_time) >
DTX_COMMIT_THRESHOLD_AGE)) {
sleep_time = 0;
cnt = dtx_fetch_committable(cont, DTX_THRESHOLD_COUNT,
NULL, DAOS_EPOCH_MAX,
&dtes);
Expand All @@ -199,6 +212,7 @@ dtx_batched_commit(void *arg)
stat.dtx_oldest_committed_time != 0 &&
dtx_hlc_age2sec(stat.dtx_oldest_committed_time) >=
DTX_AGG_THRESHOLD_AGE_UPPER))) {
sleep_time = 0;
ds_cont_child_get(cont);
cont->sc_dtx_aggregating = 1;
rc = dss_ult_create(dtx_aggregate, cont, DSS_XS_SELF,
Expand All @@ -210,13 +224,13 @@ dtx_batched_commit(void *arg)
}

ds_cont_child_put(cont);

check:
if (dss_xstream_exiting(dmi->dmi_xstream))
break;

ABT_thread_yield();
sched_req_sleep(sched_req, sleep_time);
}
dmi->dmi_dtx_req = NULL;
sched_req_put(sched_req);

d_list_for_each_entry_safe(dbca, tmp, &dmi->dmi_dtx_batched_list,
dbca_link)
Expand Down Expand Up @@ -775,8 +789,16 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_child *cont,
dth->dth_dkey_hash, dth->dth_epoch, flags);
dtx_entry_put(dte);
if (rc == 0) {
if (!DAOS_FAIL_CHECK(DAOS_DTX_NO_COMMITTABLE))
if (!DAOS_FAIL_CHECK(DAOS_DTX_NO_COMMITTABLE)) {
vos_dtx_mark_committable(dth);
if (cont->sc_dtx_committable_count >
DTX_THRESHOLD_COUNT) {
struct dss_module_info *dmi;

dmi = dss_get_module_info();
sched_req_wakeup(dmi->dmi_dtx_req);
}
}
} else {
dth->dth_sync = 1;
}
Expand Down
32 changes: 24 additions & 8 deletions src/include/daos_srv/daos_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ struct dss_module_info {
d_list_t dmi_dtx_batched_list;
/* the profile information */
struct daos_profile *dmi_dp;
struct sched_request *dmi_dtx_req;
};

extern struct dss_module_key daos_srv_modkey;
Expand Down Expand Up @@ -343,6 +344,12 @@ bool sched_req_is_aborted(struct sched_request *req);
*/
int sched_req_space_check(struct sched_request *req);

/**
* Wrapper of ABT_cond_wait() that informs the scheduler that the caller is
* going to be blocked for a relatively long time.
*/
void sched_cond_wait(ABT_cond cond, ABT_mutex mutex);

static inline bool
dss_ult_exiting(struct sched_request *req)
{
Expand Down Expand Up @@ -434,7 +441,7 @@ struct dss_module {
enum dss_xs_type {
/** current xstream */
DSS_XS_SELF = -1,
/** operations needs accessing VOS */
/** operations need to access VOS */
DSS_XS_VOS = 0,
/** forward/dispatch IO request for TX coordinator */
DSS_XS_IOFW = 1,
Expand All @@ -448,16 +455,24 @@ enum dss_xs_type {

int dss_parameters_set(unsigned int key_id, uint64_t value);

typedef ABT_pool (*dss_abt_pool_choose_cb_t)(crt_rpc_t *rpc, ABT_pool *pools);
/* Flag bits that can be attached to ULTs. */
enum dss_ult_flags {
	/* Set for ULTs that are created periodically */
	DSS_ULT_FL_PERIODIC = 0x1,
};

void dss_abt_pool_choose_cb_register(unsigned int mod_id,
dss_abt_pool_choose_cb_t cb);
int dss_ult_create(void (*func)(void *), void *arg, int xs_type, int tgt_id,
size_t stack_size, ABT_thread *ult);
int dss_ult_execute(int (*func)(void *), void *arg, void (*user_cb)(void *),
void *cb_args, int xs_type, int tgt_id, size_t stack_size);
int dss_ult_create_all(void (*func)(void *), void *arg, bool main);

/*
* If the server wants to create ULTs periodically, it should call this special
* ULT creation function to avoid bumping the 'xstream busy timestamp'.
*/
int dss_ult_periodic(void (*func)(void *), void *arg, int xs_type, int tgt_id,
size_t stack_size, ABT_thread *ult);

int dss_sleep(uint64_t ms);

/* Pack return codes with additional argument to reduce */
Expand Down Expand Up @@ -534,12 +549,13 @@ struct dss_coll_args {
*/
int
dss_task_collective_reduce(struct dss_coll_ops *ops,
struct dss_coll_args *coll_args, int flag);
struct dss_coll_args *coll_args, unsigned int flags);
int
dss_thread_collective_reduce(struct dss_coll_ops *ops,
struct dss_coll_args *coll_args, int flag);
int dss_task_collective(int (*func)(void *), void *arg, int flag);
int dss_thread_collective(int (*func)(void *), void *arg, int flag);
struct dss_coll_args *coll_args,
unsigned int flags);
int dss_task_collective(int (*func)(void *), void *arg, unsigned int flags);
int dss_thread_collective(int (*func)(void *), void *arg, unsigned int flags);

struct dss_module *dss_module_get(int mod_id);
/* Convert Argobots errno to DAOS ones. */
Expand Down
8 changes: 6 additions & 2 deletions src/iosrv/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -694,15 +694,19 @@ server_fini(bool force)
D_INFO("drpc_fini() done\n");
server_init_state_fini();
D_INFO("server_init_state_fini() done\n");
dss_srv_fini(force);
D_INFO("dss_srv_fini() done\n");
/*
* Client stuff finalization needs to be done after all ULTs are drained
* in dss_srv_fini().
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What problem does the original order have?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some ULTs, such as the EC aggregation ULT and the rebuild ULT, use the client stack (e.g. dsc_pool/cont_open/close()), so we need to finalize the client stuff, such as daos_hhash_fini(), after all ULTs have drained.

if (dss_mod_facs & DSS_FAC_LOAD_CLI) {
daos_fini();
} else {
pl_fini();
daos_hhash_fini();
}
D_INFO("daos_fini() or pl_fini() done\n");
dss_srv_fini(force);
D_INFO("dss_srv_fini() done\n");
dss_module_unload_all();
D_INFO("dss_module_unload_all() done\n");
ds_iv_fini();
Expand Down
Loading