Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

split: parent replica create child replica #291

Merged
merged 22 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_PRIVATE, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_SHARED, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE(LPC_QUERY_CONFIGURATION_ALL, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE(LPC_MEM_RELEASE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_CREATE_CHILD, TASK_PRIORITY_COMMON)
#undef CURRENT_THREAD_POOL

// THREAD_POOL_META_SERVER
Expand Down Expand Up @@ -142,6 +143,8 @@ MAKE_EVENT_CODE_AIO(LPC_REPLICA_COPY_LAST_CHECKPOINT_DONE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_COLD_BACKUP, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_COLD_BACKUP, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_EXEC_COMMAND_ON_REPLICA, TASK_PRIORITY_LOW)
MAKE_EVENT_CODE(LPC_PARTITION_SPLIT, TASK_PRIORITY_LOW)
MAKE_EVENT_CODE(LPC_PARTITION_SPLIT_ERROR, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_LOW, TASK_PRIORITY_LOW)
MAKE_EVENT_CODE(LPC_REPLICATION_COMMON, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_HIGH, TASK_PRIORITY_HIGH)
Expand Down
14 changes: 12 additions & 2 deletions include/dsn/dist/replication/replication_types.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 32 additions & 3 deletions src/dist/replication/common/replication_types.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/dist/replication/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ set(MY_PROJ_LIBS
dsn_nfs
dsn_cli
dsn_http
dsn_runtime
)

# Extra files that will be installed
Expand Down
2 changes: 1 addition & 1 deletion src/dist/replication/lib/duplication/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ set(MY_PROJ_LIBS dsn_meta_server
fmt
)

set(MY_BOOST_LIBS Boost::system Boost::filesystem)
set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex)

set(MY_BINPLACES
config-test.ini
Expand Down
29 changes: 27 additions & 2 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,12 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// routines for replica stub
//
static replica *load(replica_stub *stub, const char *dir);
static replica *
newr(replica_stub *stub, gpid gpid, const app_info &app, bool restore_if_necessary);
// {parent_dir} is used in partition split for get_child_dir in replica_stub
static replica *newr(replica_stub *stub,
gpid gpid,
const app_info &app,
bool restore_if_necessary,
const std::string &parent_dir = "");

// return true when the mutation is valid for the current replica
bool replay_mutation(mutation_ptr &mu, bool is_private);
Expand Down Expand Up @@ -309,6 +313,17 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

std::string query_compact_state() const;

/////////////////////////////////////////////////////////////////
// partition split
// parent partition create child
void on_add_child(const group_check_request &request);

// child replica initialize config and state info
void init_child_replica(gpid parent_gpid, dsn::rpc_address primary_address, ballot init_ballot);

// parent reset child information when partition split failed
void clean_up_parent_split_context();

private:
friend class ::dsn::replication::replication_checker;
friend class ::dsn::replication::test::test_checker;
Expand All @@ -318,6 +333,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
friend class replica_learn_test;
friend class replica_duplicator_manager;
friend class load_mutation;
friend class replica_split_test;

// replica configuration, updated by update_local_configuration ONLY
replica_configuration _config;
Expand Down Expand Up @@ -364,6 +380,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
potential_secondary_context _potential_secondary_states;
// policy_name --> cold_backup_context
std::map<std::string, cold_backup_context_ptr> _cold_backup_contexts;
partition_split_context _split_states;
neverchanje marked this conversation as resolved.
Show resolved Hide resolved

// timer task that running in replication-thread
dsn::task_ptr _collect_info_timer;
Expand Down Expand Up @@ -391,6 +408,14 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// duplication
std::unique_ptr<replica_duplicator_manager> _duplication_mgr;

// partition split
// _child_gpid = gpid({app_id},{pidx}+{old_partition_count}) for parent partition
// _child_gpid.app_id = 0 if parent partition not during partition split and child partition
dsn::gpid _child_gpid{0, 0};
hycdong marked this conversation as resolved.
Show resolved Hide resolved
// ballot when starting partition split coz split will stop if ballot changed
// _child_init_ballot = 0 if partition not during partition split
ballot _child_init_ballot{0};

// perf counters
perf_counter_wrapper _counter_private_log_size;
perf_counter_wrapper _counter_recent_write_throttling_delay_count;
Expand Down
9 changes: 8 additions & 1 deletion src/dist/replication/lib/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1287,5 +1287,12 @@ void cold_backup_context::file_upload_complete(const std::string &filename)
_cur_upload_file_cnt -= 1;
_file_status[filename] = file_status::FileUploadComplete;
}

bool partition_split_context::cleanup(bool force)
{
parent_gpid.set_app_id(0);
hycdong marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
} // end namespace

} // namespace replication
} // namespace dsn
11 changes: 11 additions & 0 deletions src/dist/replication/lib/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,17 @@ class cold_backup_context : public ref_counter

typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;

class partition_split_context
{
public:
partition_split_context() {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partition_split_context() = default

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partition_split_context还不完整,还有一些成员变量没有定义,构造函数也应该是类似于下面的方式,不会用到default的构造函数,所以就没加 = default

partition_split_context() : param1(0), param2(false)

// TODO(heyuchen): force will be used in further pull request
bool cleanup(bool force);

public:
gpid parent_gpid;
hycdong marked this conversation as resolved.
Show resolved Hide resolved
};

//---------------inline impl----------------------------------------------------------------

inline partition_status::type primary_context::get_node_status(::dsn::rpc_address addr) const
Expand Down
14 changes: 11 additions & 3 deletions src/dist/replication/lib/replica_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,18 @@ error_code replica::initialize_on_new()
return init_app_and_prepare_list(true);
}

/*static*/ replica *
replica::newr(replica_stub *stub, gpid gpid, const app_info &app, bool restore_if_necessary)
/*static*/ replica *replica::newr(replica_stub *stub,
gpid gpid,
const app_info &app,
bool restore_if_necessary,
const std::string &parent_dir)
{
std::string dir = stub->get_replica_dir(app.app_type.c_str(), gpid);
std::string dir;
if (parent_dir.empty()) {
dir = stub->get_replica_dir(app.app_type.c_str(), gpid);
} else {
dir = stub->get_child_dir(app.app_type.c_str(), gpid, parent_dir);
}
replica *rep = new replica(stub, gpid, app, dir.c_str(), restore_if_necessary);
error_code err;
if (restore_if_necessary && (err = rep->restore_checkpoint()) != dsn::ERR_OK) {
Expand Down
Loading