Skip to content

Commit

Permalink
feat(split): small refactor and fix of replica split XiaoMi#636
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong committed Oct 14, 2020
1 parent f8067b8 commit f5ce126
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 26 deletions.
49 changes: 24 additions & 25 deletions src/replica/split/replica_split_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace replication {
// Constructs the split manager bound to replica `r`.
//
// Passes `r` to replica_base, keeps the raw replica pointer in _replica and the
// stub returned by r->get_replica_stub() in _stub, then seeds _partition_version
// with (partition_count - 1) read from the replica's app info.
replica_split_manager::replica_split_manager(replica *r)
    : replica_base(r), _replica(r), _stub(r->get_replica_stub())
{
    // Write the initial value exactly once, through store().
    _partition_version.store(_replica->_app_info.partition_count - 1);
}

// No explicit cleanup is performed here; member destructors do all the work.
replica_split_manager::~replica_split_manager() = default;
Expand Down Expand Up @@ -104,7 +104,7 @@ void replica_split_manager::child_init_replica(gpid parent_gpid,
FAIL_POINT_INJECT_F("replica_child_init_replica", [](dsn::string_view) {});

if (status() != partition_status::PS_INACTIVE) {
derror_replica("wrong status({})", enum_to_string(status()));
dwarn_replica("wrong status({})", enum_to_string(status()));
_stub->split_replica_error_handler(
parent_gpid,
std::bind(&replica_split_manager::parent_cleanup_split_context, std::placeholders::_1));
Expand Down Expand Up @@ -209,7 +209,7 @@ void replica_split_manager::parent_prepare_states(const std::string &dir) // on
// generate checkpoint
error_code ec = _replica->_app->copy_checkpoint_to_dir(dir.c_str(), &checkpoint_decree);
if (ec == ERR_OK) {
ddebug_replica("prepare checkpoint succeed: checkpoint dir={}, checkpoint decree={}",
ddebug_replica("prepare checkpoint succeed: checkpoint dir = {}, checkpoint decree = {}",
dir,
checkpoint_decree);
parent_states.to_decree_included = checkpoint_decree;
Expand Down Expand Up @@ -238,21 +238,17 @@ void replica_split_manager::parent_prepare_states(const std::string &dir) // on
std::make_shared<prepare_list>(_replica, *_replica->_prepare_list);
plist->truncate(last_committed_decree());

dassert_replica(last_committed_decree() == checkpoint_decree || !mutation_list.empty() ||
!files.empty(),
"last_committed_decree({}) VS checkpoint_decree({}), mutation_list count={}, "
"private log count={}",
last_committed_decree(),
checkpoint_decree,
mutation_list.size(),
files.size());

ddebug_replica("prepare state succeed: {} mutations, {} private log files, total file size={}, "
"last_committed_decree={}",
mutation_list.size(),
files.size(),
total_file_size,
last_committed_decree());
dcheck_eq(last_committed_decree(), checkpoint_decree);
dcheck_gt(mutation_list.size(), 0);
dcheck_gt(files.size(), 0);

ddebug_replica(
"prepare state succeed: {} mutations, {} private log files, total file size = {}, "
"last_committed_decree = {}",
mutation_list.size(),
files.size(),
total_file_size,
last_committed_decree());

ec = _stub->split_replica_exec(LPC_PARTITION_SPLIT,
_child_gpid,
Expand Down Expand Up @@ -330,7 +326,7 @@ void replica_split_manager::child_learn_states(learn_state lstate,
FAIL_POINT_INJECT_F("replica_child_learn_states", [](dsn::string_view) {});

if (status() != partition_status::PS_PARTITION_SPLIT) {
dwarn_replica("wrong status({})", enum_to_string(status()));
derror_replica("wrong status({})", enum_to_string(status()));
child_handle_async_learn_error();
return;
}
Expand Down Expand Up @@ -419,7 +415,7 @@ replica_split_manager::child_apply_private_logs(std::vector<std::string> plog_fi

// replay private log
ec = mutation_log::replay(plog_files,
[this, &plist](int log_length, mutation_ptr &mu) {
[&plist](int log_length, mutation_ptr &mu) {
decree d = mu->data.header.decree;
if (d <= plist.last_committed_decree()) {
return false;
Expand Down Expand Up @@ -563,8 +559,8 @@ void replica_split_manager::child_notify_catch_up() // on child partition

notify_catch_up_rpc rpc(std::move(request),
RPC_SPLIT_NOTIFY_CATCH_UP,
0_ms,
0,
/*never timeout*/ 0_ms,
/*partition_hash*/ 0,
_replica->_split_states.parent_gpid.thread_hash());
rpc.call(_replica->_config.primary, tracker(), [this, rpc](error_code ec) mutable {
auto response = rpc.response();
Expand Down Expand Up @@ -994,8 +990,11 @@ void replica_split_manager::parent_send_register_request(
request.parent_config.ballot,
request.child_config.ballot);

register_child_rpc rpc(
std::move(req), RPC_CM_REGISTER_CHILD_REPLICA, 0_ms, 0, get_gpid().thread_hash());
register_child_rpc rpc(std::move(req),
RPC_CM_REGISTER_CHILD_REPLICA,
/*never timeout*/ 0_ms,
/*partition_hash*/ 0,
get_gpid().thread_hash());
_replica->_primary_states.register_child_task =
rpc.call(meta_address, tracker(), [this, rpc](error_code ec) mutable {
on_register_child_on_meta_reply(ec, rpc.request(), rpc.response());
Expand Down Expand Up @@ -1111,11 +1110,11 @@ void replica_split_manager::child_partition_active(
return;
}

ddebug_replica("child partition become active");
_stub->_counter_replicas_splitting_recent_split_succ_count->increment();
_replica->_primary_states.last_prepare_decree_on_new_primary =
_replica->_prepare_list->max_decree();
_replica->update_configuration(config);
ddebug_replica("child partition is active, status={}", enum_to_string(status()));
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand Down
2 changes: 1 addition & 1 deletion src/replica/split/replica_split_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class replica_split_manager : replica_base

// On a parent partition: _child_gpid = gpid({app_id}, {pidx} + {old_partition_count}).
// _child_gpid.app_id = 0 on a parent partition that is not in partition split, and on a child partition.
dsn::gpid _child_gpid{0, 0};
gpid _child_gpid{0, 0};
// Ballot at the time partition split started; the split stops if the ballot changes.
// _child_init_ballot = 0 if the partition is not in partition split.
ballot _child_init_ballot{0};
Expand Down

0 comments on commit f5ce126

Please sign in to comment.