From 59982325218b37f0007664facb0913d22dc5045c Mon Sep 17 00:00:00 2001 From: qinzuoyan Date: Thu, 17 Jan 2019 12:21:39 +0800 Subject: [PATCH 1/2] replica-server: add counters of recent_read_fail_count and recent_write_fail_count --- src/dist/replication/lib/replica.cpp | 38 ++------ src/dist/replication/lib/replica.h | 6 +- src/dist/replication/lib/replica_2pc.cpp | 12 +-- src/dist/replication/lib/replica_config.cpp | 4 +- src/dist/replication/lib/replica_failover.cpp | 6 +- src/dist/replication/lib/replica_stub.cpp | 95 ++++++++++++------- src/dist/replication/lib/replica_stub.h | 13 ++- 7 files changed, 96 insertions(+), 78 deletions(-) diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index 2a9498d8c3..0db6cd48a6 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -42,7 +42,7 @@ namespace replication { replica::replica( replica_stub *stub, gpid gpid, const app_info &app, const char *dir, bool need_restore) : serverlet("replica"), - replica_base(gpid, fmt::format("{}@{}", gpid, stub->_primary_address.to_string())), + replica_base(gpid, fmt::format("{}@{}", gpid, stub->_primary_address_str)), _app_info(app), _primary_states( gpid, stub->options().staleness_for_commit, stub->options().batch_write_disabled), @@ -140,8 +140,7 @@ void replica::on_client_read(task_code code, dsn::message_ex *request) { if (status() == partition_status::PS_INACTIVE || status() == partition_status::PS_POTENTIAL_SECONDARY) { - derror("%s: invalid status: partition_status=%s", name(), enum_to_string(status())); - response_client_message(true, request, ERR_INVALID_STATE); + response_client(READ, request, ERR_INVALID_STATE); return; } @@ -150,18 +149,16 @@ void replica::on_client_read(task_code code, dsn::message_ex *request) // a small window where the state is not the latest yet last_committed_decree() < _primary_states.last_prepare_decree_on_new_primary) { if (status() != partition_status::PS_PRIMARY) { - derror("%s: invalid status: partition_status=%s", name(), enum_to_string(status())); - response_client_message(true, request, ERR_INVALID_STATE); + response_client(READ, request, ERR_INVALID_STATE); return; } if (last_committed_decree() < _primary_states.last_prepare_decree_on_new_primary) { - derror("%s: last_committed_decree(%" PRId64 - ") < last_prepare_decree_on_new_primary(%" PRId64 ")", - name(), - last_committed_decree(), - _primary_states.last_prepare_decree_on_new_primary); - response_client_message(true, request, ERR_INVALID_STATE); + derror_replica("last_committed_decree(%" PRId64 + ") < last_prepare_decree_on_new_primary(%" PRId64 ")", + last_committed_decree(), + _primary_states.last_prepare_decree_on_new_primary); + response_client(READ, request, ERR_INVALID_STATE); return; } } @@ -170,24 +167,9 @@ void replica::on_client_read(task_code code, dsn::message_ex *request) _app->on_request(request); } -void replica::response_client_message(bool is_read, dsn::message_ex *request, error_code error) +void replica::response_client(bool is_read, dsn::message_ex *request, error_code error) { - if (nullptr == request) { - return; - } - - dsn_log_level_t level = LOG_LEVEL_INFORMATION; - if (_stub->_verbose_client_log && error != ERR_OK) { - level = LOG_LEVEL_ERROR; - } - dlog(level, - "%s: reply client %s to %s, err = %s", - name(), - is_read ? "read" : "write", - request->header->from_address.to_string(), - error.to_string()); - - dsn_rpc_reply(request->create_response(), error); + _stub->response_client(get_gpid(), is_read, request, status(), error); } // error_code replica::check_and_fix_private_log_completeness() diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 8643d3cc9d..4266a954d8 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -68,6 +68,10 @@ class test_checker; class replica : public serverlet, public ref_counter, public replica_base { +public: + static const bool READ = true; + static const bool WRITE = false; + public: ~replica(void); @@ -160,7 +164,7 @@ class replica : public serverlet, public ref_counter, public replica_ba private: // common helpers void init_state(); - void response_client_message(bool is_read, dsn::message_ex *request, error_code error); + void response_client(bool is_read, dsn::message_ex *request, error_code error); void execute_mutation(mutation_ptr &mu); mutation_ptr new_mutation(decree decree); diff --git a/src/dist/replication/lib/replica_2pc.cpp b/src/dist/replication/lib/replica_2pc.cpp index f182ee19dd..ed49c37db9 100644 --- a/src/dist/replication/lib/replica_2pc.cpp +++ b/src/dist/replication/lib/replica_2pc.cpp @@ -55,18 +55,18 @@ void replica::on_client_write(task_code code, dsn::message_ex *request, bool ign task_spec *spec = task_spec::get(code); if (!_options->allow_non_idempotent_write && !spec->rpc_request_is_write_idempotent) { - response_client_message(false, request, ERR_OPERATION_DISABLED); + response_client(WRITE, request, ERR_OPERATION_DISABLED); return; } if (partition_status::PS_PRIMARY != status()) { - response_client_message(false, request, ERR_INVALID_STATE); + response_client(WRITE, request, ERR_INVALID_STATE); return; } if (static_cast(_primary_states.membership.secondaries.size()) + 1 < _options->mutation_2pc_min_replica_count) { - response_client_message(false, request, ERR_NOT_ENOUGH_MEMBER); + response_client(WRITE, request, ERR_NOT_ENOUGH_MEMBER); return; } @@ -88,12 +88,12 @@ void replica::on_client_write(task_code code, dsn::message_ex *request, bool ign tasking::enqueue(LPC_WRITE_THROTTLING_DELAY, &_tracker, [ this, req = message_ptr(request) ]() { - response_client_message(false, req, ERR_BUSY); + response_client(WRITE, req, ERR_BUSY); }, get_gpid().thread_hash(), std::chrono::milliseconds(delay_ms)); } else { - response_client_message(false, request, ERR_BUSY); + response_client(WRITE, request, ERR_BUSY); } _counter_recent_write_throttling_reject_count->increment(); } @@ -227,7 +227,7 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation) ErrOut: for (auto &r : mu->client_requests) { - response_client_message(false, r, err); + response_client(WRITE, r, err); } return; } diff --git a/src/dist/replication/lib/replica_config.cpp b/src/dist/replication/lib/replica_config.cpp index 5cf640a07e..6562da72de 100644 --- a/src/dist/replication/lib/replica_config.cpp +++ b/src/dist/replication/lib/replica_config.cpp @@ -102,7 +102,7 @@ void replica::assign_primary(configuration_update_request &proposal) dassert(proposal.node == _stub->_primary_address, "%s VS %s", proposal.node.to_string(), - _stub->_primary_address.to_string()); + _stub->_primary_address_str); if (status() == partition_status::PS_PRIMARY) { dwarn("%s: invalid assgin primary proposal as the node is in %s", @@ -978,7 +978,7 @@ bool replica::update_local_configuration(const replica_configuration &config, _primary_states.write_queue.clear(queued); for (auto &m : queued) { for (auto &r : m->client_requests) { - response_client_message(false, r, ERR_NOT_ENOUGH_MEMBER); + response_client(WRITE, r, ERR_NOT_ENOUGH_MEMBER); } } } diff --git a/src/dist/replication/lib/replica_failover.cpp b/src/dist/replication/lib/replica_failover.cpp index c4131f0317..68d167f998 100644 --- a/src/dist/replication/lib/replica_failover.cpp +++ b/src/dist/replication/lib/replica_failover.cpp @@ -70,10 +70,8 @@ void replica::handle_remote_failure(partition_status::type st, dassert(status() == partition_status::PS_PRIMARY, "invalid partition_status, status = %s", enum_to_string(status())); - dassert(node != _stub->_primary_address, - "%s VS %s", - node.to_string(), - _stub->_primary_address.to_string()); + dassert( + node != _stub->_primary_address, "%s VS %s", node.to_string(), _stub->_primary_address_str); switch (st) { case partition_status::PS_SECONDARY: diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 11b3ecbee5..6ee461f8fb 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -75,6 +75,7 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, _failure_detector = nullptr; _state = NS_Disconnected; _log = nullptr; + _primary_address_str[0] = '\0'; install_perf_counters(); } @@ -266,6 +267,15 @@ void replica_stub::install_perf_counters() "cold.backup.max.upload.file.size", COUNTER_TYPE_NUMBER, "current cold backup max upload file size"); + + _counter_recent_read_fail_count.init_app_counter("eon.replica_stub", + "recent.read.fail.count", + COUNTER_TYPE_VOLATILE_NUMBER, + "read fail count in the recent period"); + _counter_recent_write_fail_count.init_app_counter("eon.replica_stub", + "recent.write.fail.count", + COUNTER_TYPE_VOLATILE_NUMBER, + "write fail count in the recent period"); } void replica_stub::initialize(bool clear /* = false*/) @@ -278,7 +288,8 @@ void replica_stub::initialize(bool clear /* = false*/) void replica_stub::initialize(const replication_options &opts, bool clear /* = false*/) { _primary_address = dsn_primary_address(); - ddebug("primary_address = %s", _primary_address.to_string()); + strcpy(_primary_address_str, _primary_address.to_string()); + ddebug("primary_address = %s", _primary_address_str); set_options(opts); std::ostringstream oss; @@ -691,11 +702,19 @@ void replica_stub::on_client_write(gpid id, dsn::message_ex *request) // ignore and do not reply return; } + if (_verbose_client_log && request) { + ddebug("%s@%s: client = %s, code = %s, timeout = %d", + id.to_string(), + _primary_address_str, + request->header->from_address.to_string(), + request->header->rpc_name, + request->header->client.timeout_ms); + } replica_ptr rep = get_replica(id); if (rep != nullptr) { rep->on_client_write(request->rpc_code(), request); } else { - response_client_error(id, false, request, ERR_OBJECT_NOT_FOUND); + response_client(id, false, request, partition_status::PS_INVALID, ERR_OBJECT_NOT_FOUND); } } @@ -705,11 +724,19 @@ void replica_stub::on_client_read(gpid id, dsn::message_ex *request) // ignore and do not reply return; } + if (_verbose_client_log && request) { + ddebug("%s@%s: client = %s, code = %s, timeout = %d", + id.to_string(), + _primary_address_str, + request->header->from_address.to_string(), + request->header->rpc_name, + request->header->client.timeout_ms); + } replica_ptr rep = get_replica(id); if (rep != nullptr) { rep->on_client_read(request->rpc_code(), request); } else { - response_client_error(id, true, request, ERR_OBJECT_NOT_FOUND); + response_client(id, true, request, partition_status::PS_INVALID, ERR_OBJECT_NOT_FOUND); } } @@ -718,7 +745,7 @@ void replica_stub::on_config_proposal(const configuration_update_request &propos if (!is_connected()) { dwarn("%s@%s: received config proposal %s for %s: not connected, ignore", proposal.config.pid.to_string(), - _primary_address.to_string(), + _primary_address_str, enum_to_string(proposal.type), proposal.node.to_string()); return; @@ -726,7 +753,7 @@ void replica_stub::on_config_proposal(const configuration_update_request &propos ddebug("%s@%s: received config proposal %s for %s", proposal.config.pid.to_string(), - _primary_address.to_string(), + _primary_address_str, enum_to_string(proposal.type), proposal.node.to_string()); @@ -884,14 +911,14 @@ void replica_stub::on_group_check(const group_check_request &request, if (!is_connected()) { dwarn("%s@%s: received group check: not connected, ignore", request.config.pid.to_string(), - _primary_address.to_string()); + _primary_address_str); return; } ddebug("%s@%s: received group check, primary = %s, ballot = %" PRId64 ", status = %s, last_committed_decree = %" PRId64, request.config.pid.to_string(), - _primary_address.to_string(), + _primary_address_str, request.config.primary.to_string(), request.config.ballot, enum_to_string(request.config.status), @@ -958,7 +985,7 @@ void replica_stub::on_add_learner(const group_check_request &request) if (!is_connected()) { dwarn("%s@%s: received add learner: not connected, ignore", request.config.pid.to_string(), - _primary_address.to_string(), + _primary_address_str, request.config.primary.to_string()); return; } @@ -966,7 +993,7 @@ void replica_stub::on_add_learner(const group_check_request &request) ddebug("%s@%s: received add learner, primary = %s, ballot = %" PRId64 ", status = %s, last_committed_decree = %" PRId64, request.config.pid.to_string(), - _primary_address.to_string(), + _primary_address_str, request.config.primary.to_string(), request.config.ballot, enum_to_string(request.config.status), @@ -1202,12 +1229,12 @@ void replica_stub::on_node_query_reply_scatter(replica_stub_ptr this_, ddebug("%s@%s: replica not exists on replica server, which is primary, remove it " "from meta server", req.config.pid.to_string(), - _primary_address.to_string()); + _primary_address_str); remove_replica_on_meta_server(req.info, req.config); } else { ddebug("%s@%s: replica not exists on replica server, which is not primary, just ignore", req.config.pid.to_string(), - _primary_address.to_string()); + _primary_address_str); } } } @@ -1299,31 +1326,31 @@ void replica_stub::on_meta_server_disconnected_scatter(replica_stub_ptr this_, g } } -void replica_stub::response_client_error(gpid id, - bool is_read, - dsn::message_ex *request, - error_code error) +void replica_stub::response_client(gpid id, + bool is_read, + dsn::message_ex *request, + partition_status::type status, + error_code error) { - if (nullptr == request) { - return; - } - - if (error == ERR_OK) { - dinfo("%s@%s: reply client %s to %s, err = %s", - id.to_string(), - _primary_address.to_string(), - is_read ? "read" : "write", - request->header->from_address.to_string(), - error.to_string()); - } else { - derror("%s@%s: reply client %s to %s, err = %s", + if (error != ERR_OK) { + if (is_read) + _counter_recent_read_fail_count->increment(); + else + _counter_recent_write_fail_count->increment(); + derror("%s@%s: %s fail: client = %s, code = %s, timeout = %d, status = %s, error = %s", id.to_string(), - _primary_address.to_string(), + _primary_address_str, is_read ? "read" : "write", - request->header->from_address.to_string(), + request == nullptr ? "null" : request->header->from_address.to_string(), + request == nullptr ? "null" : request->header->rpc_name, + request == nullptr ? 0 : request->header->client.timeout_ms, + enum_to_string(status), error.to_string()); } - dsn_rpc_reply(request->create_response(), error); + + if (request != nullptr) { + dsn_rpc_reply(request->create_response(), error); + } } void replica_stub::init_gc_for_test() @@ -1701,7 +1728,7 @@ void replica_stub::open_replica(const app_info &app, // process below ddebug("%s@%s: start to load replica %s group check, dir = %s", id.to_string(), - _primary_address.to_string(), + _primary_address_str, req ? "with" : "without", dir.c_str()); rep = replica::load(this, dir.c_str()); @@ -1733,7 +1760,7 @@ void replica_stub::open_replica(const app_info &app, if (rep == nullptr) { ddebug("%s@%s: open replica failed, erase from opening replicas", id.to_string(), - _primary_address.to_string()); + _primary_address_str); zauto_write_lock l(_replicas_lock); auto ret = _opening_replicas.erase(id); dassert(ret > 0, "replica %s is not in _opening_replicas", id.to_string()); @@ -2084,7 +2111,7 @@ replica_stub::exec_command_on_replica(const std::vector &args, std::stringstream query_state; query_state << processed << " processed, " << not_found << " not found"; for (auto &kv : results) { - query_state << "\n " << kv.first.to_string() << "@" << _primary_address.to_string(); + query_state << "\n " << kv.first.to_string() << "@" << _primary_address_str; if (kv.second.first != partition_status::PS_INVALID) query_state << "@" << (kv.second.first == partition_status::PS_PRIMARY ? "P" : "S"); query_state << " : " << kv.second.second; diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index b04534907e..eaf83d26e8 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -199,6 +199,12 @@ class replica_stub : public serverlet, public ref_counter replica_life_cycle get_replica_life_cycle(gpid id); void on_gc_replica(replica_stub_ptr this_, gpid id); + void response_client(gpid id, + bool is_read, + dsn::message_ex *request, + partition_status::type status, + error_code error); + private: friend class ::dsn::replication::replication_checker; friend class ::dsn::replication::test::test_checker; @@ -219,6 +225,7 @@ class replica_stub : public serverlet, public ref_counter mutation_log_ptr _log; ::dsn::rpc_address _primary_address; + char _primary_address_str[64]; ::dsn::dist::slave_failure_detector_with_multimaster *_failure_detector; mutable zlock _state_lock; @@ -312,10 +319,10 @@ class replica_stub : public serverlet, public ref_counter perf_counter_wrapper _counter_cold_backup_max_duration_time_ms; perf_counter_wrapper _counter_cold_backup_max_upload_file_size; - dsn::task_tracker _tracker; + perf_counter_wrapper _counter_recent_read_fail_count; + perf_counter_wrapper _counter_recent_write_fail_count; -private: - void response_client_error(gpid id, bool is_read, dsn::message_ex *request, error_code error); + dsn::task_tracker _tracker; }; //------------ inline impl ---------------------- } From 26c05e4350ec2ad6ff171916ccb8c104ba1d0ffc Mon Sep 17 00:00:00 2001 From: qinzuoyan Date: Thu, 17 Jan 2019 16:42:53 +0800 Subject: [PATCH 2/2] fix according to code review --- src/dist/replication/lib/replica.cpp | 15 ++++++++++----- src/dist/replication/lib/replica.h | 7 ++----- src/dist/replication/lib/replica_2pc.cpp | 12 ++++++------ src/dist/replication/lib/replica_config.cpp | 2 +- 4 files changed, 19 insertions(+), 17 deletions(-) diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index 0db6cd48a6..1a74d59248 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -140,7 +140,7 @@ void replica::on_client_read(task_code code, dsn::message_ex *request) { if (status() == partition_status::PS_INACTIVE || status() == partition_status::PS_POTENTIAL_SECONDARY) { - response_client(READ, request, ERR_INVALID_STATE); + response_client_read(request, ERR_INVALID_STATE); return; } @@ -149,7 +149,7 @@ void replica::on_client_read(task_code code, dsn::message_ex *request) // a small window where the state is not the latest yet last_committed_decree() < _primary_states.last_prepare_decree_on_new_primary) { if (status() != partition_status::PS_PRIMARY) { - response_client(READ, request, ERR_INVALID_STATE); + response_client_read(request, ERR_INVALID_STATE); return; } @@ -158,7 +158,7 @@ void replica::on_client_read(task_code code, dsn::message_ex *request) ") < last_prepare_decree_on_new_primary(%" PRId64 ")", last_committed_decree(), _primary_states.last_prepare_decree_on_new_primary); - response_client(READ, request, ERR_INVALID_STATE); + response_client_read(request, ERR_INVALID_STATE); return; } } @@ -167,9 +167,14 @@ void replica::on_client_read(task_code code, dsn::message_ex *request) _app->on_request(request); } -void replica::response_client(bool is_read, dsn::message_ex *request, error_code error) +void replica::response_client_read(dsn::message_ex *request, error_code error) { - _stub->response_client(get_gpid(), is_read, request, status(), error); + _stub->response_client(get_gpid(), true, request, status(), error); +} + +void replica::response_client_write(dsn::message_ex *request, error_code error) +{ + _stub->response_client(get_gpid(), false, request, status(), error); } // error_code replica::check_and_fix_private_log_completeness() diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 4266a954d8..3c536f01b3 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -68,10 +68,6 @@ class test_checker; class replica : public serverlet, public ref_counter, public replica_base { -public: - static const bool READ = true; - static const bool WRITE = false; - public: ~replica(void); @@ -164,7 +160,8 @@ class replica : public serverlet, public ref_counter, public replica_ba private: // common helpers void init_state(); - void response_client(bool is_read, dsn::message_ex *request, error_code error); + void response_client_read(dsn::message_ex *request, error_code error); + void response_client_write(dsn::message_ex *request, error_code error); void execute_mutation(mutation_ptr &mu); mutation_ptr new_mutation(decree decree); diff --git a/src/dist/replication/lib/replica_2pc.cpp b/src/dist/replication/lib/replica_2pc.cpp index ed49c37db9..336f0012fe 100644 --- a/src/dist/replication/lib/replica_2pc.cpp +++ b/src/dist/replication/lib/replica_2pc.cpp @@ -55,18 +55,18 @@ void replica::on_client_write(task_code code, dsn::message_ex *request, bool ign task_spec *spec = task_spec::get(code); if (!_options->allow_non_idempotent_write && !spec->rpc_request_is_write_idempotent) { - response_client(WRITE, request, ERR_OPERATION_DISABLED); + response_client_write(request, ERR_OPERATION_DISABLED); return; } if (partition_status::PS_PRIMARY != status()) { - response_client(WRITE, request, ERR_INVALID_STATE); + response_client_write(request, ERR_INVALID_STATE); return; } if (static_cast(_primary_states.membership.secondaries.size()) + 1 < _options->mutation_2pc_min_replica_count) { - response_client(WRITE, request, ERR_NOT_ENOUGH_MEMBER); + response_client_write(request, ERR_NOT_ENOUGH_MEMBER); return; } @@ -88,12 +88,12 @@ void replica::on_client_write(task_code code, dsn::message_ex *request, bool ign tasking::enqueue(LPC_WRITE_THROTTLING_DELAY, &_tracker, [ this, req = message_ptr(request) ]() { - response_client(WRITE, req, ERR_BUSY); + response_client_write(req, ERR_BUSY); }, get_gpid().thread_hash(), std::chrono::milliseconds(delay_ms)); } else { - response_client(WRITE, request, ERR_BUSY); + response_client_write(request, ERR_BUSY); } _counter_recent_write_throttling_reject_count->increment(); } @@ -227,7 +227,7 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation) ErrOut: for (auto &r : mu->client_requests) { - response_client(WRITE, r, err); + response_client_write(r, err); } return; } diff --git a/src/dist/replication/lib/replica_config.cpp b/src/dist/replication/lib/replica_config.cpp index 6562da72de..93babb3e02 100644 --- a/src/dist/replication/lib/replica_config.cpp +++ b/src/dist/replication/lib/replica_config.cpp @@ -978,7 +978,7 @@ bool replica::update_local_configuration(const replica_configuration &config, _primary_states.write_queue.clear(queued); for (auto &m : queued) { for (auto &r : m->client_requests) { - response_client(WRITE, r, ERR_NOT_ENOUGH_MEMBER); + response_client_write(r, ERR_NOT_ENOUGH_MEMBER); } } }