diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index cc6d46f985..c44595cbf9 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -234,6 +234,13 @@ class replica : public serverlet, public ref_counter, public replica_ba void notify_learn_completion(); error_code apply_learned_state_from_private_log(learn_state &state); + bool prepare_learn_from_cache(const learn_request &request, + decree learn_start_decree, + decree local_committed_decree, + remote_learner_state &learner_state, + learn_response &response, + bool &delayed_replay_prepare_list); + ///////////////////////////////////////////////////////////////// // failure handling void handle_local_failure(error_code error); diff --git a/src/dist/replication/lib/replica_learn.cpp b/src/dist/replication/lib/replica_learn.cpp index da6f659d78..37ab8347d0 100644 --- a/src/dist/replication/lib/replica_learn.cpp +++ b/src/dist/replication/lib/replica_learn.cpp @@ -345,62 +345,14 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request) response.last_committed_decree = local_committed_decree; response.err = ERR_OK; - // set prepare_start_decree when to-be-learn state is covered by prepare list, - // note min_decree can be NOT present in prepare list when list.count == 0 - if (learn_start_decree > _prepare_list->min_decree() || - (learn_start_decree == _prepare_list->min_decree() && _prepare_list->count() > 0)) { - if (learner_state.prepare_start_decree == invalid_decree) { - // start from (last_committed_decree + 1) - learner_state.prepare_start_decree = local_committed_decree + 1; - - cleanup_preparing_mutations(false); - - // the replayed prepare msg needs to be AFTER the learning response msg - // to reduce probability that preparing messages arrive remote early than - // learning response msg. - delayed_replay_prepare_list = true; - - ddebug("%s: on_learn[%016" PRIx64 - "]: learner = %s, set prepare_start_decree = %" PRId64, - name(), - request.signature, - request.learner.to_string(), - local_committed_decree + 1); - } - - response.prepare_start_decree = learner_state.prepare_start_decree; - } else { - learner_state.prepare_start_decree = invalid_decree; - } - - // only learn mutation cache in range of [learn_start_decree, prepare_start_decree), - // in this case, the state on the PS should be contiguous (+ to-be-sent prepare list) - if (response.prepare_start_decree != invalid_decree) { - binary_writer writer; - int count = 0; - for (decree d = learn_start_decree; d < response.prepare_start_decree; d++) { - auto mu = _prepare_list->get_mutation_by_decree(d); - dassert(mu != nullptr, "mutation must not be nullptr, decree = %" PRId64 "", d); - mu->write_to(writer, nullptr); - count++; - } - response.type = learn_type::LT_CACHE; - response.state.meta = writer.get_buffer(); - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn mutation cache succeed, " - "learn_start_decree = %" PRId64 ", prepare_start_decree = %" PRId64 ", " - "learn_mutation_count = %d, learn_data_size = %d", - name(), - request.signature, - request.learner.to_string(), - learn_start_decree, - response.prepare_start_decree, - count, - response.state.meta.length()); - } - // learn delta state or checkpoint // in this case, the state on the PS is still incomplete - else { + if (!prepare_learn_from_cache(request, + learn_start_decree, + local_committed_decree, + learn_state, + response, + delayed_replay_prepare_list)) { if (learn_start_decree > _app->last_durable_decree()) { ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " "because learn_start_decree(%" PRId64 ") > _app->last_durable_decree(%" PRId64 @@ -475,15 +427,15 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request) err.to_string()); } else { response.base_local_dir = _app->data_dir(); - ddebug( - "%s: on_learn[%016" PRIx64 "]: learner = %s, get app learn state succeed, " - "learned_meta_size = %u, learned_file_count = %u, learned_to_decree = %" PRId64, - name(), - request.signature, - request.learner.to_string(), - response.state.meta.length(), - static_cast(response.state.files.size()), - response.state.to_decree_included); + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, get app learn state succeed, " + "learned_meta_size = %u, learned_file_count = %u, learned_to_decree = " + "%" PRId64, + name(), + request.signature, + request.learner.to_string(), + response.state.meta.length(), + static_cast(response.state.files.size()), + response.state.to_decree_included); } } } @@ -904,6 +856,69 @@ void replica::on_learn_reply(error_code err, learn_request &&req, learn_response } } +bool replica::prepare_learn_from_cache(const learn_request &request, + decree learn_start_decree, + decree local_committed_decree, + remote_learner_state &learner_state, + learn_response &response, + bool &delayed_replay_prepare_list) +{ + // set prepare_start_decree when to-be-learn state is covered by prepare list, + // note min_decree can be NOT present in prepare list when list.count == 0 + if (learn_start_decree > _prepare_list->min_decree() || + (learn_start_decree == _prepare_list->min_decree() && _prepare_list->count() > 0)) { + if (learner_state.prepare_start_decree == invalid_decree) { + // start from (last_committed_decree + 1) + learner_state.prepare_start_decree = local_committed_decree + 1; + + cleanup_preparing_mutations(false); + + // the replayed prepare msg needs to be AFTER the learning response msg + // to reduce probability that preparing messages arrive remote early than + // learning response msg. + delayed_replay_prepare_list = true; + + ddebug("%s: on_learn[%016" PRIx64 + "]: learner = %s, set prepare_start_decree = %" PRId64, + name(), + request.signature, + request.learner.to_string(), + local_committed_decree + 1); + } + + response.prepare_start_decree = learner_state.prepare_start_decree; + } else { + learner_state.prepare_start_decree = invalid_decree; + } + + // only learn mutation cache in range of [learn_start_decree, prepare_start_decree), + // in this case, the state on the PS should be contiguous (+ to-be-sent prepare list) + if (response.prepare_start_decree != invalid_decree) { + binary_writer writer; + int count = 0; + for (decree d = learn_start_decree; d < response.prepare_start_decree; d++) { + auto mu = _prepare_list->get_mutation_by_decree(d); + dassert(mu != nullptr, "mutation must not be nullptr, decree = %" PRId64 "", d); + mu->write_to(writer, nullptr); + count++; + } + response.type = learn_type::LT_CACHE; + response.state.meta = writer.get_buffer(); + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn mutation cache succeed, " + "learn_start_decree = %" PRId64 ", prepare_start_decree = %" PRId64 ", " + "learn_mutation_count = %d, learn_data_size = %d", + name(), + request.signature, + request.learner.to_string(), + learn_start_decree, + response.prepare_start_decree, + count, + response.state.meta.length()); + return true; + } + return false; +} + void replica::on_copy_remote_state_completed(error_code err, size_t size, uint64_t copy_start_time, @@ -1470,5 +1485,5 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state) return err; } -} -} // namespace +} // namespace replication +} // namespace dsn