Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(split): child replica apply private logs, in-memory mutations an…
Browse files Browse the repository at this point in the history
…d catch up parent (#319)
  • Loading branch information
hycdong authored Feb 18, 2020
1 parent 0db7b36 commit baa3e91
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 48 deletions.
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ class replication_app_base : public replica_base
// routines for replica internal usage
friend class replica;
friend class replica_stub;
friend class mock_replica;

::dsn::error_code open_internal(replica *r);
::dsn::error_code
Expand Down
1 change: 0 additions & 1 deletion src/dist/replication/lib/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include <dsn/utility/fail_point.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/tool-api/async_calls.h>
#include <dsn/dist/fmt_logging.h>

namespace dsn {
namespace replication {
Expand Down
15 changes: 11 additions & 4 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,23 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
uint64_t total_file_size,
decree last_committed_decree);

error_code child_replay_private_log(std::vector<std::string> plog_files,
// TODO(heyuchen): total_file_size is used for split perf-counter in further pull request
// Applies mutation logs that were learned from the parent of this child.
// This stage runs after the child has applied the parent's checkpoint; the child then begins
// to apply the mutations.
// \param last_committed_decree: parent's last_committed_decree when the checkpoint was
// generated.
error_code child_apply_private_logs(std::vector<std::string> plog_files,
std::vector<mutation_ptr> mutation_list,
uint64_t total_file_size,
decree last_committed_decree);

error_code child_learn_mutations(std::vector<mutation_ptr> mutation_list,
decree last_committed_decree);

// child catch up parent states while executing async learn task
void child_catch_up_states();

// child sends a notification to the primary parent when it finishes async learn
void child_notify_catch_up();

// return true if parent status is valid
bool parent_check_states();

Expand Down
6 changes: 6 additions & 0 deletions src/dist/replication/lib/replica_chkpt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,12 @@ void replica::catch_up_with_private_logs(partition_status::type s)
[this, err]() { this->on_learn_remote_state_completed(err); },
get_gpid().thread_hash());
_potential_secondary_states.learn_remote_files_completed_task->enqueue();
} else if (s == partition_status::PS_PARTITION_SPLIT) {
_split_states.async_learn_task =
tasking::enqueue(LPC_PARTITION_SPLIT,
tracker(),
std::bind(&replica::child_catch_up_states, this),
get_gpid().thread_hash());
} else {
_secondary_states.checkpoint_completed_task =
tasking::create_task(LPC_CHECKPOINT_REPLICA_COMPLETED,
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/lib/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,7 @@ bool partition_split_context::cleanup(bool force)

parent_gpid.set_app_id(0);
is_prepare_list_copied = false;
is_caught_up = false;
return true;
}

Expand Down
6 changes: 4 additions & 2 deletions src/dist/replication/lib/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,15 @@ typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;
class partition_split_context
{
public:
partition_split_context() : is_prepare_list_copied(false) {}
bool cleanup(bool force);
bool is_cleaned() const;

public:
gpid parent_gpid;
bool is_prepare_list_copied;
// whether child has copied parent prepare list
bool is_prepare_list_copied{false};
// whether child has caught up with parent during async-learn
bool is_caught_up{false};

// child replica async learn parent states
dsn::task_ptr async_learn_task;
Expand Down
167 changes: 144 additions & 23 deletions src/dist/replication/lib/replica_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ void replica::child_init_replica(gpid parent_gpid,
// init split states
_split_states.parent_gpid = parent_gpid;
_split_states.is_prepare_list_copied = false;
_split_states.is_caught_up = false;

ddebug_replica("init ballot is {}, parent gpid is ({})", init_ballot, parent_gpid);

Expand Down Expand Up @@ -279,34 +280,28 @@ void replica::child_learn_states(learn_state lstate,
plog_files.size(),
mutation_list.size());

// apply parent checkpoint
error_code err;
auto cleanup = defer([this, &err]() {
if (err != ERR_OK) {
child_handle_async_learn_error();
}
});

// apply parent checkpoint
err = _app->apply_checkpoint(replication_app_base::chkpt_apply_mode::learn, lstate);
if (err != ERR_OK) {
derror_replica("failed to apply checkpoint, error={}", err);
return;
}

// replay parent private log
err = child_replay_private_log(plog_files, total_file_size, last_committed_decree);
// replay parent private log and learn in-memory mutations
err =
child_apply_private_logs(plog_files, mutation_list, total_file_size, last_committed_decree);
if (err != ERR_OK) {
derror_replica("failed to replay private log, error={}", err);
return;
}

// learn parent in-memory mutations
err = child_learn_mutations(mutation_list, last_committed_decree);
if (err != ERR_OK) {
derror_replica("failed to learn mutations, error={}", err);
return;
}

// generate a checkpoint synchronously
err = _app->sync_checkpoint();
if (err != ERR_OK) {
Expand All @@ -330,29 +325,155 @@ void replica::child_learn_states(learn_state lstate,
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
error_code replica::child_replay_private_log(std::vector<std::string> plog_files,
error_code replica::child_apply_private_logs(std::vector<std::string> plog_files,
std::vector<mutation_ptr> mutation_list,
uint64_t total_file_size,
decree last_committed_decree) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_replay_private_log",
[](dsn::string_view) { return ERR_OK; });
// TODO(heyuchen): TBD
return ERR_OK;
}
FAIL_POINT_INJECT_F("replica_child_apply_private_logs", [](dsn::string_view arg) {
return error_code::try_get(arg.data(), ERR_OK);
});

// ThreadPool: THREAD_POOL_REPLICATION_LONG
error_code replica::child_learn_mutations(std::vector<mutation_ptr> mutation_list,
decree last_committed_decree) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_learn_mutations", [](dsn::string_view) { return ERR_OK; });
// TODO(heyuchen): TBD
return ERR_OK;
if (status() != partition_status::PS_PARTITION_SPLIT) {
dwarn_replica("wrong status={}", enum_to_string(status()));
return ERR_INVALID_STATE;
}

error_code ec;
int64_t offset;
// temp prepare_list used for apply states
prepare_list plist(this,
_app->last_committed_decree(),
_options->max_mutation_count_in_prepare_list,
[this](mutation_ptr &mu) {
if (mu->data.header.decree == _app->last_committed_decree() + 1) {
_app->apply_mutation(mu);
}
});

// replay private log
ec = mutation_log::replay(plog_files,
[this, &plist](int log_length, mutation_ptr &mu) {
decree d = mu->data.header.decree;
if (d <= plist.last_committed_decree()) {
return false;
}
mutation_ptr origin_mu = plist.get_mutation_by_decree(d);
if (origin_mu != nullptr &&
origin_mu->data.header.ballot >= mu->data.header.ballot) {
return false;
}
plist.prepare(mu, partition_status::PS_SECONDARY);
return true;
},
offset);
if (ec != ERR_OK) {
dwarn_replica(
"replay private_log files failed, file count={}, app last_committed_decree={}",
plog_files.size(),
_app->last_committed_decree());
return ec;
}

ddebug_replica("replay private_log files succeed, file count={}, app last_committed_decree={}",
plog_files.size(),
_app->last_committed_decree());

// apply in-memory mutations if replay private logs succeed
int count = 0;
for (mutation_ptr &mu : mutation_list) {
decree d = mu->data.header.decree;
if (d <= plist.last_committed_decree()) {
continue;
}
mutation_ptr origin_mu = plist.get_mutation_by_decree(d);
if (origin_mu != nullptr && origin_mu->data.header.ballot >= mu->data.header.ballot) {
continue;
}
if (!mu->is_logged()) {
mu->set_logged();
}
plist.prepare(mu, partition_status::PS_SECONDARY);
++count;
}
plist.commit(last_committed_decree, COMMIT_TO_DECREE_HARD);
ddebug_replica(
"apply in-memory mutations succeed, mutation count={}, app last_committed_decree={}",
count,
_app->last_committed_decree());

return ec;
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::child_catch_up_states() // on child partition
{
FAIL_POINT_INJECT_F("replica_child_catch_up_states", [](dsn::string_view) {});

if (status() != partition_status::PS_PARTITION_SPLIT) {
dwarn_replica("wrong status, status is {}", enum_to_string(status()));
return;
}

// parent will copy mutations to child during async-learn, as a result:
// - child prepare_list last_committed_decree = parent prepare_list last_committed_decree, also
// is catch_up goal_decree
// - local_decree is child local last_committed_decree which is the last decree in async-learn.
decree goal_decree = _prepare_list->last_committed_decree();
decree local_decree = _app->last_committed_decree();

// there are mutations written to parent during async-learn
// child does not catch up parent, there are still some mutations child not learn
if (local_decree < goal_decree) {
if (local_decree >= _prepare_list->min_decree()) {
// all missing mutations are all in prepare list
dwarn_replica("there are some in-memory mutations should be learned, app "
"last_committed_decree={}, "
"goal decree={}, prepare_list min_decree={}",
local_decree,
goal_decree,
_prepare_list->min_decree());
for (decree d = local_decree + 1; d <= goal_decree; ++d) {
auto mu = _prepare_list->get_mutation_by_decree(d);
dassert(mu != nullptr, "");
error_code ec = _app->apply_mutation(mu);
if (ec != ERR_OK) {
child_handle_split_error("child_catchup failed because apply mutation failed");
return;
}
}
} else {
// some missing mutations have already in private log
// should call `catch_up_with_private_logs` to catch up all missing mutations
dwarn_replica(
"there are some private logs should be learned, app last_committed_decree="
"{}, prepare_list min_decree={}, please wait",
local_decree,
_prepare_list->min_decree());
_split_states.async_learn_task = tasking::enqueue(
LPC_CATCHUP_WITH_PRIVATE_LOGS,
tracker(),
[this]() {
catch_up_with_private_logs(partition_status::PS_PARTITION_SPLIT);
_split_states.async_learn_task = nullptr;
},
get_gpid().thread_hash());
return;
}
}

ddebug_replica("child catch up parent states, goal decree={}, local decree={}",
_prepare_list->last_committed_decree(),
_app->last_committed_decree());
_split_states.is_caught_up = true;

child_notify_catch_up();
}

// ThreadPool: THREAD_POOL_REPLICATION
// Placeholder for the child-side notification step. child_catch_up_states() calls this right
// after setting _split_states.is_caught_up; the actual notification logic is not implemented
// yet (see the TODO below), so apart from the fail-point hook this is currently a no-op.
void replica::child_notify_catch_up() // on child partition
{
// Test hook. NOTE(review): presumably returns from this function early when the
// "replica_child_notify_catch_up" fail point is enabled -- confirm against fail_point.h.
FAIL_POINT_INJECT_F("replica_child_notify_catch_up", [](dsn::string_view) {});
// TODO(heyuchen): TBD
}

Expand Down
4 changes: 4 additions & 0 deletions src/dist/replication/lib/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <fstream>
#include <sstream>
#include <memory>
#include <dsn/utility/fail_point.h>

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -456,6 +457,9 @@ int replication_app_base::on_batched_write_requests(int64_t decree,

::dsn::error_code replication_app_base::apply_mutation(const mutation *mu)
{
FAIL_POINT_INJECT_F("replication_app_base_apply_mutation",
[](dsn::string_view) { return ERR_OK; });

dassert(mu->data.header.decree == last_committed_decree() + 1,
"invalid mutation decree, decree = %" PRId64 " VS %" PRId64 "",
mu->data.header.decree,
Expand Down
4 changes: 4 additions & 0 deletions src/dist/replication/test/replica_test/unit_test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ class mock_replica : public replica
void set_init_child_ballot(ballot b) { _child_init_ballot = b; }
void set_last_committed_decree(decree d) { _prepare_list->reset(d); }
prepare_list *get_plist() { return _prepare_list; }
void prepare_list_truncate(decree d) { _prepare_list->truncate(d); }
void prepare_list_commit_hard(decree d) { _prepare_list->commit(d, COMMIT_TO_DECREE_HARD); }
decree get_app_last_committed_decree() { return _app->last_committed_decree(); }
void set_app_last_committed_decree(decree d) { _app->_last_committed_decree = d; }

private:
decree _max_gced_decree{invalid_decree - 1};
Expand Down
Loading

0 comments on commit baa3e91

Please sign in to comment.