Skip to content

Commit

Permalink
feat(split): replica server handle pause and cancel status (XiaoMi#681)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored and zhangyifan27 committed Jan 26, 2021
1 parent 4e4a1ab commit cb07440
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 25 deletions.
14 changes: 12 additions & 2 deletions include/dsn/dist/replication/replication_types.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions src/common/replication_types.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/replica/replica_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ void replica::on_group_check_reply(error_code err,
req->config.status == partition_status::PS_POTENTIAL_SECONDARY) {
handle_learning_succeeded_on_primary(req->node, resp->learner_signature);
}
_split_mgr->primary_parent_handle_stop_split(req, resp);
}
}

Expand Down
18 changes: 11 additions & 7 deletions src/replica/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ void primary_context::cleanup(bool clean_pending_mutations)
// clean up checkpoint
CLEANUP_TASK_ALWAYS(checkpoint_task)

// clean up register child task
CLEANUP_TASK_ALWAYS(register_child_task)

// cleanup group bulk load
for (auto &kv : group_bulk_load_pending_replies) {
CLEANUP_TASK_ALWAYS(kv.second);
Expand All @@ -78,11 +75,9 @@ void primary_context::cleanup(bool clean_pending_mutations)

membership.ballot = 0;

caught_up_children.clear();

sync_send_write_request = false;

cleanup_bulk_load_states();

cleanup_split_states();
}

bool primary_context::is_cleaned()
Expand Down Expand Up @@ -170,6 +165,15 @@ void primary_context::cleanup_bulk_load_states()
ingestion_is_empty_prepare_sent = false;
}

// Resets the partition-split bookkeeping kept by the primary:
// applies CLEANUP_TASK_ALWAYS to the register-child task, empties the set of
// caught-up children, clears the sync_send_write_request flag, and forgets
// which secondaries have reported a stopped (paused/canceled) split.
void primary_context::cleanup_split_states()
{
// register_child_task is the task used by the primary parent to register the child on meta_server
// NOTE(review): CLEANUP_TASK_ALWAYS presumably cancels and resets the task - confirm in its definition
CLEANUP_TASK_ALWAYS(register_child_task)

caught_up_children.clear();
sync_send_write_request = false;
// secondaries recorded as having paused or canceled split are no longer relevant
split_stopped_secondary.clear();
}

bool secondary_context::cleanup(bool force)
{
CLEANUP_TASK(checkpoint_task, force)
Expand Down
6 changes: 6 additions & 0 deletions src/replica/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ class primary_context

void cleanup_bulk_load_states();

void cleanup_split_states();

public:
// membership mgr, including learners
partition_configuration membership;
Expand Down Expand Up @@ -152,6 +154,10 @@ class primary_context
// primary parent register child on meta_server task
dsn::task_ptr register_child_task;

// Used for partition split
// addresses of secondary replicas that have paused or canceled split
std::unordered_set<rpc_address> split_stopped_secondary;

// Used for bulk load
// group bulk_load response tasks of RPC_GROUP_BULK_LOAD for each secondary replica
node_tasks group_bulk_load_pending_replies;
Expand Down
98 changes: 95 additions & 3 deletions src/replica/split/replica_split_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ void replica_split_manager::parent_start_split(
return;
}

// TODO(heyuchen): if partition is primary, reset split related varieties

if (status() == partition_status::PS_PRIMARY) {
_replica->_primary_states.cleanup_split_states();
}
_partition_version.store(_replica->_app_info.partition_count - 1);

_split_status = split_status::SPLITTING;
Expand Down Expand Up @@ -1144,6 +1145,17 @@ void replica_split_manager::trigger_primary_parent_split(
return;
}

if (meta_split_status == split_status::PAUSING ||
meta_split_status == split_status::CANCELING) {
parent_stop_split(meta_split_status);
return;
}

if (meta_split_status == split_status::PAUSED) {
dwarn_replica("split has been paused, ignore it");
return;
}

// TODO(heyuchen): add other split_status check
}

Expand All @@ -1169,7 +1181,11 @@ void replica_split_manager::trigger_secondary_parent_split(
return;
}

// TODO(heyuchen): add other split_status check, response will be used in future
if (request.meta_split_status == split_status::PAUSING ||
request.meta_split_status == split_status::CANCELING) { // secondary pause or cancel split
parent_stop_split(request.meta_split_status);
response.__set_is_split_stopped(true);
}
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand Down Expand Up @@ -1257,5 +1273,81 @@ void replica_split_manager::on_copy_mutation_reply(error_code ec,
// TBD
}

