Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
refactor: move primary learning preparation of cache into another fun…
Browse files Browse the repository at this point in the history
…ction
  • Loading branch information
neverchanje committed Dec 24, 2019
1 parent b0af591 commit 27cba8c
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 65 deletions.
7 changes: 7 additions & 0 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
void notify_learn_completion();
error_code apply_learned_state_from_private_log(learn_state &state);

// Tries to answer a learn request from the mutation cache held in the prepare list.
//
// - request: the learn request; the implementation reads its signature and
//   learner address for logging.
// - learn_start_decree: first decree the learner asks for; compared against the
//   prepare list's min_decree to decide whether the cache covers the request.
// - local_committed_decree: when the learner has no prepare_start_decree yet,
//   it is initialized to local_committed_decree + 1.
// - learner_state: in/out; its prepare_start_decree is either assigned (cache
//   covers the request) or reset to invalid_decree (it does not).
// - response: in/out; on success receives prepare_start_decree, type LT_CACHE
//   and the serialized mutations in state.meta.
// - delayed_replay_prepare_list: out; set to true only when prepare_start_decree
//   is newly assigned by this call, otherwise left untouched.
//
// Returns true if `response` was filled as a cache learn, false if the caller
// has to fall back to another way of learning.
bool prepare_learn_from_cache(const learn_request &request,
decree learn_start_decree,
decree local_committed_decree,
remote_learner_state &learner_state,
learn_response &response,
bool &delayed_replay_prepare_list);

/////////////////////////////////////////////////////////////////
// failure handling
void handle_local_failure(error_code error);
Expand Down
145 changes: 80 additions & 65 deletions src/dist/replication/lib/replica_learn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,62 +345,14 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request)
response.last_committed_decree = local_committed_decree;
response.err = ERR_OK;

// set prepare_start_decree when to-be-learn state is covered by prepare list,
// note min_decree can be NOT present in prepare list when list.count == 0
if (learn_start_decree > _prepare_list->min_decree() ||
(learn_start_decree == _prepare_list->min_decree() && _prepare_list->count() > 0)) {
if (learner_state.prepare_start_decree == invalid_decree) {
// start from (last_committed_decree + 1)
learner_state.prepare_start_decree = local_committed_decree + 1;

cleanup_preparing_mutations(false);

// the replayed prepare msg needs to be AFTER the learning response msg
// to reduce probability that preparing messages arrive remote early than
// learning response msg.
delayed_replay_prepare_list = true;

ddebug("%s: on_learn[%016" PRIx64
"]: learner = %s, set prepare_start_decree = %" PRId64,
name(),
request.signature,
request.learner.to_string(),
local_committed_decree + 1);
}

response.prepare_start_decree = learner_state.prepare_start_decree;
} else {
learner_state.prepare_start_decree = invalid_decree;
}

// only learn mutation cache in range of [learn_start_decree, prepare_start_decree),
// in this case, the state on the PS should be contiguous (+ to-be-sent prepare list)
if (response.prepare_start_decree != invalid_decree) {
binary_writer writer;
int count = 0;
for (decree d = learn_start_decree; d < response.prepare_start_decree; d++) {
auto mu = _prepare_list->get_mutation_by_decree(d);
dassert(mu != nullptr, "mutation must not be nullptr, decree = %" PRId64 "", d);
mu->write_to(writer, nullptr);
count++;
}
response.type = learn_type::LT_CACHE;
response.state.meta = writer.get_buffer();
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn mutation cache succeed, "
"learn_start_decree = %" PRId64 ", prepare_start_decree = %" PRId64 ", "
"learn_mutation_count = %d, learn_data_size = %d",
name(),
request.signature,
request.learner.to_string(),
learn_start_decree,
response.prepare_start_decree,
count,
response.state.meta.length());
}

// learn delta state or checkpoint
// in this case, the state on the PS is still incomplete
else {
if (!prepare_learn_from_cache(request,
learn_start_decree,
local_committed_decree,
learn_state,
response,
delayed_replay_prepare_list)) {
if (learn_start_decree > _app->last_durable_decree()) {
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, "
"because learn_start_decree(%" PRId64 ") > _app->last_durable_decree(%" PRId64
Expand Down Expand Up @@ -475,15 +427,15 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request)
err.to_string());
} else {
response.base_local_dir = _app->data_dir();
ddebug(
"%s: on_learn[%016" PRIx64 "]: learner = %s, get app learn state succeed, "
"learned_meta_size = %u, learned_file_count = %u, learned_to_decree = %" PRId64,
name(),
request.signature,
request.learner.to_string(),
response.state.meta.length(),
static_cast<uint32_t>(response.state.files.size()),
response.state.to_decree_included);
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, get app learn state succeed, "
"learned_meta_size = %u, learned_file_count = %u, learned_to_decree = "
"%" PRId64,
name(),
request.signature,
request.learner.to_string(),
response.state.meta.length(),
static_cast<uint32_t>(response.state.files.size()),
response.state.to_decree_included);
}
}
}
Expand Down Expand Up @@ -904,6 +856,69 @@ void replica::on_learn_reply(error_code err, learn_request &&req, learn_response
}
}

