From 82dbc5b2afb888e745d6a7e284798d43bc71555e Mon Sep 17 00:00:00 2001 From: heyuchen Date: Thu, 8 Aug 2019 17:32:13 +0800 Subject: [PATCH 01/17] split: parent replica create child replica --- .../dsn/dist/replication/replication.codes.h | 2 + .../dsn/dist/replication/replication_types.h | 14 +- .../replication/common/replication_types.cpp | 35 ++++- src/dist/replication/lib/replica.cpp | 4 + src/dist/replication/lib/replica.h | 26 +++- src/dist/replication/lib/replica_context.cpp | 9 +- src/dist/replication/lib/replica_context.h | 10 ++ src/dist/replication/lib/replica_init.cpp | 9 +- src/dist/replication/lib/replica_split.cpp | 106 ++++++++++++++ src/dist/replication/lib/replica_stub.cpp | 136 +++++++++++++++++- src/dist/replication/lib/replica_stub.h | 43 +++++- src/dist/replication/replication.thrift | 6 +- .../replica_test/unit_test/CMakeLists.txt | 7 +- .../test/replica_test/unit_test/mock_utils.h | 84 ++++++++++- .../unit_test/replica_split_test.cpp | 86 +++++++++++ 15 files changed, 558 insertions(+), 19 deletions(-) create mode 100644 src/dist/replication/lib/replica_split.cpp create mode 100644 src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index eb2dc52fb5..2953d1849a 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -142,6 +142,8 @@ MAKE_EVENT_CODE_AIO(LPC_REPLICA_COPY_LAST_CHECKPOINT_DONE, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_COLD_BACKUP, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_REPLICATION_COLD_BACKUP, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_EXEC_COMMAND_ON_REPLICA, TASK_PRIORITY_LOW) +MAKE_EVENT_CODE(LPC_PARTITION_SPLIT, TASK_PRIORITY_LOW) +MAKE_EVENT_CODE(LPC_PARTITION_SPLIT_ERROR, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_REPLICATION_LOW, TASK_PRIORITY_LOW) MAKE_EVENT_CODE(LPC_REPLICATION_COMMON, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_REPLICATION_HIGH, TASK_PRIORITY_HIGH) diff --git a/include/dsn/dist/replication/replication_types.h b/include/dsn/dist/replication/replication_types.h index 4099bbdbdc..1c8a67e751 100644 --- a/include/dsn/dist/replication/replication_types.h +++ b/include/dsn/dist/replication/replication_types.h @@ -29,7 +29,8 @@ struct partition_status PS_ERROR = 2, PS_PRIMARY = 3, PS_SECONDARY = 4, - PS_POTENTIAL_SECONDARY = 5 + PS_POTENTIAL_SECONDARY = 5, + PS_PARTITION_SPLIT = 6 }; }; @@ -1254,7 +1255,8 @@ typedef struct _group_check_request__isset node(false), config(false), last_committed_decree(false), - confirmed_decree(false) + confirmed_decree(false), + child_gpid(false) { } bool app : 1; @@ -1262,6 +1264,7 @@ typedef struct _group_check_request__isset bool config : 1; bool last_committed_decree : 1; bool confirmed_decree : 1; + bool child_gpid : 1; } _group_check_request__isset; class group_check_request @@ -1279,6 +1282,7 @@ class group_check_request replica_configuration config; int64_t last_committed_decree; int64_t confirmed_decree; + ::dsn::gpid child_gpid; _group_check_request__isset __isset; @@ -1292,6 +1296,8 @@ class group_check_request void __set_confirmed_decree(const int64_t val); + void __set_child_gpid(const ::dsn::gpid &val); + bool operator==(const group_check_request &rhs) const { if (!(app == rhs.app)) @@ -1306,6 +1312,10 @@ class group_check_request return false; else if (__isset.confirmed_decree && !(confirmed_decree == rhs.confirmed_decree)) return false; + if (__isset.child_gpid != rhs.__isset.child_gpid) + return false; + else if (__isset.child_gpid && !(child_gpid == rhs.child_gpid)) + return false; return true; } bool operator!=(const group_check_request &rhs) const { return !(*this == rhs); } diff --git a/src/dist/replication/common/replication_types.cpp b/src/dist/replication/common/replication_types.cpp index c9dee615a8..474d185408 100644 --- a/src/dist/replication/common/replication_types.cpp +++ b/src/dist/replication/common/replication_types.cpp @@ -19,15 +19,17 @@ int _kpartition_statusValues[] = {partition_status::PS_INVALID, partition_status::PS_ERROR, partition_status::PS_PRIMARY, partition_status::PS_SECONDARY, - partition_status::PS_POTENTIAL_SECONDARY}; + partition_status::PS_POTENTIAL_SECONDARY, + partition_status::PS_PARTITION_SPLIT}; const char *_kpartition_statusNames[] = {"PS_INVALID", "PS_INACTIVE", "PS_ERROR", "PS_PRIMARY", "PS_SECONDARY", - "PS_POTENTIAL_SECONDARY"}; + "PS_POTENTIAL_SECONDARY", + "PS_PARTITION_SPLIT"}; const std::map _partition_status_VALUES_TO_NAMES( - ::apache::thrift::TEnumIterator(6, _kpartition_statusValues, _kpartition_statusNames), + ::apache::thrift::TEnumIterator(7, _kpartition_statusValues, _kpartition_statusNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); int _kread_semanticValues[] = {read_semantic::ReadInvalid, @@ -2350,6 +2352,12 @@ void group_check_request::__set_confirmed_decree(const int64_t val) __isset.confirmed_decree = true; } +void group_check_request::__set_child_gpid(const ::dsn::gpid &val) +{ + this->child_gpid = val; + __isset.child_gpid = true; +} + uint32_t group_check_request::read(::apache::thrift::protocol::TProtocol *iprot) { @@ -2409,6 +2417,14 @@ uint32_t group_check_request::read(::apache::thrift::protocol::TProtocol *iprot) xfer += iprot->skip(ftype); } break; + case 6: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->child_gpid.read(iprot); + this->__isset.child_gpid = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -2448,6 +2464,11 @@ uint32_t group_check_request::write(::apache::thrift::protocol::TProtocol *oprot xfer += oprot->writeI64(this->confirmed_decree); xfer += oprot->writeFieldEnd(); } + if (this->__isset.child_gpid) { + xfer += oprot->writeFieldBegin("child_gpid", ::apache::thrift::protocol::T_STRUCT, 6); + xfer += this->child_gpid.write(oprot); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -2461,6 +2482,7 @@ void swap(group_check_request &a, group_check_request &b) swap(a.config, b.config); swap(a.last_committed_decree, b.last_committed_decree); swap(a.confirmed_decree, b.confirmed_decree); + swap(a.child_gpid, b.child_gpid); swap(a.__isset, b.__isset); } @@ -2471,6 +2493,7 @@ group_check_request::group_check_request(const group_check_request &other67) config = other67.config; last_committed_decree = other67.last_committed_decree; confirmed_decree = other67.confirmed_decree; + child_gpid = other67.child_gpid; __isset = other67.__isset; } group_check_request::group_check_request(group_check_request &&other68) @@ -2480,6 +2503,7 @@ group_check_request::group_check_request(group_check_request &&other68) config = std::move(other68.config); last_committed_decree = std::move(other68.last_committed_decree); confirmed_decree = std::move(other68.confirmed_decree); + child_gpid = std::move(other68.child_gpid); __isset = std::move(other68.__isset); } group_check_request &group_check_request::operator=(const group_check_request &other69) @@ -2489,6 +2513,7 @@ group_check_request &group_check_request::operator=(const group_check_request &o config = other69.config; last_committed_decree = other69.last_committed_decree; confirmed_decree = other69.confirmed_decree; + child_gpid = other69.child_gpid; __isset = other69.__isset; return *this; } @@ -2499,6 +2524,7 @@ group_check_request &group_check_request::operator=(group_check_request &&other7 config = std::move(other70.config); last_committed_decree = std::move(other70.last_committed_decree); confirmed_decree = std::move(other70.confirmed_decree); + child_gpid = std::move(other70.child_gpid); __isset = std::move(other70.__isset); return *this; } @@ -2516,6 +2542,9 @@ void group_check_request::printTo(std::ostream &out) const out << ", " << "confirmed_decree="; (__isset.confirmed_decree ? (out << to_string(confirmed_decree)) : (out << "")); + out << ", " + << "child_gpid="; + (__isset.child_gpid ? (out << to_string(child_gpid)) : (out << "")); out << ")"; } diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index 54cfdc4f84..cd0f6b20ee 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -60,6 +60,7 @@ replica::replica( dassert(stub != nullptr, ""); _stub = stub; _dir = dir; + _options = &stub->options(); init_state(); _config.pid = gpid; @@ -81,6 +82,9 @@ replica::replica( _extra_envs.insert( std::make_pair(backup_restore_constant::FORCE_RESTORE, std::string("true"))); } + + _child_gpid = dsn::gpid(0, 0); + _child_init_ballot = 0; } void replica::update_last_checkpoint_generate_time() diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 3cd95ee318..ef7f350b63 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -76,8 +76,12 @@ class replica : public serverlet, public ref_counter, public replica_ba // routines for replica stub // static replica *load(replica_stub *stub, const char *dir); - static replica * - newr(replica_stub *stub, gpid gpid, const app_info &app, bool restore_if_necessary); + // {parent_dir} is used in partition split for get_replica_dir in replica_stub + static replica *newr(replica_stub *stub, + gpid gpid, + const app_info &app, + bool restore_if_necessary, + const std::string &parent_dir = ""); // return true when the mutation is valid for the current replica bool replay_mutation(mutation_ptr &mu, bool is_private); @@ -309,6 +313,15 @@ class replica : public serverlet, public ref_counter, public replica_ba std::string query_compact_state() const; + ///////////////////////////////////////////////////////////////// + // partition split + // parent partition create child + mock_virtual void on_add_child(const group_check_request &request); + + // child replica initialize config and state info + mock_virtual void + init_child_replica(gpid parent_gpid, dsn::rpc_address primary_address, ballot init_ballot); + private: friend class ::dsn::replication::replication_checker; friend class ::dsn::replication::test::test_checker; @@ -364,6 +377,7 @@ class replica : public serverlet, public ref_counter, public replica_ba potential_secondary_context _potential_secondary_states; // policy_name --> cold_backup_context std::map _cold_backup_contexts; + partition_split_context _split_states; // timer task that running in replication-thread dsn::task_ptr _collect_info_timer; @@ -391,6 +405,14 @@ class replica : public serverlet, public ref_counter, public replica_ba // duplication std::unique_ptr _duplication_mgr; + // partition split + // _child_gpid = gpid({app_id},{pidx}+{old_partition_count}) for parent partition + // _child_gpid.app_id = 0 for partitions not during partition split + dsn::gpid _child_gpid; + // ballot when starting partition split coz split will stop if ballot changed + // _child_init_ballot=0 if partition not during partition split + ballot _child_init_ballot; + // perf counters perf_counter_wrapper _counter_private_log_size; perf_counter_wrapper _counter_recent_write_throttling_delay_count; diff --git a/src/dist/replication/lib/replica_context.cpp b/src/dist/replication/lib/replica_context.cpp index 5a5610328e..d089c5dafe 100644 --- a/src/dist/replication/lib/replica_context.cpp +++ b/src/dist/replication/lib/replica_context.cpp @@ -1287,5 +1287,12 @@ void cold_backup_context::file_upload_complete(const std::string &filename) _cur_upload_file_cnt -= 1; _file_status[filename] = file_status::FileUploadComplete; } + +bool partition_split_context::cleanup(bool force) +{ + parent_gpid.set_app_id(0); + return true; } -} // end namespace + +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/replica_context.h b/src/dist/replication/lib/replica_context.h index b673a06a0d..23c800178f 100644 --- a/src/dist/replication/lib/replica_context.h +++ b/src/dist/replication/lib/replica_context.h @@ -523,6 +523,16 @@ class cold_backup_context : public ref_counter typedef dsn::ref_ptr cold_backup_context_ptr; +class partition_split_context +{ +public: + partition_split_context() {} + bool cleanup(bool force); + +public: + gpid parent_gpid; +}; + //---------------inline impl---------------------------------------------------------------- inline partition_status::type primary_context::get_node_status(::dsn::rpc_address addr) const diff --git a/src/dist/replication/lib/replica_init.cpp b/src/dist/replication/lib/replica_init.cpp index 85e955846e..d7bbaa0517 100644 --- a/src/dist/replication/lib/replica_init.cpp +++ b/src/dist/replication/lib/replica_init.cpp @@ -74,10 +74,13 @@ error_code replica::initialize_on_new() return init_app_and_prepare_list(true); } -/*static*/ replica * -replica::newr(replica_stub *stub, gpid gpid, const app_info &app, bool restore_if_necessary) +/*static*/ replica *replica::newr(replica_stub *stub, + gpid gpid, + const app_info &app, + bool restore_if_necessary, + const std::string &parent_dir) { - std::string dir = stub->get_replica_dir(app.app_type.c_str(), gpid); + std::string dir = stub->get_replica_dir(app.app_type.c_str(), gpid, true, parent_dir); replica *rep = new replica(stub, gpid, app, dir.c_str(), restore_if_necessary); error_code err; if (restore_if_necessary && (err = rep->restore_checkpoint()) != dsn::ERR_OK) { diff --git a/src/dist/replication/lib/replica_split.cpp b/src/dist/replication/lib/replica_split.cpp new file mode 100644 index 0000000000..73a3f9302b --- /dev/null +++ b/src/dist/replication/lib/replica_split.cpp @@ -0,0 +1,106 @@ +// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include + +#include "replica.h" +#include "replica_stub.h" + +namespace dsn { +namespace replication { + +void replica::on_add_child(const group_check_request &request) // on parent partition +{ + if (status() != partition_status::PS_PRIMARY && status() != partition_status::PS_SECONDARY && + (status() != partition_status::PS_INACTIVE || !_inactive_is_transient)) { + dwarn_f("{} receive add child request with wrong status {}, ignore this request", + name(), + enum_to_string(status())); + return; + } + + if (request.config.ballot != get_ballot()) { + dwarn_f("{} receive add child request with different ballot, local ballot({}) VS request " + "ballot({}), ignore this request", + name(), + get_ballot(), + request.config.ballot); + return; + } + + gpid child_gpid = request.child_gpid; + if (_child_gpid == child_gpid) { + dwarn_f("{}: child replica({}.{}) is already existed, might be partition splitting, ignore " + "this request", + name(), + child_gpid.get_app_id(), + child_gpid.get_partition_index()); + return; + } + + if (child_gpid.get_partition_index() < _app_info.partition_count) { + dwarn_f("{} receive old add child request, child gpid is ({}.{}), " + "local partition count is {}, ignore this request", + name(), + child_gpid.get_app_id(), + child_gpid.get_partition_index(), + _app_info.partition_count); + return; + } + + _child_gpid = child_gpid; + _child_init_ballot = get_ballot(); + + ddebug_f("{} process add child({}.{}), primary is {}, ballot is {}, " + "status is {}, last_committed_decree is {}", + name(), + child_gpid.get_app_id(), + child_gpid.get_partition_index(), + request.config.primary.to_string(), + request.config.ballot, + enum_to_string(request.config.status), + request.last_committed_decree); + + tasking::enqueue(LPC_PARTITION_SPLIT, + tracker(), + std::bind(&replica_stub::create_child_replica, + _stub, + _config.primary, + _app_info, + _child_init_ballot, + _child_gpid, + get_gpid(), + _dir), + get_gpid().thread_hash()); +} + +void replica::init_child_replica(gpid parent_gpid, + rpc_address primary_address, + ballot init_ballot) // on child partition +{ + if (status() != partition_status::PS_INACTIVE) { + dwarn_f("{}: wrong status {}", name(), enum_to_string(status())); + _stub->split_replica_exec(LPC_PARTITION_SPLIT_ERROR, parent_gpid, [](replica_ptr r) { + r->_child_gpid.set_app_id(0); + }); + return; + } + + // update replica config + _config.ballot = init_ballot; + _config.primary = primary_address; + _config.status = partition_status::PS_PARTITION_SPLIT; + + // init split states + _split_states.parent_gpid = parent_gpid; + + ddebug_f("{}: init ballot is {}, parent gpid is ({}.{})", + name(), + init_ballot, + parent_gpid.get_app_id(), + parent_gpid.get_partition_index()); +} + +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index e8e00853bf..69ab7a012b 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -46,6 +46,7 @@ #include #include #include +#include #ifdef DSN_ENABLE_GPERF #include #endif @@ -2261,19 +2262,33 @@ void replica_stub::close() } } -std::string replica_stub::get_replica_dir(const char *app_type, gpid id, bool create_new) +std::string replica_stub::get_replica_dir(const char *app_type, + gpid id, + bool create_new, + const std::string &parent_dir) { + auto check_data_dir = [](const std::string &parent_dir, std::string &data_dir) { + // parent_dir = // + // return true if parent_dir's is euqal to + return (parent_dir.substr(0, data_dir.size() + 1) == data_dir + "/"); + }; + char buffer[256]; sprintf(buffer, "%s.%s", id.to_string(), app_type); std::string ret_dir; for (auto &dir : _options.data_dirs) { std::string cur_dir = utils::filesystem::path_combine(dir, buffer); - if (utils::filesystem::directory_exists(cur_dir)) { + // if creating child replica during partition split, we should gurantee child replica and + // parent replica share the same data dir + if (utils::filesystem::directory_exists(cur_dir) || check_data_dir(parent_dir, dir)) { if (!ret_dir.empty()) { dassert( false, "replica dir conflict: %s <--> %s", cur_dir.c_str(), ret_dir.c_str()); } ret_dir = cur_dir; + if (check_data_dir(parent_dir, dir)) { + _fs_manager.add_replica(id, ret_dir); + } } } if (ret_dir.empty() && create_new) { @@ -2281,5 +2296,122 @@ std::string replica_stub::get_replica_dir(const char *app_type, gpid id, bool cr } return ret_dir; } + +// +// partition split +// + +void replica_stub::create_child_replica(rpc_address primary_address, + app_info app, + ballot init_ballot, + gpid child_gpid, + gpid parent_gpid, + const std::string &parent_dir) +{ + replica_ptr child_replica = get_replica_permit_create_new(child_gpid, &app, parent_dir); + if (child_replica != nullptr) { + ddebug_f("create child replica ({}.{}) succeed", + child_gpid.get_app_id(), + child_gpid.get_partition_index()); + child_replica->init_child_replica(parent_gpid, primary_address, init_ballot); + } else { + dwarn_f("failed to create child replica ({}.{}), ignore it and wait next run", + child_gpid.get_app_id(), + child_gpid.get_partition_index()); + split_replica_exec(LPC_PARTITION_SPLIT_ERROR, parent_gpid, [](replica_ptr r) { + r->_child_gpid.set_app_id(0); + }); + } +} + +replica_ptr +replica_stub::get_replica_permit_create_new(gpid pid, app_info *app, const std::string &parent_dir) +{ + zauto_write_lock l(_replicas_lock); + auto it = _replicas.find(pid); + if (it != _replicas.end()) { + return it->second; + } else { + if (_opening_replicas.find(pid) != _opening_replicas.end()) { + ddebug_f("Cannot create new replica({}.{}) coz it is under open", + pid.get_app_id(), + pid.get_partition_index()); + return nullptr; + } else if (_closing_replicas.find(pid) != _closing_replicas.end()) { + ddebug_f("Cannnot create new replica({}.{}) coz it is under close", + pid.get_app_id(), + pid.get_partition_index()); + return nullptr; + } else { + replica *rep = replica::newr(this, pid, *app, false, parent_dir); + if (rep != nullptr) { + auto pr = _replicas.insert(replicas::value_type(pid, rep)); + dassert_f(pr.second, "replica {} has been existed", rep->name()); + _counter_replicas_count->increment(); + _closed_replicas.erase(pid); + } + return rep; + } + } +} + +void replica_stub::split_replica_exec(task_code code, + gpid pid, + local_execution handler, + local_execution missing_handler, + gpid missing_handler_gpid, + std::chrono::milliseconds delay) +{ + replica_ptr rep = pid.get_app_id() == 0 ? nullptr : get_replica(pid); + replica_ptr rep2 = + missing_handler_gpid.get_app_id() == 0 ? nullptr : get_replica(missing_handler_gpid); + + if (!rep && !rep2) { + derror_f("replica({}.{}) and replica({}.{}) are not existed", + pid.get_app_id(), + pid.get_partition_index(), + missing_handler_gpid.get_app_id(), + missing_handler_gpid.get_partition_index()); + return; + } + + if (rep && handler) { + tasking::enqueue( + code, rep.get()->tracker(), [=]() { handler(rep); }, pid.thread_hash(), delay); + } else if (rep2 && missing_handler) { + ddebug_f("replica({}.{}) is invalid, replica({}.{} will execute its handler)", + pid.get_app_id(), + pid.get_partition_index(), + missing_handler_gpid.get_app_id(), + missing_handler_gpid.get_partition_index()); + tasking::enqueue(code, + rep2.get()->tracker(), + [=]() { missing_handler(rep2); }, + missing_handler_gpid.thread_hash(), + delay); + } else { + // no handler will be executed + if (rep) { + dwarn_f("replica({}.{}) does not define handler", + pid.get_app_id(), + pid.get_partition_index()); + } else { + dwarn_f("replica({}.{}) is invalid, replica({}.{}) does not define handler", + pid.get_app_id(), + pid.get_partition_index(), + missing_handler_gpid.get_app_id(), + missing_handler_gpid.get_partition_index()); + } + } +} + +void replica_stub::split_replica_exec(task_code code, + gpid pid, + local_execution handler, + std::chrono::milliseconds delay) +{ + split_replica_exec(code, pid, handler, nullptr, gpid(), delay); +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index 6e01f11bbe..52d7557ffc 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -140,7 +140,12 @@ class replica_stub : public serverlet, public ref_counter virtual rpc_address get_meta_server_address() const { return _failure_detector->get_servers(); } rpc_address primary_address() const { return _primary_address; } - std::string get_replica_dir(const char *app_type, gpid id, bool create_new = true); + // {parent_dir} is used for partition split to gurantee child replica data dir is as same as it + // parent + std::string get_replica_dir(const char *app_type, + gpid id, + bool create_new = true, + const std::string &parent_dir = ""); // // helper methods @@ -155,6 +160,42 @@ class replica_stub : public serverlet, public ref_counter bool allow_empty_args, std::function func); + // + // partition split + // + + // called by parent partition, executed by child partition + void create_child_replica(dsn::rpc_address primary_address, + app_info app, + ballot init_ballot, + gpid child_gpid, + gpid parent_gpid, + const std::string &parent_dir); + + // get replica, and create a new replica instance if not found + mock_virtual replica_ptr get_replica_permit_create_new(gpid pid, + app_info *app, + const std::string &parent_dir); + + typedef std::function local_execution; + // During partition split, we should handle both parent and child at the same time, + // especially child replica. For example, if child replica is invalid, we should execute error + // handler. This function can be helpful for this condition. + // - if replica() is existed and valid, then add into task queue, replica() + // will execute function after milliseconds. + // - else add into task queue, replica() will execute + // function after milliseconds. + void split_replica_exec(task_code code, + gpid pid, + local_execution handler, + local_execution missing_handler, + gpid missing_handler_gpid = gpid(0, 0), + std::chrono::milliseconds delay = std::chrono::milliseconds(0)); + void split_replica_exec(task_code code, + gpid pid, + local_execution handler, + std::chrono::milliseconds delay = std::chrono::milliseconds(0)); + private: enum replica_node_state { diff --git a/src/dist/replication/replication.thrift b/src/dist/replication/replication.thrift index 672051dcf9..263f1677d6 100644 --- a/src/dist/replication/replication.thrift +++ b/src/dist/replication/replication.thrift @@ -35,7 +35,8 @@ enum partition_status PS_ERROR, PS_PRIMARY, PS_SECONDARY, - PS_POTENTIAL_SECONDARY + PS_POTENTIAL_SECONDARY, + PS_PARTITION_SPLIT } struct replica_configuration @@ -164,6 +165,9 @@ struct group_check_request // and secondaries, so that secondaries can be allowed to GC // their WALs after this decree. 5:optional i64 confirmed_decree; + + // Used to deliver child gpid during partition split + 6:optional dsn.gpid child_gpid; } struct group_check_response diff --git a/src/dist/replication/test/replica_test/unit_test/CMakeLists.txt b/src/dist/replication/test/replica_test/unit_test/CMakeLists.txt index 4abe8b02b0..c4948f761c 100644 --- a/src/dist/replication/test/replica_test/unit_test/CMakeLists.txt +++ b/src/dist/replication/test/replica_test/unit_test/CMakeLists.txt @@ -3,6 +3,10 @@ set(MY_PROJ_NAME dsn.replica.test) #Source files under CURRENT project directory will be automatically included. #You can manually set MY_PROJ_SRC to include source files under other directories. set(MY_PROJ_SRC "") +file(GLOB MY_PROJ_SRC + ${PROJECT_SOURCE_DIR}/src/dist/replication/lib/*.cpp + ${PROJECT_SOURCE_DIR}/src/dist/replication/lib/duplication/*.cpp + ) #Search mode for source files under CURRENT project directory ? #"GLOB_RECURSE" for recursive search @@ -12,12 +16,13 @@ set(MY_SRC_SEARCH_MODE "GLOB") set(MY_PROJ_INC_PATH "") set(MY_PROJ_LIBS dsn_meta_server - dsn_replica_server dsn.replication.zookeeper_provider dsn_replication_common dsn.block_service.local dsn.block_service.fds dsn.failure_detector + dsn.failure_detector.multimaster + dsn_nfs dsn_http dsn_runtime zookeeper_mt diff --git a/src/dist/replication/test/replica_test/unit_test/mock_utils.h b/src/dist/replication/test/replica_test/unit_test/mock_utils.h index d31d219f4d..963067e1f5 100644 --- a/src/dist/replication/test/replica_test/unit_test/mock_utils.h +++ b/src/dist/replication/test/replica_test/unit_test/mock_utils.h @@ -36,7 +36,38 @@ namespace dsn { namespace replication { -class mock_replica : public replica +class mock_base +{ +public: + void reset() { _mock_functions.clear(); } + bool find(std::string name) { return (_mock_functions.find(name) != _mock_functions.end()); } + void insert(std::string name) { _mock_functions.insert(name); } + void erase(std::string name) { _mock_functions.erase(name); } +protected: + std::set _mock_functions; +}; + +#define DEFINE_MOCK1(base_class, func_name, type1) \ + void func_name(type1 param1) \ + { \ + if (find(#func_name)) { \ + ddebug("mock function %s is called", #func_name); \ + } else { \ + base_class::func_name(param1); \ + } \ + } + +#define DEFINE_MOCK3(base_class, func_name, type1, type2, type3) \ + void func_name(type1 param1, type2 param2, type3 param3) \ + { \ + if (find(#func_name)) { \ + ddebug("mock function %s is called", #func_name); \ + } else { \ + base_class::func_name(param1, param2, param3); \ + } \ + } + +class mock_replica : public replica, public mock_base { public: mock_replica(replica_stub *stub, gpid gpid, const app_info &app, const char *dir) @@ -45,6 +76,15 @@ class mock_replica : public replica } ~mock_replica() override {} + + /// mock functions + DEFINE_MOCK1(replica, on_add_child, const group_check_request &) + DEFINE_MOCK3(replica, init_child_replica, gpid, rpc_address, ballot) + + /// helper functions + void set_replica_config(replica_configuration &config) { _config = config; } + void set_partition_status(partition_status::type status) { _config.status = status; } + void set_child_gpid(gpid pid) { _child_gpid = pid; } }; inline std::unique_ptr create_mock_replica(replica_stub *stub, @@ -63,9 +103,47 @@ inline std::unique_ptr create_mock_replica(replica_stub *stub, class mock_replica_stub : public replica_stub { public: - mock_replica_stub() = default; + mock_replica_stub() : replica_stub() {} + + ~mock_replica_stub() override {} - ~mock_replica_stub() override = default; + rpc_address get_meta_server_address() const { return rpc_address(); } + + replica_ptr + get_replica_permit_create_new(gpid pid, app_info *app, const std::string &parent_dir) + { + mock_replica *rep = generate_replica(*app, pid, parent_dir); + rep->insert("init_child_replica"); + return rep; + } + + /// helper functions + std::unique_ptr + generate_replica(app_info info, + gpid pid, + partition_status::type status = partition_status::PS_INACTIVE, + ballot b = 5) + { + replica_configuration config; + config.ballot = b; + config.pid = pid; + config.status = status; + + std::unique_ptr rep = + make_unique(this, pid, std::move(info), "./"); + rep->set_replica_config(config); + _replicas.insert(replicas::value_type(pid, rep.get())); + + return rep; + } + + mock_replica *generate_replica(app_info info, gpid pid, const std::string &parent_dir) + { + mock_replica *rep = new mock_replica(this, pid, std::move(info), "./"); + rep->set_partition_status(partition_status::PS_INACTIVE); + _replicas.insert(replicas::value_type(pid, rep)); + return rep; + } }; } // namespace replication diff --git a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp new file mode 100644 index 0000000000..8c83a2f175 --- /dev/null +++ b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp @@ -0,0 +1,86 @@ +// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include + +#include "replica_test_base.h" + +namespace dsn { +namespace replication { + +class split_replica_test : public testing::Test +{ +public: + split_replica_test() { mock_app_info(); } + + void SetUp() + { + _stub = make_unique(); + _parent = _stub->generate_replica( + _app_info, _parent_pid, partition_status::PS_PRIMARY, _init_ballot); + _child = nullptr; + mock_group_check_request(); + } + + void TearDown() + { + _parent->set_partition_status(partition_status::PS_INACTIVE); + _parent->reset(); + } + + void mock_app_info() + { + _app_info.app_id = 2; + _app_info.app_name = "split_test"; + _app_info.app_type = "replica"; + _app_info.is_stateful = true; + _app_info.max_replica_count = 3; + _app_info.partition_count = 8; + } + + void mock_group_check_request() + { + _req.child_gpid = _child_pid; + _req.config.ballot = _init_ballot; + _req.config.status = partition_status::PS_PRIMARY; + } + +public: + std::unique_ptr _stub; + + std::unique_ptr _parent; + std::unique_ptr _child; + + dsn::app_info _app_info; + dsn::gpid _parent_pid = gpid(2, 1); + dsn::gpid _child_pid = gpid(2, 9); + uint32_t _old_partition_count = 8; + ballot _init_ballot = 3; + + group_check_request _req; +}; + +TEST_F(split_replica_test, add_child_wrong_ballot) +{ + _req.config.ballot = 5; + _parent->on_add_child(_req); + ASSERT_EQ(_child, nullptr); +} + +TEST_F(split_replica_test, add_child_wrong_child_gpid) +{ + _parent->set_child_gpid(_child_pid); + _parent->on_add_child(_req); + ASSERT_EQ(_child, nullptr); +} + +TEST_F(split_replica_test, add_child_succeed) +{ + _parent->on_add_child(_req); + _parent->tracker()->wait_outstanding_tasks(); + ASSERT_NE(_stub->get_replica(_child_pid), nullptr); +} + +} // namespace replication +} // namespace dsn From 451253edaa86288f67074fb6497720113accc925 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 9 Aug 2019 11:24:39 +0800 Subject: [PATCH 02/17] small fix --- src/dist/replication/lib/replica.h | 2 +- src/dist/replication/lib/replica_stub.h | 2 +- .../replication/test/replica_test/unit_test/mock_utils.h | 2 +- .../test/replica_test/unit_test/replica_split_test.cpp | 8 ++++---- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index ef7f350b63..bfbcc73b16 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -70,7 +70,7 @@ class test_checker; class replica : public serverlet, public ref_counter, public replica_base { public: - ~replica(void); + mock_virtual ~replica(void); // // routines for replica stub diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index 52d7557ffc..c915c0e2cb 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -70,7 +70,7 @@ class replica_stub : public serverlet, public ref_counter public: replica_stub(replica_state_subscriber subscriber = nullptr, bool is_long_subscriber = true); - ~replica_stub(void); + mock_virtual ~replica_stub(void); // // initialization diff --git a/src/dist/replication/test/replica_test/unit_test/mock_utils.h b/src/dist/replication/test/replica_test/unit_test/mock_utils.h index 963067e1f5..d73ecc8905 100644 --- a/src/dist/replication/test/replica_test/unit_test/mock_utils.h +++ b/src/dist/replication/test/replica_test/unit_test/mock_utils.h @@ -39,7 +39,7 @@ namespace replication { class mock_base { public: - void reset() { _mock_functions.clear(); } + void reset_all() { _mock_functions.clear(); } bool find(std::string name) { return (_mock_functions.find(name) != _mock_functions.end()); } void insert(std::string name) { _mock_functions.insert(name); } void erase(std::string name) { _mock_functions.erase(name); } diff --git a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp index 8c83a2f175..8a8908b080 100644 --- a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp +++ b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp @@ -12,21 +12,21 @@ namespace replication { class split_replica_test : public testing::Test { public: - split_replica_test() { mock_app_info(); } - void SetUp() { _stub = make_unique(); + mock_app_info(); _parent = _stub->generate_replica( _app_info, _parent_pid, partition_status::PS_PRIMARY, _init_ballot); - _child = nullptr; mock_group_check_request(); } void TearDown() { _parent->set_partition_status(partition_status::PS_INACTIVE); - _parent->reset(); + _parent->reset_all(); + _parent.reset(); + _stub.reset(); } void mock_app_info() From 4ca4947be83301c997ea7812a942af8582255754 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 9 Aug 2019 13:41:09 +0800 Subject: [PATCH 03/17] fix travis --- src/dist/replication/test/replica_test/unit_test/mock_utils.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/dist/replication/test/replica_test/unit_test/mock_utils.h b/src/dist/replication/test/replica_test/unit_test/mock_utils.h index d73ecc8905..5003340cc9 100644 --- a/src/dist/replication/test/replica_test/unit_test/mock_utils.h +++ b/src/dist/replication/test/replica_test/unit_test/mock_utils.h @@ -132,8 +132,6 @@ class mock_replica_stub : public replica_stub std::unique_ptr rep = make_unique(this, pid, std::move(info), "./"); rep->set_replica_config(config); - _replicas.insert(replicas::value_type(pid, rep.get())); - return rep; } From 788665b226e50d1cdc5bcd2e09397db5707d0ebf Mon Sep 17 00:00:00 2001 From: heyuchen Date: Mon, 12 Aug 2019 16:07:14 +0800 Subject: [PATCH 04/17] refactor get_replica_dir and other small fix --- src/dist/replication/lib/replica.h | 4 +- src/dist/replication/lib/replica_stub.cpp | 41 +++++++++++-------- .../unit_test/replica_split_test.cpp | 1 + 3 files changed, 26 insertions(+), 20 deletions(-) diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index bfbcc73b16..a07b738881 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -407,10 +407,10 @@ class replica : public serverlet, public ref_counter, public replica_ba // partition split // _child_gpid = gpid({app_id},{pidx}+{old_partition_count}) for parent partition - // _child_gpid.app_id = 0 for partitions not during partition split + // _child_gpid.app_id = 0 if parent partition not during partition split and child partition dsn::gpid _child_gpid; // ballot when starting partition split coz split will stop if ballot changed - // _child_init_ballot=0 if partition not during partition split + // _child_init_ballot = 0 if partition not during partition split ballot _child_init_ballot; // perf counters diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 69ab7a012b..9d25bada93 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -2268,33 +2268,38 @@ std::string replica_stub::get_replica_dir(const char *app_type, const std::string &parent_dir) { auto check_data_dir = [](const std::string &parent_dir, std::string &data_dir) { - // parent_dir = // + // parent_dir = /. // return true if parent_dir's is euqal to return (parent_dir.substr(0, data_dir.size() + 1) == data_dir + "/"); }; - char buffer[256]; - sprintf(buffer, "%s.%s", id.to_string(), app_type); - std::string ret_dir; - for (auto &dir : _options.data_dirs) { - std::string cur_dir = utils::filesystem::path_combine(dir, buffer); - // if creating child replica during partition split, we should gurantee child replica and - // parent replica share the same data dir - if (utils::filesystem::directory_exists(cur_dir) || check_data_dir(parent_dir, dir)) { - if (!ret_dir.empty()) { + char gpid_str[256]; + sprintf(gpid_str, "%s.%s", id.to_string(), app_type); + std::string replica_dir; + bool is_dir_confict = false; + for (auto &data_dir : _options.data_dirs) { + std::string dir = utils::filesystem::path_combine(data_dir, gpid_str); + if (utils::filesystem::directory_exists(dir) && parent_dir == "") { + if (is_dir_confict) { dassert( - false, "replica dir conflict: %s <--> %s", cur_dir.c_str(), ret_dir.c_str()); - } - ret_dir = cur_dir; - if (check_data_dir(parent_dir, dir)) { - _fs_manager.add_replica(id, ret_dir); + false, "replica dir conflict: %s <--> %s", dir.c_str(), replica_dir.c_str()); } + replica_dir = dir; + is_dir_confict = true; + } + + // if creating child replica during partition split, we should gurantee child replica and + // parent replica share the same data dir + if (check_data_dir(parent_dir, data_dir)) { + replica_dir = dir; + _fs_manager.add_replica(id, replica_dir); + break; } } - if (ret_dir.empty() && create_new) { - _fs_manager.allocate_dir(id, app_type, ret_dir); + if (replica_dir.empty() && create_new) { + _fs_manager.allocate_dir(id, app_type, replica_dir); } - return ret_dir; + return replica_dir; } // diff --git a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp index 8a8908b080..0e2db8d67b 100644 --- a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp +++ b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp @@ -77,6 +77,7 @@ TEST_F(split_replica_test, add_child_wrong_child_gpid) TEST_F(split_replica_test, add_child_succeed) { + _parent->insert("init_child_replica"); _parent->on_add_child(_req); _parent->tracker()->wait_outstanding_tasks(); ASSERT_NE(_stub->get_replica(_child_pid), nullptr); From 4ef854f9b98ef1c0bdb4deaf109585ab68af7a70 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 13 Aug 2019 11:27:47 +0800 Subject: [PATCH 05/17] format file after resolving conflict --- .../replication/test/replica_test/unit_test/mock_utils.h | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/dist/replication/test/replica_test/unit_test/mock_utils.h b/src/dist/replication/test/replica_test/unit_test/mock_utils.h index fc8231499f..29dffa2dc4 100644 --- a/src/dist/replication/test/replica_test/unit_test/mock_utils.h +++ b/src/dist/replication/test/replica_test/unit_test/mock_utils.h @@ -36,7 +36,6 @@ namespace dsn { namespace replication { - class mock_base { public: @@ -102,7 +101,7 @@ class mock_replication_app_base : public replication_app_base std::map _envs; }; -class mock_replica : public replica : public mock_base +class mock_replica : public replica, public mock_base { public: mock_replica(replica_stub *stub, gpid gpid, const app_info &app, const char *dir) @@ -116,13 +115,13 @@ class mock_replica : public replica : public mock_base _config.status = partition_status::PS_INACTIVE; _app.reset(nullptr); } - + replica_duplicator_manager &get_replica_duplicator_manager() { return *_duplication_mgr; } void as_primary() { _config.status = partition_status::PS_PRIMARY; } void as_secondary() { _config.status = partition_status::PS_SECONDARY; } - + /// mock functions DEFINE_MOCK1(replica, on_add_child, const group_check_request &) DEFINE_MOCK3(replica, init_child_replica, gpid, rpc_address, ballot) @@ -150,7 +149,7 @@ class mock_replica_stub : public replica_stub { public: mock_replica_stub() : replica_stub() {} - + ~mock_replica_stub() override = default; void add_replica(replica *r) { _replicas[r->get_gpid()] = replica_ptr(r); } From 1ddf9d990a995fe9b37aa3eb1bffd39b5ca6b6c2 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 13 Aug 2019 18:02:18 +0800 Subject: [PATCH 06/17] use fail_point instead of mock_virtual in unit test and other small fix --- src/dist/replication/lib/CMakeLists.txt | 1 + .../lib/duplication/test/CMakeLists.txt | 2 +- src/dist/replication/lib/replica.cpp | 4 -- src/dist/replication/lib/replica.h | 12 ++-- src/dist/replication/lib/replica_split.cpp | 69 ++++++++++--------- src/dist/replication/lib/replica_stub.cpp | 32 +++++---- src/dist/replication/lib/replica_stub.h | 9 ++- .../storage_engine/simple_kv/CMakeLists.txt | 2 +- .../replica_test/unit_test/CMakeLists.txt | 10 +-- .../test/replica_test/unit_test/mock_utils.h | 53 +------------- .../unit_test/replica_split_test.cpp | 39 ++++++----- .../replication/test/simple_kv/CMakeLists.txt | 2 +- src/tests/dsn/CMakeLists.txt | 2 +- 13 files changed, 94 insertions(+), 143 deletions(-) diff --git a/src/dist/replication/lib/CMakeLists.txt b/src/dist/replication/lib/CMakeLists.txt index 606b5bff97..7844957fa7 100644 --- a/src/dist/replication/lib/CMakeLists.txt +++ b/src/dist/replication/lib/CMakeLists.txt @@ -24,6 +24,7 @@ set(MY_PROJ_LIBS dsn_nfs dsn_cli dsn_http + dsn_runtime ) # Extra files that will be installed diff --git a/src/dist/replication/lib/duplication/test/CMakeLists.txt b/src/dist/replication/lib/duplication/test/CMakeLists.txt index ae94f35aa3..71a0626c6e 100644 --- a/src/dist/replication/lib/duplication/test/CMakeLists.txt +++ b/src/dist/replication/lib/duplication/test/CMakeLists.txt @@ -22,7 +22,7 @@ set(MY_PROJ_LIBS dsn_meta_server fmt ) -set(MY_BOOST_LIBS Boost::system Boost::filesystem) +set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex) set(MY_BINPLACES config-test.ini diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index c5e5a5080b..817de851a7 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -61,7 +61,6 @@ replica::replica( dassert(stub != nullptr, ""); _stub = stub; _dir = dir; - _options = &stub->options(); init_state(); _config.pid = gpid; @@ -83,9 +82,6 @@ replica::replica( _extra_envs.insert( std::make_pair(backup_restore_constant::FORCE_RESTORE, std::string("true"))); } - - _child_gpid = dsn::gpid(0, 0); - _child_init_ballot = 0; } void replica::update_last_checkpoint_generate_time() diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index a07b738881..c54111fa48 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -70,7 +70,7 @@ class test_checker; class replica : public serverlet, public ref_counter, public replica_base { public: - mock_virtual ~replica(void); + ~replica(void); // // routines for replica stub @@ -316,11 +316,10 @@ class replica : public serverlet, public ref_counter, public replica_ba ///////////////////////////////////////////////////////////////// // partition split // parent partition create child - mock_virtual void on_add_child(const group_check_request &request); + void on_add_child(const group_check_request &request); // child replica initialize config and state info - mock_virtual void - init_child_replica(gpid parent_gpid, dsn::rpc_address primary_address, ballot init_ballot); + void init_child_replica(gpid parent_gpid, dsn::rpc_address primary_address, ballot init_ballot); private: friend class ::dsn::replication::replication_checker; @@ -331,6 +330,7 @@ class replica : public serverlet, public ref_counter, public replica_ba friend class replica_learn_test; friend class replica_duplicator_manager; friend class load_mutation; + friend class replica_split_test; // replica configuration, updated by update_local_configuration ONLY replica_configuration _config; @@ -408,10 +408,10 @@ class replica : public serverlet, public ref_counter, public replica_ba // partition split // _child_gpid = gpid({app_id},{pidx}+{old_partition_count}) for parent partition // _child_gpid.app_id = 0 if parent partition not during partition split and child partition - dsn::gpid _child_gpid; + dsn::gpid _child_gpid{0, 0}; // ballot when starting partition split coz split will stop if ballot changed // _child_init_ballot = 0 if partition not during partition split - ballot _child_init_ballot; + ballot _child_init_ballot{0}; // perf counters perf_counter_wrapper _counter_private_log_size; diff --git a/src/dist/replication/lib/replica_split.cpp b/src/dist/replication/lib/replica_split.cpp index 73a3f9302b..bbc6a1577c 100644 --- a/src/dist/replication/lib/replica_split.cpp +++ b/src/dist/replication/lib/replica_split.cpp @@ -3,6 +3,7 @@ // can be found in the LICENSE file in the root directory of this source tree. #include +#include #include "replica.h" #include "replica_stub.h" @@ -10,57 +11,55 @@ namespace dsn { namespace replication { +// ThreadPool: THREAD_POOL_REPLICATION void replica::on_add_child(const group_check_request &request) // on parent partition { if (status() != partition_status::PS_PRIMARY && status() != partition_status::PS_SECONDARY && (status() != partition_status::PS_INACTIVE || !_inactive_is_transient)) { - dwarn_f("{} receive add child request with wrong status {}, ignore this request", - name(), - enum_to_string(status())); + dwarn_replica("receive add child request with wrong status {}, ignore this request", + enum_to_string(status())); return; } if (request.config.ballot != get_ballot()) { - dwarn_f("{} receive add child request with different ballot, local ballot({}) VS request " - "ballot({}), ignore this request", - name(), - get_ballot(), - request.config.ballot); + dwarn_replica( + "receive add child request with different ballot, local ballot({}) VS request " + "ballot({}), ignore this request", + get_ballot(), + request.config.ballot); return; } gpid child_gpid = request.child_gpid; if (_child_gpid == child_gpid) { - dwarn_f("{}: child replica({}.{}) is already existed, might be partition splitting, ignore " - "this request", - name(), - child_gpid.get_app_id(), - child_gpid.get_partition_index()); + dwarn_replica( + "child replica({}.{}) is already existed, might be partition splitting, ignore " + "this request", + child_gpid.get_app_id(), + child_gpid.get_partition_index()); return; } if (child_gpid.get_partition_index() < _app_info.partition_count) { - dwarn_f("{} receive old add child request, child gpid is ({}.{}), " - "local partition count is {}, ignore this request", - name(), - child_gpid.get_app_id(), - child_gpid.get_partition_index(), - _app_info.partition_count); + dwarn_replica("receive old add child request, child gpid is ({}.{}), " + "local partition count is {}, ignore this request", + child_gpid.get_app_id(), + child_gpid.get_partition_index(), + _app_info.partition_count); return; } _child_gpid = child_gpid; _child_init_ballot = get_ballot(); - ddebug_f("{} process add child({}.{}), primary is {}, ballot is {}, " - "status is {}, last_committed_decree is {}", - name(), - child_gpid.get_app_id(), - child_gpid.get_partition_index(), - request.config.primary.to_string(), - request.config.ballot, - enum_to_string(request.config.status), - request.last_committed_decree); + ddebug_replica("process add child({}.{}), primary is {}, ballot is {}, " + "status is {}, last_committed_decree is {}", + child_gpid.get_app_id(), + child_gpid.get_partition_index(), + request.config.primary.to_string(), + request.config.ballot, + enum_to_string(request.config.status), + request.last_committed_decree); tasking::enqueue(LPC_PARTITION_SPLIT, tracker(), @@ -75,12 +74,15 @@ void replica::on_add_child(const group_check_request &request) // on parent part get_gpid().thread_hash()); } +// ThreadPool: THREAD_POOL_REPLICATION void replica::init_child_replica(gpid parent_gpid, rpc_address primary_address, ballot init_ballot) // on child partition { + FAIL_POINT_INJECT_F("replica_init_child_replica", [](dsn::string_view) {}); + if (status() != partition_status::PS_INACTIVE) { - dwarn_f("{}: wrong status {}", name(), enum_to_string(status())); + dwarn_replica("wrong status {}", enum_to_string(status())); _stub->split_replica_exec(LPC_PARTITION_SPLIT_ERROR, parent_gpid, [](replica_ptr r) { r->_child_gpid.set_app_id(0); }); @@ -95,11 +97,10 @@ void replica::init_child_replica(gpid parent_gpid, // init split states _split_states.parent_gpid = parent_gpid; - ddebug_f("{}: init ballot is {}, parent gpid is ({}.{})", - name(), - init_ballot, - parent_gpid.get_app_id(), - parent_gpid.get_partition_index()); + ddebug_replica("init ballot is {}, parent gpid is ({}.{})", + init_ballot, + parent_gpid.get_app_id(), + parent_gpid.get_partition_index()); } } // namespace replication diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 9d25bada93..bfff8f2449 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -50,6 +50,7 @@ #ifdef DSN_ENABLE_GPERF #include #endif +#include namespace dsn { namespace replication { @@ -2267,17 +2268,11 @@ std::string replica_stub::get_replica_dir(const char *app_type, bool create_new, const std::string &parent_dir) { - auto check_data_dir = [](const std::string &parent_dir, std::string &data_dir) { - // parent_dir = /. - // return true if parent_dir's is euqal to - return (parent_dir.substr(0, data_dir.size() + 1) == data_dir + "/"); - }; - - char gpid_str[256]; - sprintf(gpid_str, "%s.%s", id.to_string(), app_type); + // char gpid_str[256]; + std::string gpid_str = fmt::format("{}.{}", id.to_string(), app_type); std::string replica_dir; bool is_dir_confict = false; - for (auto &data_dir : _options.data_dirs) { + for (const std::string &data_dir : _options.data_dirs) { std::string dir = utils::filesystem::path_combine(data_dir, gpid_str); if (utils::filesystem::directory_exists(dir) && parent_dir == "") { if (is_dir_confict) { @@ -2290,7 +2285,8 @@ std::string replica_stub::get_replica_dir(const char *app_type, // if creating child replica during partition split, we should gurantee child replica and // parent replica share the same data dir - if (check_data_dir(parent_dir, data_dir)) { + // parent_dir = /., check if parent_dir's is euqal to + if (parent_dir.substr(0, data_dir.size() + 1) == data_dir + "/") { replica_dir = dir; _fs_manager.add_replica(id, replica_dir); break; @@ -2313,7 +2309,7 @@ void replica_stub::create_child_replica(rpc_address primary_address, gpid parent_gpid, const std::string &parent_dir) { - replica_ptr child_replica = get_replica_permit_create_new(child_gpid, &app, parent_dir); + replica_ptr child_replica = create_replica_if_not_found(child_gpid, &app, parent_dir); if (child_replica != nullptr) { ddebug_f("create child replica ({}.{}) succeed", child_gpid.get_app_id(), @@ -2330,20 +2326,28 @@ void replica_stub::create_child_replica(rpc_address primary_address, } replica_ptr -replica_stub::get_replica_permit_create_new(gpid pid, app_info *app, const std::string &parent_dir) +replica_stub::create_replica_if_not_found(gpid pid, app_info *app, const std::string &parent_dir) { + FAIL_POINT_INJECT_F("replica_stub_create_replica_if_not_found", + [=](dsn::string_view) -> replica_ptr { + replica *rep = new replica(this, pid, *app, "./", false); + rep->_config.status = partition_status::PS_INACTIVE; + _replicas.insert(replicas::value_type(pid, rep)); + return rep; + }); + zauto_write_lock l(_replicas_lock); auto it = _replicas.find(pid); if (it != _replicas.end()) { return it->second; } else { if (_opening_replicas.find(pid) != _opening_replicas.end()) { - ddebug_f("Cannot create new replica({}.{}) coz it is under open", + ddebug_f("failed create new replica({}.{}) because it is under open", pid.get_app_id(), pid.get_partition_index()); return nullptr; } else if (_closing_replicas.find(pid) != _closing_replicas.end()) { - ddebug_f("Cannnot create new replica({}.{}) coz it is under close", + ddebug_f("failed create new replica({}.{}) because it is under close", pid.get_app_id(), pid.get_partition_index()); return nullptr; diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index c915c0e2cb..2500f34a3b 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -70,7 +70,7 @@ class replica_stub : public serverlet, public ref_counter public: replica_stub(replica_state_subscriber subscriber = nullptr, bool is_long_subscriber = true); - mock_virtual ~replica_stub(void); + ~replica_stub(void); // // initialization @@ -172,10 +172,9 @@ class replica_stub : public serverlet, public ref_counter gpid parent_gpid, const std::string &parent_dir); - // get replica, and create a new replica instance if not found - mock_virtual replica_ptr get_replica_permit_create_new(gpid pid, - app_info *app, - const std::string &parent_dir); + // create a new replica instance if not found + // return nullptr when failed to create new replica + replica_ptr create_replica_if_not_found(gpid pid, app_info *app, const std::string &parent_dir); typedef std::function local_execution; // During partition split, we should handle both parent and child at the same time, diff --git a/src/dist/replication/storage_engine/simple_kv/CMakeLists.txt b/src/dist/replication/storage_engine/simple_kv/CMakeLists.txt index f6ba973b82..40d1e81247 100644 --- a/src/dist/replication/storage_engine/simple_kv/CMakeLists.txt +++ b/src/dist/replication/storage_engine/simple_kv/CMakeLists.txt @@ -13,7 +13,7 @@ set(MY_PROJ_INC_PATH "") set(MY_PROJ_LIBS dsn_replica_server dsn_meta_server dsn_replication_client dsn_runtime fmt) -set(MY_BOOST_LIBS Boost::system Boost::filesystem) +set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex) set(INI_FILES "") file(GLOB diff --git a/src/dist/replication/test/replica_test/unit_test/CMakeLists.txt b/src/dist/replication/test/replica_test/unit_test/CMakeLists.txt index c4948f761c..59af9d6328 100644 --- a/src/dist/replication/test/replica_test/unit_test/CMakeLists.txt +++ b/src/dist/replication/test/replica_test/unit_test/CMakeLists.txt @@ -3,10 +3,6 @@ set(MY_PROJ_NAME dsn.replica.test) #Source files under CURRENT project directory will be automatically included. #You can manually set MY_PROJ_SRC to include source files under other directories. set(MY_PROJ_SRC "") -file(GLOB MY_PROJ_SRC - ${PROJECT_SOURCE_DIR}/src/dist/replication/lib/*.cpp - ${PROJECT_SOURCE_DIR}/src/dist/replication/lib/duplication/*.cpp - ) #Search mode for source files under CURRENT project directory ? #"GLOB_RECURSE" for recursive search @@ -16,13 +12,12 @@ set(MY_SRC_SEARCH_MODE "GLOB") set(MY_PROJ_INC_PATH "") set(MY_PROJ_LIBS dsn_meta_server + dsn_replica_server dsn.replication.zookeeper_provider dsn_replication_common dsn.block_service.local dsn.block_service.fds dsn.failure_detector - dsn.failure_detector.multimaster - dsn_nfs dsn_http dsn_runtime zookeeper_mt @@ -35,10 +30,9 @@ set(MY_PROJ_LIBS dsn_meta_server fmt gtest) -set(MY_BOOST_LIBS Boost::system Boost::filesystem) +set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex) #Extra files that will be installed set(MY_BINPLACES clear.sh run.sh config-test.ini) -add_definitions(-DDSN_MOCK_TEST) dsn_add_test() diff --git a/src/dist/replication/test/replica_test/unit_test/mock_utils.h b/src/dist/replication/test/replica_test/unit_test/mock_utils.h index 29dffa2dc4..e32c414ee2 100644 --- a/src/dist/replication/test/replica_test/unit_test/mock_utils.h +++ b/src/dist/replication/test/replica_test/unit_test/mock_utils.h @@ -36,37 +36,6 @@ namespace dsn { namespace replication { -class mock_base -{ -public: - void reset_all() { _mock_functions.clear(); } - bool find(std::string name) { return (_mock_functions.find(name) != _mock_functions.end()); } - void insert(std::string name) { _mock_functions.insert(name); } - void erase(std::string name) { _mock_functions.erase(name); } -protected: - std::set _mock_functions; -}; - -#define DEFINE_MOCK1(base_class, func_name, type1) \ - void func_name(type1 param1) \ - { \ - if (find(#func_name)) { \ - ddebug("mock function %s is called", #func_name); \ - } else { \ - base_class::func_name(param1); \ - } \ - } - -#define DEFINE_MOCK3(base_class, func_name, type1, type2, type3) \ - void func_name(type1 param1, type2 param2, type3 param3) \ - { \ - if (find(#func_name)) { \ - ddebug("mock function %s is called", #func_name); \ - } else { \ - base_class::func_name(param1, param2, param3); \ - } \ - } - class mock_replication_app_base : public replication_app_base { public: @@ -101,7 +70,7 @@ class mock_replication_app_base : public replication_app_base std::map _envs; }; -class mock_replica : public replica, public mock_base +class mock_replica : public replica { public: mock_replica(replica_stub *stub, gpid gpid, const app_info &app, const char *dir) @@ -122,10 +91,6 @@ class mock_replica : public replica, public mock_base void as_secondary() { _config.status = partition_status::PS_SECONDARY; } - /// mock functions - DEFINE_MOCK1(replica, on_add_child, const group_check_request &) - DEFINE_MOCK3(replica, init_child_replica, gpid, rpc_address, ballot) - /// helper functions void set_replica_config(replica_configuration &config) { _config = config; } void set_partition_status(partition_status::type status) { _config.status = status; } @@ -180,14 +145,6 @@ class mock_replica_stub : public replica_stub std::map mock_replicas; - replica_ptr - get_replica_permit_create_new(gpid pid, app_info *app, const std::string &parent_dir) - { - mock_replica *rep = generate_replica(*app, pid, parent_dir); - rep->insert("init_child_replica"); - return rep; - } - /// helper functions std::unique_ptr generate_replica(app_info info, @@ -206,13 +163,7 @@ class mock_replica_stub : public replica_stub return rep; } - mock_replica *generate_replica(app_info info, gpid pid, const std::string &parent_dir) - { - mock_replica *rep = new mock_replica(this, pid, std::move(info), "./"); - rep->set_partition_status(partition_status::PS_INACTIVE); - _replicas.insert(replicas::value_type(pid, rep)); - return rep; - } + replica_ptr get_replica_by_pid(gpid pid) { return get_replica(pid); } }; } // namespace replication diff --git a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp index 0e2db8d67b..726c3a0aee 100644 --- a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp +++ b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp @@ -4,12 +4,13 @@ #include +#include #include "replica_test_base.h" namespace dsn { namespace replication { -class split_replica_test : public testing::Test +class replica_split_test : public testing::Test { public: void SetUp() @@ -21,13 +22,7 @@ class split_replica_test : public testing::Test mock_group_check_request(); } - void TearDown() - { - _parent->set_partition_status(partition_status::PS_INACTIVE); - _parent->reset_all(); - _parent.reset(); - _stub.reset(); - } + void TearDown() {} void mock_app_info() { @@ -46,6 +41,12 @@ class split_replica_test : public testing::Test _req.config.status = partition_status::PS_PRIMARY; } + void test_on_add_child() + { + _parent->on_add_child(_req); + _parent->tracker()->wait_outstanding_tasks(); + } + public: std::unique_ptr _stub; @@ -61,26 +62,30 @@ class split_replica_test : public testing::Test group_check_request _req; }; -TEST_F(split_replica_test, add_child_wrong_ballot) +TEST_F(replica_split_test, add_child_wrong_ballot) { _req.config.ballot = 5; - _parent->on_add_child(_req); + test_on_add_child(); ASSERT_EQ(_child, nullptr); } -TEST_F(split_replica_test, add_child_wrong_child_gpid) +TEST_F(replica_split_test, add_child_wrong_child_gpid) { _parent->set_child_gpid(_child_pid); - _parent->on_add_child(_req); + test_on_add_child(); ASSERT_EQ(_child, nullptr); } -TEST_F(split_replica_test, add_child_succeed) +TEST_F(replica_split_test, add_child_succeed) { - _parent->insert("init_child_replica"); - _parent->on_add_child(_req); - _parent->tracker()->wait_outstanding_tasks(); - ASSERT_NE(_stub->get_replica(_child_pid), nullptr); + fail::setup(); + fail::cfg("replica_stub_create_replica_if_not_found", "return()"); + fail::cfg("replica_init_child_replica", "return()"); + + test_on_add_child(); + ASSERT_NE(_stub->get_replica_by_pid(_child_pid), nullptr); + + fail::teardown(); } } // namespace replication diff --git a/src/dist/replication/test/simple_kv/CMakeLists.txt b/src/dist/replication/test/simple_kv/CMakeLists.txt index e0091e97b4..835d92d4da 100644 --- a/src/dist/replication/test/simple_kv/CMakeLists.txt +++ b/src/dist/replication/test/simple_kv/CMakeLists.txt @@ -29,7 +29,7 @@ set(MY_PROJ_LIBS dsn_replica_server gtest ) -set(MY_BOOST_LIBS Boost::system Boost::filesystem) +set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex) # Extra files that will be installed FILE(GLOB CASE_FILES "case-*") diff --git a/src/tests/dsn/CMakeLists.txt b/src/tests/dsn/CMakeLists.txt index 58a3ab8498..c06a3806ee 100644 --- a/src/tests/dsn/CMakeLists.txt +++ b/src/tests/dsn/CMakeLists.txt @@ -36,7 +36,7 @@ set(MY_PROJ_LIBS ${MY_PROJ_LIBS} ) -set(MY_BOOST_LIBS Boost::system Boost::filesystem) +set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex) # Extra files that will be installed set(MY_BINPLACES From b4904d03167f9b82d8c9e0e61a4b834ab322828f Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 13 Aug 2019 18:08:01 +0800 Subject: [PATCH 07/17] small fix --- src/dist/replication/lib/replica_stub.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index bfff8f2449..3b4fa8134f 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -2268,7 +2268,6 @@ std::string replica_stub::get_replica_dir(const char *app_type, bool create_new, const std::string &parent_dir) { - // char gpid_str[256]; std::string gpid_str = fmt::format("{}.{}", id.to_string(), app_type); std::string replica_dir; bool is_dir_confict = false; From 85fa2344b0ba07a3405eb2b7dd6423c883310988 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Wed, 14 Aug 2019 15:46:13 +0800 Subject: [PATCH 08/17] refactor get_replica_dir --- src/dist/replication/lib/replica_stub.cpp | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 3b4fa8134f..9f40757585 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -2270,25 +2270,24 @@ std::string replica_stub::get_replica_dir(const char *app_type, { std::string gpid_str = fmt::format("{}.{}", id.to_string(), app_type); std::string replica_dir; - bool is_dir_confict = false; + bool is_dir_exist = false; for (const std::string &data_dir : _options.data_dirs) { std::string dir = utils::filesystem::path_combine(data_dir, gpid_str); - if (utils::filesystem::directory_exists(dir) && parent_dir == "") { - if (is_dir_confict) { - dassert( - false, "replica dir conflict: %s <--> %s", dir.c_str(), replica_dir.c_str()); - } - replica_dir = dir; - is_dir_confict = true; - } - // if creating child replica during partition split, we should gurantee child replica and // parent replica share the same data dir // parent_dir = /., check if parent_dir's is euqal to - if (parent_dir.substr(0, data_dir.size() + 1) == data_dir + "/") { + if (parent_dir != "" && parent_dir.substr(0, data_dir.size() + 1) == data_dir + "/") { replica_dir = dir; _fs_manager.add_replica(id, replica_dir); - break; + return replica_dir; + } + if (utils::filesystem::directory_exists(dir) && parent_dir == "") { + if (is_dir_exist) { + dassert( + false, "replica dir conflict: %s <--> %s", dir.c_str(), replica_dir.c_str()); + } + replica_dir = dir; + is_dir_exist = true; } } if (replica_dir.empty() && create_new) { From a6cf527bb66412dceaa391ff22240bccafcf8763 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Wed, 14 Aug 2019 17:42:37 +0800 Subject: [PATCH 09/17] refactor get_replica_dir --- src/dist/replication/lib/replica_stub.cpp | 29 +++++++++++++---------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 9f40757585..c0bc9321b3 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -2273,24 +2273,27 @@ std::string replica_stub::get_replica_dir(const char *app_type, bool is_dir_exist = false; for (const std::string &data_dir : _options.data_dirs) { std::string dir = utils::filesystem::path_combine(data_dir, gpid_str); - // if creating child replica during partition split, we should gurantee child replica and - // parent replica share the same data dir - // parent_dir = /., check if parent_dir's is euqal to - if (parent_dir != "" && parent_dir.substr(0, data_dir.size() + 1) == data_dir + "/") { + if (parent_dir.empty()) { + if (utils::filesystem::directory_exists(dir)) { + if (is_dir_exist) { + dassert(false, + "replica dir conflict: %s <--> %s", + dir.c_str(), + replica_dir.c_str()); + } + replica_dir = dir; + is_dir_exist = true; + } + } else if (parent_dir.substr(0, data_dir.size() + 1) == data_dir + "/") { + // during partition split, we should gurantee child replica and parent replica share the + // same data dir, = /., check if 's + // is euqal to replica_dir = dir; _fs_manager.add_replica(id, replica_dir); return replica_dir; } - if (utils::filesystem::directory_exists(dir) && parent_dir == "") { - if (is_dir_exist) { - dassert( - false, "replica dir conflict: %s <--> %s", dir.c_str(), replica_dir.c_str()); - } - replica_dir = dir; - is_dir_exist = true; - } } - if (replica_dir.empty() && create_new) { + if (replica_dir.empty() && create_new && parent_dir.empty()) { _fs_manager.allocate_dir(id, app_type, replica_dir); } return replica_dir; From d3f05fa6ab0f935a3fa968cfa2fb6620a6e2550d Mon Sep 17 00:00:00 2001 From: heyuchen Date: Thu, 15 Aug 2019 13:34:41 +0800 Subject: [PATCH 10/17] add get_child_dir --- src/dist/replication/lib/replica_init.cpp | 7 +++- src/dist/replication/lib/replica_stub.cpp | 50 +++++++++++++---------- src/dist/replication/lib/replica_stub.h | 11 +++-- 3 files changed, 39 insertions(+), 29 deletions(-) diff --git a/src/dist/replication/lib/replica_init.cpp b/src/dist/replication/lib/replica_init.cpp index d7bbaa0517..6d618e81f4 100644 --- a/src/dist/replication/lib/replica_init.cpp +++ b/src/dist/replication/lib/replica_init.cpp @@ -80,7 +80,12 @@ error_code replica::initialize_on_new() bool restore_if_necessary, const std::string &parent_dir) { - std::string dir = stub->get_replica_dir(app.app_type.c_str(), gpid, true, parent_dir); + std::string dir; + if (parent_dir.empty()) { + dir = stub->get_replica_dir(app.app_type.c_str(), gpid); + } else { + dir = stub->get_child_dir(app.app_type.c_str(), gpid, parent_dir); + } replica *rep = new replica(stub, gpid, app, dir.c_str(), restore_if_necessary); error_code err; if (restore_if_necessary && (err = rep->restore_checkpoint()) != dsn::ERR_OK) { diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index c0bc9321b3..564281ea5f 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -2263,46 +2263,52 @@ void replica_stub::close() } } -std::string replica_stub::get_replica_dir(const char *app_type, - gpid id, - bool create_new, - const std::string &parent_dir) +std::string replica_stub::get_replica_dir(const char *app_type, gpid id, bool create_new) { std::string gpid_str = fmt::format("{}.{}", id.to_string(), app_type); std::string replica_dir; bool is_dir_exist = false; for (const std::string &data_dir : _options.data_dirs) { std::string dir = utils::filesystem::path_combine(data_dir, gpid_str); - if (parent_dir.empty()) { - if (utils::filesystem::directory_exists(dir)) { - if (is_dir_exist) { - dassert(false, - "replica dir conflict: %s <--> %s", - dir.c_str(), - replica_dir.c_str()); - } - replica_dir = dir; - is_dir_exist = true; + if (utils::filesystem::directory_exists(dir)) { + if (is_dir_exist) { + dassert( + false, "replica dir conflict: %s <--> %s", dir.c_str(), replica_dir.c_str()); } - } else if (parent_dir.substr(0, data_dir.size() + 1) == data_dir + "/") { - // during partition split, we should gurantee child replica and parent replica share the - // same data dir, = /., check if 's - // is euqal to replica_dir = dir; - _fs_manager.add_replica(id, replica_dir); - return replica_dir; + is_dir_exist = true; } } - if (replica_dir.empty() && create_new && parent_dir.empty()) { + if (replica_dir.empty() && create_new) { _fs_manager.allocate_dir(id, app_type, replica_dir); } return replica_dir; } +std::string +replica_stub::get_child_dir(const char *app_type, gpid id, const std::string &parent_dir) +{ + std::string gpid_str = fmt::format("{}.{}", id.to_string(), app_type); + std::string child_dir; + for (const std::string &data_dir : _options.data_dirs) { + std::string dir = utils::filesystem::path_combine(data_dir, gpid_str); + // = /. + // check if 's is euqal to + if (parent_dir.substr(0, data_dir.size() + 1) == data_dir + "/") { + child_dir = dir; + _fs_manager.add_replica(id, child_dir); + break; + } + } + if (child_dir.empty()) { + dassert_f("can not find parent_dir {} in data_dirs", parent_dir); + } + return child_dir; +} + // // partition split // - void replica_stub::create_child_replica(rpc_address primary_address, app_info app, ballot init_ballot, diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index 2500f34a3b..a6a945f103 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -140,12 +140,11 @@ class replica_stub : public serverlet, public ref_counter virtual rpc_address get_meta_server_address() const { return _failure_detector->get_servers(); } rpc_address primary_address() const { return _primary_address; } - // {parent_dir} is used for partition split to gurantee child replica data dir is as same as it - // parent - std::string get_replica_dir(const char *app_type, - gpid id, - bool create_new = true, - const std::string &parent_dir = ""); + std::string get_replica_dir(const char *app_type, gpid id, bool create_new = true); + + // during partition split, we should gurantee child replica and parent replica share the + // same data dir + std::string get_child_dir(const char *app_type, gpid id, const std::string &parent_dir); // // helper methods From 508287da3d3f7f968951fdb407a4f3c13ebd8caf Mon Sep 17 00:00:00 2001 From: heyuchen Date: Thu, 15 Aug 2019 14:06:32 +0800 Subject: [PATCH 11/17] refactor split_replica_exec --- src/dist/replication/lib/replica_stub.cpp | 48 +++++++---------------- src/dist/replication/lib/replica_stub.h | 18 ++++----- 2 files changed, 23 insertions(+), 43 deletions(-) diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 564281ea5f..696a27d4f2 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -2374,50 +2374,30 @@ replica_stub::create_replica_if_not_found(gpid pid, app_info *app, const std::st void replica_stub::split_replica_exec(task_code code, gpid pid, local_execution handler, - local_execution missing_handler, - gpid missing_handler_gpid, + local_execution error_handler, + gpid error_handler_gpid, std::chrono::milliseconds delay) { - replica_ptr rep = pid.get_app_id() == 0 ? nullptr : get_replica(pid); - replica_ptr rep2 = - missing_handler_gpid.get_app_id() == 0 ? nullptr : get_replica(missing_handler_gpid); + replica_ptr replica = pid.get_app_id() == 0 ? nullptr : get_replica(pid); + replica_ptr error_handler_replica = + error_handler_gpid.get_app_id() == 0 ? nullptr : get_replica(error_handler_gpid); - if (!rep && !rep2) { - derror_f("replica({}.{}) and replica({}.{}) are not existed", - pid.get_app_id(), - pid.get_partition_index(), - missing_handler_gpid.get_app_id(), - missing_handler_gpid.get_partition_index()); - return; - } - - if (rep && handler) { + if (replica && handler) { tasking::enqueue( - code, rep.get()->tracker(), [=]() { handler(rep); }, pid.thread_hash(), delay); - } else if (rep2 && missing_handler) { + code, replica.get()->tracker(), [=]() { handler(replica); }, pid.thread_hash(), delay); + } else if (error_handler_replica && error_handler) { ddebug_f("replica({}.{}) is invalid, replica({}.{} will execute its handler)", pid.get_app_id(), pid.get_partition_index(), - missing_handler_gpid.get_app_id(), - missing_handler_gpid.get_partition_index()); + error_handler_gpid.get_app_id(), + error_handler_gpid.get_partition_index()); tasking::enqueue(code, - rep2.get()->tracker(), - [=]() { missing_handler(rep2); }, - missing_handler_gpid.thread_hash(), + error_handler_replica.get()->tracker(), + [=]() { error_handler(error_handler_replica); }, + error_handler_gpid.thread_hash(), delay); } else { - // no handler will be executed - if (rep) { - dwarn_f("replica({}.{}) does not define handler", - pid.get_app_id(), - pid.get_partition_index()); - } else { - dwarn_f("replica({}.{}) is invalid, replica({}.{}) does not define handler", - pid.get_app_id(), - pid.get_partition_index(), - missing_handler_gpid.get_app_id(), - missing_handler_gpid.get_partition_index()); - } + // execute nothing } } diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index a6a945f103..d4600d0f07 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -176,18 +176,18 @@ class replica_stub : public serverlet, public ref_counter replica_ptr create_replica_if_not_found(gpid pid, app_info *app, const std::string &parent_dir); typedef std::function local_execution; - // During partition split, we should handle both parent and child at the same time, - // especially child replica. For example, if child replica is invalid, we should execute error - // handler. This function can be helpful for this condition. - // - if replica() is existed and valid, then add into task queue, replica() - // will execute function after milliseconds. - // - else add into task queue, replica() will execute - // function after milliseconds. + + // - if replica() is existed, replica() will execute function after + // milliseconds + // - else replica() will execute function after + // milliseconds. + // This function is helpful for partition split error handle. + // For example, if child replica is invalid, parent will cleanup split context. void split_replica_exec(task_code code, gpid pid, local_execution handler, - local_execution missing_handler, - gpid missing_handler_gpid = gpid(0, 0), + local_execution error_handler, + gpid error_handler_gpid, std::chrono::milliseconds delay = std::chrono::milliseconds(0)); void split_replica_exec(task_code code, gpid pid, From 8788128a8edaa79870ad4d19cf2e538e58f5b5f6 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Thu, 15 Aug 2019 16:43:08 +0800 Subject: [PATCH 12/17] small fix --- src/dist/replication/lib/replica_stub.cpp | 6 ++++++ src/dist/replication/lib/replica_stub.h | 8 ++++---- .../replication/test/replica_test/unit_test/mock_utils.h | 2 -- .../test/replica_test/unit_test/replica_split_test.cpp | 2 +- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 696a27d4f2..5195c2c301 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -2398,6 +2398,12 @@ void replica_stub::split_replica_exec(task_code code, delay); } else { // execute nothing + dwarn_f("both replica({}.{}) and error handler replica({}.{}) are invalid, or replica not " + "define its handlers", + pid.get_app_id(), + pid.get_partition_index(), + error_handler_gpid.get_app_id(), + error_handler_gpid.get_partition_index()); } } diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index d4600d0f07..2d05ca0fe4 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -179,10 +179,10 @@ class replica_stub : public serverlet, public ref_counter // - if replica() is existed, replica() will execute function after // milliseconds - // - else replica() will execute function after - // milliseconds. - // This function is helpful for partition split error handle. - // For example, if child replica is invalid, parent will cleanup split context. + // - else replica() will execute function after + // milliseconds + // This function is helpful for partition split error handle + // For example, if child replica is invalid, parent will cleanup split context void split_replica_exec(task_code code, gpid pid, local_execution handler, diff --git a/src/dist/replication/test/replica_test/unit_test/mock_utils.h b/src/dist/replication/test/replica_test/unit_test/mock_utils.h index e32c414ee2..607fdbb1c7 100644 --- a/src/dist/replication/test/replica_test/unit_test/mock_utils.h +++ b/src/dist/replication/test/replica_test/unit_test/mock_utils.h @@ -162,8 +162,6 @@ class mock_replica_stub : public replica_stub rep->set_replica_config(config); return rep; } - - replica_ptr get_replica_by_pid(gpid pid) { return get_replica(pid); } }; } // namespace replication diff --git a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp index 726c3a0aee..2df6b8b66f 100644 --- a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp +++ b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp @@ -83,7 +83,7 @@ TEST_F(replica_split_test, add_child_succeed) fail::cfg("replica_init_child_replica", "return()"); test_on_add_child(); - ASSERT_NE(_stub->get_replica_by_pid(_child_pid), nullptr); + ASSERT_NE(_stub->get_replica(_child_pid), nullptr); fail::teardown(); } From cb1b6681a580377711958873ace301252c08484e Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 16 Aug 2019 13:55:00 +0800 Subject: [PATCH 13/17] small fix --- .../test/replica_test/unit_test/replica_split_test.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp index 2df6b8b66f..034bc4f5e7 100644 --- a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp +++ b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp @@ -64,16 +64,17 @@ class replica_split_test : public testing::Test TEST_F(replica_split_test, add_child_wrong_ballot) { - _req.config.ballot = 5; + ballot wrong_ballot = 5; + _req.config.ballot = wrong_ballot; test_on_add_child(); - ASSERT_EQ(_child, nullptr); + ASSERT_EQ(_stub->get_replica(_child_pid), nullptr); } -TEST_F(replica_split_test, add_child_wrong_child_gpid) +TEST_F(replica_split_test, add_child_with_child_existed) { _parent->set_child_gpid(_child_pid); test_on_add_child(); - ASSERT_EQ(_child, nullptr); + ASSERT_EQ(_stub->get_replica(_child_pid), nullptr); } TEST_F(replica_split_test, add_child_succeed) From 6eb8166445eaf07655a0e32b4204be3a27c52976 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 16 Aug 2019 14:21:11 +0800 Subject: [PATCH 14/17] update travis yml --- .travis.yml | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index f829556457..8bcf6bc87e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,19 +18,16 @@ addons: - clang-format-3.9 before_install: - - wget https://raw.githubusercontent.com/xiaomi/pegasus-common/master/build-depends.tar.gz + - wget https://media.githubusercontent.com/media/XiaoMi/pegasus-common/master/build-depends.tar.gz - tar xfz build-depends.tar.gz - rm -f build-depends.tar.gz - cd packages - ls | xargs sudo dpkg -i --force-depends - cd .. -install: - # - ./run.sh format - before_script: - cd thirdparty - - wget https://raw.githubusercontent.com/xiaomi/pegasus-common/master/pegasus-thirdparty-prebuild.tar.gz + - wget https://media.githubusercontent.com/media/XiaoMi/pegasus-common/master/pegasus-thirdparty-prebuild.tar.gz - tar xfz pegasus-thirdparty-prebuild.tar.gz - rm -f pegasus-thirdparty-prebuild.tar.gz - cd .. From 24d165fe357b3e0d30910d4f93d5bcc0ae9270e6 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Mon, 19 Aug 2019 14:42:04 +0800 Subject: [PATCH 15/17] fix according to code review --- .../dsn/dist/replication/replication.codes.h | 1 + src/dist/replication/lib/replica.h | 5 +- src/dist/replication/lib/replica_context.h | 1 + src/dist/replication/lib/replica_split.cpp | 39 +++--- src/dist/replication/lib/replica_stub.cpp | 118 +++++++++--------- src/dist/replication/lib/replica_stub.h | 31 ++--- .../unit_test/replica_split_test.cpp | 2 +- 7 files changed, 105 insertions(+), 92 deletions(-) diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index 2953d1849a..a76f037710 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -67,6 +67,7 @@ MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_PRIVATE, TASK_PRIORITY_HIGH) MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_SHARED, TASK_PRIORITY_HIGH) MAKE_EVENT_CODE(LPC_QUERY_CONFIGURATION_ALL, TASK_PRIORITY_HIGH) MAKE_EVENT_CODE(LPC_MEM_RELEASE, TASK_PRIORITY_COMMON) +MAKE_EVENT_CODE(LPC_CREATE_CHILD, TASK_PRIORITY_COMMON) #undef CURRENT_THREAD_POOL // THREAD_POOL_META_SERVER diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index c54111fa48..6fa14c6919 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -76,7 +76,7 @@ class replica : public serverlet, public ref_counter, public replica_ba // routines for replica stub // static replica *load(replica_stub *stub, const char *dir); - // {parent_dir} is used in partition split for get_replica_dir in replica_stub + // {parent_dir} is used in partition split for get_child_dir in replica_stub static replica *newr(replica_stub *stub, gpid gpid, const app_info &app, @@ -321,6 +321,9 @@ class replica : public serverlet, public ref_counter, public replica_ba // child replica initialize config and state info void init_child_replica(gpid parent_gpid, dsn::rpc_address primary_address, ballot init_ballot); + // parent reset child information when partition split failed + void clean_up_parent_split_context(); + private: friend class ::dsn::replication::replication_checker; friend class ::dsn::replication::test::test_checker; diff --git a/src/dist/replication/lib/replica_context.h b/src/dist/replication/lib/replica_context.h index 23c800178f..a3ee9acd6c 100644 --- a/src/dist/replication/lib/replica_context.h +++ b/src/dist/replication/lib/replica_context.h @@ -527,6 +527,7 @@ class partition_split_context { public: partition_split_context() {} + // TODO(heyuchen): force will be used in further pull request bool cleanup(bool force); public: diff --git a/src/dist/replication/lib/replica_split.cpp b/src/dist/replication/lib/replica_split.cpp index bbc6a1577c..4f7c9f6edc 100644 --- a/src/dist/replication/lib/replica_split.cpp +++ b/src/dist/replication/lib/replica_split.cpp @@ -32,19 +32,16 @@ void replica::on_add_child(const group_check_request &request) // on parent part gpid child_gpid = request.child_gpid; if (_child_gpid == child_gpid) { - dwarn_replica( - "child replica({}.{}) is already existed, might be partition splitting, ignore " - "this request", - child_gpid.get_app_id(), - child_gpid.get_partition_index()); + dwarn_replica("child replica({}) is already existed, might be partition splitting, ignore " + "this request", + child_gpid); return; } if (child_gpid.get_partition_index() < _app_info.partition_count) { - dwarn_replica("receive old add child request, child gpid is ({}.{}), " + dwarn_replica("receive old add child request, child gpid is ({}), " "local partition count is {}, ignore this request", - child_gpid.get_app_id(), - child_gpid.get_partition_index(), + child_gpid, _app_info.partition_count); return; } @@ -52,16 +49,15 @@ void replica::on_add_child(const group_check_request &request) // on parent part _child_gpid = child_gpid; _child_init_ballot = get_ballot(); - ddebug_replica("process add child({}.{}), primary is {}, ballot is {}, " + ddebug_replica("process add child({}), primary is {}, ballot is {}, " "status is {}, last_committed_decree is {}", - child_gpid.get_app_id(), - child_gpid.get_partition_index(), + child_gpid, request.config.primary.to_string(), request.config.ballot, enum_to_string(request.config.status), request.last_committed_decree); - tasking::enqueue(LPC_PARTITION_SPLIT, + tasking::enqueue(LPC_CREATE_CHILD, tracker(), std::bind(&replica_stub::create_child_replica, _stub, @@ -79,13 +75,13 @@ void replica::init_child_replica(gpid parent_gpid, rpc_address primary_address, ballot init_ballot) // on child partition { - FAIL_POINT_INJECT_F("replica_init_child_replica", [](dsn::string_view) {}); + FAIL_POINT_INJECT_F("replica_init_child_replica", + [](dsn::string_view) { ddebug_f("mock init_child_replica succeed"); }); if (status() != partition_status::PS_INACTIVE) { dwarn_replica("wrong status {}", enum_to_string(status())); - _stub->split_replica_exec(LPC_PARTITION_SPLIT_ERROR, parent_gpid, [](replica_ptr r) { - r->_child_gpid.set_app_id(0); - }); + _stub->split_replica_error_handler(parent_gpid, + [](replica_ptr r) { r->_child_gpid.set_app_id(0); }); return; } @@ -97,10 +93,13 @@ void replica::init_child_replica(gpid parent_gpid, // init split states _split_states.parent_gpid = parent_gpid; - ddebug_replica("init ballot is {}, parent gpid is ({}.{})", - init_ballot, - parent_gpid.get_app_id(), - parent_gpid.get_partition_index()); + ddebug_replica("init ballot is {}, parent gpid is ({})", init_ballot, parent_gpid); +} + +void replica::clean_up_parent_split_context() +{ + _child_gpid.set_app_id(0); + _child_init_ballot = 0; } } // namespace replication diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 5195c2c301..4204653929 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -2265,7 +2265,7 @@ void replica_stub::close() std::string replica_stub::get_replica_dir(const char *app_type, gpid id, bool create_new) { - std::string gpid_str = fmt::format("{}.{}", id.to_string(), app_type); + std::string gpid_str = fmt::format("{}.{}", id, app_type); std::string replica_dir; bool is_dir_exist = false; for (const std::string &data_dir : _options.data_dirs) { @@ -2286,9 +2286,9 @@ std::string replica_stub::get_replica_dir(const char *app_type, gpid id, bool cr } std::string -replica_stub::get_child_dir(const char *app_type, gpid id, const std::string &parent_dir) +replica_stub::get_child_dir(const char *app_type, gpid child_pid, const std::string &parent_dir) { - std::string gpid_str = fmt::format("{}.{}", id.to_string(), app_type); + std::string gpid_str = fmt::format("{}.{}", child_pid.to_string(), app_type); std::string child_dir; for (const std::string &data_dir : _options.data_dirs) { std::string dir = utils::filesystem::path_combine(data_dir, gpid_str); @@ -2296,13 +2296,11 @@ replica_stub::get_child_dir(const char *app_type, gpid id, const std::string &pa // check if 's is euqal to if (parent_dir.substr(0, data_dir.size() + 1) == data_dir + "/") { child_dir = dir; - _fs_manager.add_replica(id, child_dir); + _fs_manager.add_replica(child_pid, child_dir); break; } } - if (child_dir.empty()) { - dassert_f("can not find parent_dir {} in data_dirs", parent_dir); - } + dassert_f(!child_dir.empty(), "can not find parent_dir {} in data_dirs", parent_dir); return child_dir; } @@ -2316,63 +2314,63 @@ void replica_stub::create_child_replica(rpc_address primary_address, gpid parent_gpid, const std::string &parent_dir) { - replica_ptr child_replica = create_replica_if_not_found(child_gpid, &app, parent_dir); + replica_ptr child_replica = create_child_replica_if_not_found(child_gpid, &app, parent_dir); if (child_replica != nullptr) { - ddebug_f("create child replica ({}.{}) succeed", - child_gpid.get_app_id(), - child_gpid.get_partition_index()); - child_replica->init_child_replica(parent_gpid, primary_address, init_ballot); + ddebug_f("create child replica ({}) succeed", child_gpid); + tasking::enqueue(LPC_PARTITION_SPLIT, + &_tracker, + std::bind(&replica::init_child_replica, + child_replica, + parent_gpid, + primary_address, + init_ballot), + child_gpid.thread_hash()); } else { - dwarn_f("failed to create child replica ({}.{}), ignore it and wait next run", - child_gpid.get_app_id(), - child_gpid.get_partition_index()); - split_replica_exec(LPC_PARTITION_SPLIT_ERROR, parent_gpid, [](replica_ptr r) { - r->_child_gpid.set_app_id(0); - }); + dwarn_f("failed to create child replica ({}), ignore it and wait next run", child_gpid); + split_replica_error_handler(parent_gpid, + [](replica_ptr r) { r->_child_gpid.set_app_id(0); }); } } -replica_ptr -replica_stub::create_replica_if_not_found(gpid pid, app_info *app, const std::string &parent_dir) +replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid, + app_info *app, + const std::string &parent_dir) { - FAIL_POINT_INJECT_F("replica_stub_create_replica_if_not_found", + FAIL_POINT_INJECT_F("replica_stub_create_child_replica_if_not_found", [=](dsn::string_view) -> replica_ptr { - replica *rep = new replica(this, pid, *app, "./", false); + replica *rep = new replica(this, child_pid, *app, "./", false); rep->_config.status = partition_status::PS_INACTIVE; - _replicas.insert(replicas::value_type(pid, rep)); + _replicas.insert(replicas::value_type(child_pid, rep)); + ddebug_f("mock create_child_replica_if_not_found succeed"); return rep; }); zauto_write_lock l(_replicas_lock); - auto it = _replicas.find(pid); + auto it = _replicas.find(child_pid); if (it != _replicas.end()) { return it->second; } else { - if (_opening_replicas.find(pid) != _opening_replicas.end()) { - ddebug_f("failed create new replica({}.{}) because it is under open", - pid.get_app_id(), - pid.get_partition_index()); + if (_opening_replicas.find(child_pid) != _opening_replicas.end()) { + dwarn_f("failed create child replica({}) because it is under open", child_pid); return nullptr; - } else if (_closing_replicas.find(pid) != _closing_replicas.end()) { - ddebug_f("failed create new replica({}.{}) because it is under close", - pid.get_app_id(), - pid.get_partition_index()); + } else if (_closing_replicas.find(child_pid) != _closing_replicas.end()) { + dwarn_f("failed create child replica({}) because it is under close", child_pid); return nullptr; } else { - replica *rep = replica::newr(this, pid, *app, false, parent_dir); + replica *rep = replica::newr(this, child_pid, *app, false, parent_dir); if (rep != nullptr) { - auto pr = _replicas.insert(replicas::value_type(pid, rep)); - dassert_f(pr.second, "replica {} has been existed", rep->name()); + auto pr = _replicas.insert(replicas::value_type(child_pid, rep)); + dassert_f(pr.second, "child replica {} has been existed", rep->name()); _counter_replicas_count->increment(); - _closed_replicas.erase(pid); + _closed_replicas.erase(child_pid); } return rep; } } } -void replica_stub::split_replica_exec(task_code code, - gpid pid, +// ThreadPool: THREAD_POOL_REPLICATION +void replica_stub::split_replica_exec(gpid pid, local_execution handler, local_execution error_handler, gpid error_handler_gpid, @@ -2383,36 +2381,44 @@ void replica_stub::split_replica_exec(task_code code, error_handler_gpid.get_app_id() == 0 ? nullptr : get_replica(error_handler_gpid); if (replica && handler) { - tasking::enqueue( - code, replica.get()->tracker(), [=]() { handler(replica); }, pid.thread_hash(), delay); + tasking::enqueue(LPC_PARTITION_SPLIT, + replica.get()->tracker(), + [=]() { handler(replica); }, + pid.thread_hash(), + delay); } else if (error_handler_replica && error_handler) { - ddebug_f("replica({}.{}) is invalid, replica({}.{} will execute its handler)", - pid.get_app_id(), - pid.get_partition_index(), - error_handler_gpid.get_app_id(), - error_handler_gpid.get_partition_index()); - tasking::enqueue(code, + ddebug_f("replica({}) is invalid, replica({}) will execute its handler)", + pid, + error_handler_gpid); + tasking::enqueue(LPC_PARTITION_SPLIT_ERROR, error_handler_replica.get()->tracker(), [=]() { error_handler(error_handler_replica); }, error_handler_gpid.thread_hash(), delay); } else { // execute nothing - dwarn_f("both replica({}.{}) and error handler replica({}.{}) are invalid, or replica not " + dwarn_f("both replica({}) and error handler replica({}) are invalid, or replica not " "define its handlers", - pid.get_app_id(), - pid.get_partition_index(), - error_handler_gpid.get_app_id(), - error_handler_gpid.get_partition_index()); + pid, + error_handler_gpid); } } -void replica_stub::split_replica_exec(task_code code, - gpid pid, - local_execution handler, - std::chrono::milliseconds delay) +// ThreadPool: THREAD_POOL_REPLICATION +void replica_stub::split_replica_error_handler(gpid pid, + local_execution handler, + std::chrono::milliseconds delay) { - split_replica_exec(code, pid, handler, nullptr, gpid(), delay); + replica_ptr replica = pid.get_app_id() == 0 ? nullptr : get_replica(pid); + if (replica && handler) { + tasking::enqueue(LPC_PARTITION_SPLIT_ERROR, + replica.get()->tracker(), + [=]() { handler(replica); }, + pid.thread_hash(), + delay); + } else { + dwarn_f("replica({}) is invalid or not define its handler", pid); + } } } // namespace replication diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index 2d05ca0fe4..54b9e797ea 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -144,7 +144,7 @@ class replica_stub : public serverlet, public ref_counter // during partition split, we should gurantee child replica and parent replica share the // same data dir - std::string get_child_dir(const char *app_type, gpid id, const std::string &parent_dir); + std::string get_child_dir(const char *app_type, gpid child_pid, const std::string &parent_dir); // // helper methods @@ -173,26 +173,29 @@ class replica_stub : public serverlet, public ref_counter // create a new replica instance if not found // return nullptr when failed to create new replica - replica_ptr create_replica_if_not_found(gpid pid, app_info *app, const std::string &parent_dir); + replica_ptr + create_child_replica_if_not_found(gpid child_pid, app_info *app, const std::string &parent_dir); typedef std::function local_execution; - // - if replica() is existed, replica() will execute function after - // milliseconds - // - else replica() will execute function after - // milliseconds - // This function is helpful for partition split error handle - // For example, if child replica is invalid, parent will cleanup split context - void split_replica_exec(task_code code, - gpid pid, + // This function is used during partition split + // - case1. parent want child execute , child will execute if child existed, + // otherwise parent will execute + // - case2. child want parent execute , parent will execute if parent + // existed, otherwise child will execute + void split_replica_exec(gpid pid, local_execution handler, local_execution error_handler, gpid error_handler_gpid, std::chrono::milliseconds delay = std::chrono::milliseconds(0)); - void split_replica_exec(task_code code, - gpid pid, - local_execution handler, - std::chrono::milliseconds delay = std::chrono::milliseconds(0)); + + // This function is used for partition split error handler + // if partition split meet error, parent/child may want child/parent execute error handler + // if replica exist, execute , otherwise return + void + split_replica_error_handler(gpid pid, + local_execution handler, + std::chrono::milliseconds delay = std::chrono::milliseconds(0)); private: enum replica_node_state diff --git a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp index 034bc4f5e7..55b74c2b5e 100644 --- a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp +++ b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp @@ -80,7 +80,7 @@ TEST_F(replica_split_test, add_child_with_child_existed) TEST_F(replica_split_test, add_child_succeed) { fail::setup(); - fail::cfg("replica_stub_create_replica_if_not_found", "return()"); + fail::cfg("replica_stub_create_child_replica_if_not_found", "return()"); fail::cfg("replica_init_child_replica", "return()"); test_on_add_child(); From dc281d38eed76c180b1c3c087f3d7acc909ba084 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 20 Aug 2019 17:52:22 +0800 Subject: [PATCH 16/17] small refactor --- src/dist/replication/lib/replica_stub.cpp | 36 ++++++----------------- src/dist/replication/lib/replica_stub.h | 20 +++++-------- 2 files changed, 17 insertions(+), 39 deletions(-) diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 4204653929..e6c377cb9b 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -2293,7 +2293,7 @@ replica_stub::get_child_dir(const char *app_type, gpid child_pid, const std::str for (const std::string &data_dir : _options.data_dirs) { std::string dir = utils::filesystem::path_combine(data_dir, gpid_str); // = /. - // check if 's is euqal to + // check if 's is equal to if (parent_dir.substr(0, data_dir.size() + 1) == data_dir + "/") { child_dir = dir; _fs_manager.add_replica(child_pid, child_dir); @@ -2373,51 +2373,33 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid, void replica_stub::split_replica_exec(gpid pid, local_execution handler, local_execution error_handler, - gpid error_handler_gpid, - std::chrono::milliseconds delay) + gpid error_handler_gpid) { replica_ptr replica = pid.get_app_id() == 0 ? nullptr : get_replica(pid); replica_ptr error_handler_replica = error_handler_gpid.get_app_id() == 0 ? nullptr : get_replica(error_handler_gpid); if (replica && handler) { - tasking::enqueue(LPC_PARTITION_SPLIT, - replica.get()->tracker(), - [=]() { handler(replica); }, - pid.thread_hash(), - delay); + handler(replica); } else if (error_handler_replica && error_handler) { ddebug_f("replica({}) is invalid, replica({}) will execute its handler)", pid, error_handler_gpid); - tasking::enqueue(LPC_PARTITION_SPLIT_ERROR, - error_handler_replica.get()->tracker(), - [=]() { error_handler(error_handler_replica); }, - error_handler_gpid.thread_hash(), - delay); + error_handler(error_handler_replica); } else { - // execute nothing - dwarn_f("both replica({}) and error handler replica({}) are invalid, or replica not " - "define its handlers", - pid, - error_handler_gpid); + dwarn_f( + "both replica({}) and error handler replica({}) are invalid", pid, error_handler_gpid); } } // ThreadPool: THREAD_POOL_REPLICATION -void replica_stub::split_replica_error_handler(gpid pid, - local_execution handler, - std::chrono::milliseconds delay) +void replica_stub::split_replica_error_handler(gpid pid, local_execution handler) { replica_ptr replica = pid.get_app_id() == 0 ? nullptr : get_replica(pid); if (replica && handler) { - tasking::enqueue(LPC_PARTITION_SPLIT_ERROR, - replica.get()->tracker(), - [=]() { handler(replica); }, - pid.thread_hash(), - delay); + handler(replica); } else { - dwarn_f("replica({}) is invalid or not define its handler", pid); + dwarn_f("replica({}) is invalid", pid); } } diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index 54b9e797ea..a11288a78d 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -178,24 +178,20 @@ class replica_stub : public serverlet, public ref_counter typedef std::function local_execution; - // This function is used during partition split - // - case1. parent want child execute , child will execute if child existed, - // otherwise parent will execute + // This function is used for partition split, caller(replica) + // - case1. parent want child execute , child will execute if child is + // valid(.app_id>0) and existed, otherwise parent will execute // - case2. child want parent execute , parent will execute if parent - // existed, otherwise child will execute + // exist, otherwise child will execute void split_replica_exec(gpid pid, local_execution handler, local_execution error_handler, - gpid error_handler_gpid, - std::chrono::milliseconds delay = std::chrono::milliseconds(0)); + gpid error_handler_gpid); - // This function is used for partition split error handler + // This function is used for partition split error handler, caller(replica) // if partition split meet error, parent/child may want child/parent execute error handler - // if replica exist, execute , otherwise return - void - split_replica_error_handler(gpid pid, - local_execution handler, - std::chrono::milliseconds delay = std::chrono::milliseconds(0)); + // if replica valid and exist, execute , otherwise return + void split_replica_error_handler(gpid pid, local_execution handler); private: enum replica_node_state From 12882e413ddacbc479d1fa75bfe077ac117b9590 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Wed, 21 Aug 2019 09:25:19 +0800 Subject: [PATCH 17/17] add comments --- src/dist/replication/lib/replica.h | 2 +- src/dist/replication/lib/replica_stub.cpp | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 6fa14c6919..a035f923c5 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -410,7 +410,7 @@ class replica : public serverlet, public ref_counter, public replica_ba // partition split // _child_gpid = gpid({app_id},{pidx}+{old_partition_count}) for parent partition - // _child_gpid.app_id = 0 if parent partition not during partition split and child partition + // _child_gpid.app_id = 0 for parent partition not during partition split and child partition dsn::gpid _child_gpid{0, 0}; // ballot when starting partition split coz split will stop if ballot changed // _child_init_ballot = 0 if partition not during partition split diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index e6c377cb9b..6e3ba23cfc 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -2375,6 +2375,7 @@ void replica_stub::split_replica_exec(gpid pid, local_execution error_handler, gpid error_handler_gpid) { + // app_id = 0 means child replica is invalid replica_ptr replica = pid.get_app_id() == 0 ? nullptr : get_replica(pid); replica_ptr error_handler_replica = error_handler_gpid.get_app_id() == 0 ? nullptr : get_replica(error_handler_gpid); @@ -2395,6 +2396,7 @@ void replica_stub::split_replica_exec(gpid pid, // ThreadPool: THREAD_POOL_REPLICATION void replica_stub::split_replica_error_handler(gpid pid, local_execution handler) { + // app_id = 0 means child replica is invalid replica_ptr replica = pid.get_app_id() == 0 ? nullptr : get_replica(pid); if (replica && handler) { handler(replica);