// ThreadPool: THREAD_POOL_REPLICATION
// Stops partition split on a parent replica; `meta_split_status` is only used to
// label the log line ("pause" when PAUSING, "cancel" otherwise).
// Steps:
//   1. assert the replica is primary or secondary, and is SPLITTING or NOT_SPLIT
//   2. if it was SPLITTING, hand `child_handle_split_error` to the stub's
//      split_replica_error_handler for `_child_gpid`, then clean up the parent's split context
//   3. store `partition_count - 1` into the partition version
//   4. on a primary, turn off sync_send_write_request and broadcast a group check
void replica_split_manager::parent_stop_split(
    split_status::type meta_split_status) // on parent partition
{
    const auto current_status = status();
    dassert_replica(current_status == partition_status::PS_PRIMARY ||
                        current_status == partition_status::PS_SECONDARY,
                    "wrong partition_status({})",
                    enum_to_string(current_status));
    dassert_replica(_split_status == split_status::SPLITTING ||
                        _split_status == split_status::NOT_SPLIT,
                    "wrong split_status({})",
                    enum_to_string(_split_status));

    // remember the split status before it is reset, it is reported in the log below
    const auto split_status_before_stop = _split_status;
    const bool was_splitting = (split_status_before_stop == split_status::SPLITTING);
    if (was_splitting) {
        auto child_error_handler = std::bind(&replica_split_manager::child_handle_split_error,
                                             std::placeholders::_1,
                                             "stop partition split");
        _stub->split_replica_error_handler(_child_gpid, child_error_handler);
        parent_cleanup_split_context();
    }

    const auto &app_info = _replica->_app_info;
    _partition_version.store(app_info.partition_count - 1);

    // only the primary keeps group-level split state and informs the rest of the group
    if (status() == partition_status::PS_PRIMARY) {
        _replica->_primary_states.sync_send_write_request = false;
        _replica->broadcast_group_check();
    }

    const char *stop_action = "cancel";
    if (meta_split_status == split_status::PAUSING) {
        stop_action = "pause";
    }
    const auto child_partition_index = get_gpid().get_partition_index() + app_info.partition_count;
    ddebug_replica(
        "{} split succeed, status = {}, old split_status = {}, child partition_index = {}",
        stop_action,
        enum_to_string(status()),
        enum_to_string(split_status_before_stop),
        child_partition_index);
}

// ThreadPool: THREAD_POOL_REPLICATION
// Processes a group check reply on the primary parent while split is being stopped.
// Nothing happens unless the request carried PAUSING or CANCELING and the response
// reports `is_split_stopped`. Otherwise the replying node is recorded, and once the
// number of recorded nodes that are currently secondaries equals
// `max_replica_count - 1`, split states are cleaned up and
// parent_send_notify_stop_request is called with the request's split status.
void replica_split_manager::primary_parent_handle_stop_split(
    const std::shared_ptr<group_check_request> &req,
    const std::shared_ptr<group_check_response> &resp) // on primary parent partition
{
    const bool is_stopping_split = req->__isset.meta_split_status &&
                                   (req->meta_split_status == split_status::PAUSING ||
                                    req->meta_split_status == split_status::CANCELING);
    if (!is_stopping_split) {
        // partition is not executing split or not stopping split
        return;
    }

    const bool secondary_has_stopped = resp->__isset.is_split_stopped && resp->is_split_stopped;
    if (!secondary_has_stopped) {
        // secondary has not stopped split
        return;
    }

    auto &primary_states = _replica->_primary_states;
    auto &stopped_nodes = primary_states.split_stopped_secondary;
    stopped_nodes.insert(req->node);

    // count the nodes that are secondaries right now and have already reported a stopped split
    int stopped_secondary_count = 0;
    for (const auto &node_and_status : primary_states.statuses) {
        if (node_and_status.second != partition_status::PS_SECONDARY) {
            continue;
        }
        if (stopped_nodes.count(node_and_status.first) > 0) {
            ++stopped_secondary_count;
        }
    }

    if (stopped_secondary_count != primary_states.membership.max_replica_count - 1) {
        // still waiting for some secondaries to stop split
        return;
    }

    // all secondaries have stopped split successfully
    primary_states.cleanup_split_states();
    parent_send_notify_stop_request(req->meta_split_status);
}

// ThreadPool: THREAD_POOL_REPLICATION
// Called by primary_parent_handle_stop_split once enough secondaries have reported
// that split is stopped; `meta_split_status` is the PAUSING/CANCELING status taken
// from the group check request.
// Currently a stub: only the fail point hook exists, the notify request itself is not implemented yet.
void replica_split_manager::parent_send_notify_stop_request(
split_status::type meta_split_status) // on primary parent
{
// NOTE(review): FAIL_POINT_INJECT_F presumably lets tests short-circuit this function
// when the named fail point is enabled - confirm against the fail point utility
FAIL_POINT_INJECT_F("replica_parent_send_notify_stop_request", [](dsn::string_view) {});
// TODO(hyc): TBD
}

} // namespace replication
} // namespace dsn
9 changes: 9 additions & 0 deletions src/replica/split/replica_split_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,15 @@ class replica_split_manager : replica_base
// when child copy mutation synchronously, parent replica handle child ack
void on_copy_mutation_reply(dsn::error_code ec, ballot b, decree d);

// parent partition pause or cancel split
void parent_stop_split(split_status::type meta_split_status);

// called by `on_group_check_reply` in `replica_check.cpp`
// if all replicas in the group have paused/canceled split, send notify request to meta server
void primary_parent_handle_stop_split(const std::shared_ptr<group_check_request> &req,
const std::shared_ptr<group_check_response> &resp);
void parent_send_notify_stop_request(split_status::type meta_split_status);

//
// helper functions
//
Expand Down
Loading

0 comments on commit cb07440

Please sign in to comment.