// Tries to answer a learn request from the mutation cache (the prepare list).
//
// When the requested start decree is covered by the prepare list, the learner's
// prepare_start_decree is (lazily) initialized to local_committed_decree + 1 and
// copied into the response. If the response then carries a valid
// prepare_start_decree, the mutations in
// [learn_start_decree, response.prepare_start_decree) are serialized into
// response.state.meta and the response is marked as LT_CACHE.
//
// Parameters:
//   request                      - used only for logging (signature, learner address).
//   learn_start_decree           - first decree the learner needs.
//   local_committed_decree       - base for a newly assigned prepare_start_decree.
//   learner_state                - in/out; prepare_start_decree is assigned, or reset
//                                  to invalid_decree when the cache does not cover
//                                  the request.
//   response                     - in/out; filled on success.
//   delayed_replay_prepare_list  - out; set to true only when prepare_start_decree is
//                                  newly assigned here, otherwise left untouched.
//
// Returns true if the response was prepared from the cache, false otherwise.
bool replica::prepare_learn_from_cache(const learn_request &request,
                                       decree learn_start_decree,
                                       decree local_committed_decree,
                                       remote_learner_state &learner_state,
                                       learn_response &response,
                                       bool &delayed_replay_prepare_list)
{
    // set prepare_start_decree when to-be-learn state is covered by prepare list,
    // note min_decree can be NOT present in prepare list when list.count == 0
    const bool covered_by_prepare_list =
        learn_start_decree > _prepare_list->min_decree() ||
        (learn_start_decree == _prepare_list->min_decree() && _prepare_list->count() > 0);

    if (covered_by_prepare_list) {
        if (learner_state.prepare_start_decree == invalid_decree) {
            // start from (last_committed_decree + 1)
            learner_state.prepare_start_decree = local_committed_decree + 1;

            cleanup_preparing_mutations(false);

            // the replayed prepare msg needs to be AFTER the learning response msg
            // to reduce the probability that preparing messages arrive at the remote
            // earlier than the learning response msg.
            delayed_replay_prepare_list = true;

            ddebug("%s: on_learn[%016" PRIx64
                   "]: learner = %s, set prepare_start_decree = %" PRId64,
                   name(),
                   request.signature,
                   request.learner.to_string(),
                   local_committed_decree + 1);
        }

        response.prepare_start_decree = learner_state.prepare_start_decree;
    } else {
        learner_state.prepare_start_decree = invalid_decree;
    }

    // The decision is made on the response (not on learner_state) on purpose:
    // in the not-covered case the response field is left as the caller set it.
    if (response.prepare_start_decree == invalid_decree) {
        return false;
    }

    // only learn mutation cache in range of [learn_start_decree, prepare_start_decree),
    // in this case, the state on the PS should be contiguous (+ to-be-sent prepare list)
    binary_writer writer;
    int count = 0;
    for (decree d = learn_start_decree; d < response.prepare_start_decree; d++) {
        auto mu = _prepare_list->get_mutation_by_decree(d);
        dassert(mu != nullptr, "mutation must not be nullptr, decree = %" PRId64 "", d);
        mu->write_to(writer, nullptr);
        count++;
    }
    response.type = learn_type::LT_CACHE;
    response.state.meta = writer.get_buffer();

    // learn_data_size is printed with %u, matching how meta.length() is logged
    // elsewhere in this file.
    ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn mutation cache succeed, "
           "learn_start_decree = %" PRId64 ", prepare_start_decree = %" PRId64 ", "
           "learn_mutation_count = %d, learn_data_size = %u",
           name(),
           request.signature,
           request.learner.to_string(),
           learn_start_decree,
           response.prepare_start_decree,
           count,
           response.state.meta.length());
    return true;
}

void replica::on_copy_remote_state_completed(error_code err,
size_t size,
uint64_t copy_start_time,
Expand Down Expand Up @@ -1470,5 +1485,5 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state)

return err;
}
}
} // namespace
} // namespace replication
} // namespace dsn

0 comments on commit 27cba8c

Please sign in to comment.