Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

replica-server: add counters of recent_read_fail_count and recent_write_fail_count #218

Merged
merged 2 commits into from
Jan 17, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 10 additions & 28 deletions src/dist/replication/lib/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace replication {
replica::replica(
replica_stub *stub, gpid gpid, const app_info &app, const char *dir, bool need_restore)
: serverlet<replica>("replica"),
replica_base(gpid, fmt::format("{}@{}", gpid, stub->_primary_address.to_string())),
replica_base(gpid, fmt::format("{}@{}", gpid, stub->_primary_address_str)),
_app_info(app),
_primary_states(
gpid, stub->options().staleness_for_commit, stub->options().batch_write_disabled),
Expand Down Expand Up @@ -140,8 +140,7 @@ void replica::on_client_read(task_code code, dsn::message_ex *request)
{
if (status() == partition_status::PS_INACTIVE ||
status() == partition_status::PS_POTENTIAL_SECONDARY) {
derror("%s: invalid status: partition_status=%s", name(), enum_to_string(status()));
response_client_message(true, request, ERR_INVALID_STATE);
response_client(READ, request, ERR_INVALID_STATE);
return;
}

Expand All @@ -150,18 +149,16 @@ void replica::on_client_read(task_code code, dsn::message_ex *request)
// a small window where the state is not the latest yet
last_committed_decree() < _primary_states.last_prepare_decree_on_new_primary) {
if (status() != partition_status::PS_PRIMARY) {
derror("%s: invalid status: partition_status=%s", name(), enum_to_string(status()));
response_client_message(true, request, ERR_INVALID_STATE);
response_client(READ, request, ERR_INVALID_STATE);
return;
}

if (last_committed_decree() < _primary_states.last_prepare_decree_on_new_primary) {
derror("%s: last_committed_decree(%" PRId64
") < last_prepare_decree_on_new_primary(%" PRId64 ")",
name(),
last_committed_decree(),
_primary_states.last_prepare_decree_on_new_primary);
response_client_message(true, request, ERR_INVALID_STATE);
derror_replica("last_committed_decree(%" PRId64
") < last_prepare_decree_on_new_primary(%" PRId64 ")",
last_committed_decree(),
_primary_states.last_prepare_decree_on_new_primary);
response_client(READ, request, ERR_INVALID_STATE);
return;
}
}
Expand All @@ -170,24 +167,9 @@ void replica::on_client_read(task_code code, dsn::message_ex *request)
_app->on_request(request);
}

void replica::response_client_message(bool is_read, dsn::message_ex *request, error_code error)
void replica::response_client(bool is_read, dsn::message_ex *request, error_code error)
{
if (nullptr == request) {
return;
}

dsn_log_level_t level = LOG_LEVEL_INFORMATION;
if (_stub->_verbose_client_log && error != ERR_OK) {
level = LOG_LEVEL_ERROR;
}
dlog(level,
"%s: reply client %s to %s, err = %s",
name(),
is_read ? "read" : "write",
request->header->from_address.to_string(),
error.to_string());

dsn_rpc_reply(request->create_response(), error);
_stub->response_client(get_gpid(), is_read, request, status(), error);
}

// error_code replica::check_and_fix_private_log_completeness()
Expand Down
6 changes: 5 additions & 1 deletion src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ class test_checker;

class replica : public serverlet<replica>, public ref_counter, public replica_base
{
public:
static const bool READ = true;
static const bool WRITE = false;

public:
~replica(void);

Expand Down Expand Up @@ -160,7 +164,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
private:
// common helpers
void init_state();
void response_client_message(bool is_read, dsn::message_ex *request, error_code error);
void response_client(bool is_read, dsn::message_ex *request, error_code error);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

定义两个函数 respond_client_for_read / respond_client_for_write 更直观,内部调用 respond_client

void execute_mutation(mutation_ptr &mu);
mutation_ptr new_mutation(decree decree);

Expand Down
12 changes: 6 additions & 6 deletions src/dist/replication/lib/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,18 @@ void replica::on_client_write(task_code code, dsn::message_ex *request, bool ign

task_spec *spec = task_spec::get(code);
if (!_options->allow_non_idempotent_write && !spec->rpc_request_is_write_idempotent) {
response_client_message(false, request, ERR_OPERATION_DISABLED);
response_client(WRITE, request, ERR_OPERATION_DISABLED);
return;
}

if (partition_status::PS_PRIMARY != status()) {
response_client_message(false, request, ERR_INVALID_STATE);
response_client(WRITE, request, ERR_INVALID_STATE);
return;
}

if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 <
_options->mutation_2pc_min_replica_count) {
response_client_message(false, request, ERR_NOT_ENOUGH_MEMBER);
response_client(WRITE, request, ERR_NOT_ENOUGH_MEMBER);
return;
}

Expand All @@ -88,12 +88,12 @@ void replica::on_client_write(task_code code, dsn::message_ex *request, bool ign
tasking::enqueue(LPC_WRITE_THROTTLING_DELAY,
&_tracker,
[ this, req = message_ptr(request) ]() {
response_client_message(false, req, ERR_BUSY);
response_client(WRITE, req, ERR_BUSY);
},
get_gpid().thread_hash(),
std::chrono::milliseconds(delay_ms));
} else {
response_client_message(false, request, ERR_BUSY);
response_client(WRITE, request, ERR_BUSY);
}
_counter_recent_write_throttling_reject_count->increment();
}
Expand Down Expand Up @@ -227,7 +227,7 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation)

ErrOut:
for (auto &r : mu->client_requests) {
response_client_message(false, r, err);
response_client(WRITE, r, err);
}
return;
}
Expand Down
4 changes: 2 additions & 2 deletions src/dist/replication/lib/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void replica::assign_primary(configuration_update_request &proposal)
dassert(proposal.node == _stub->_primary_address,
"%s VS %s",
proposal.node.to_string(),
_stub->_primary_address.to_string());
_stub->_primary_address_str);

if (status() == partition_status::PS_PRIMARY) {
dwarn("%s: invalid assgin primary proposal as the node is in %s",
Expand Down Expand Up @@ -978,7 +978,7 @@ bool replica::update_local_configuration(const replica_configuration &config,
_primary_states.write_queue.clear(queued);
for (auto &m : queued) {
for (auto &r : m->client_requests) {
response_client_message(false, r, ERR_NOT_ENOUGH_MEMBER);
response_client(WRITE, r, ERR_NOT_ENOUGH_MEMBER);
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/dist/replication/lib/replica_failover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,8 @@ void replica::handle_remote_failure(partition_status::type st,
dassert(status() == partition_status::PS_PRIMARY,
"invalid partition_status, status = %s",
enum_to_string(status()));
dassert(node != _stub->_primary_address,
"%s VS %s",
node.to_string(),
_stub->_primary_address.to_string());
dassert(
node != _stub->_primary_address, "%s VS %s", node.to_string(), _stub->_primary_address_str);

switch (st) {
case partition_status::PS_SECONDARY:
Expand Down
95 changes: 61 additions & 34 deletions src/dist/replication/lib/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
_failure_detector = nullptr;
_state = NS_Disconnected;
_log = nullptr;
_primary_address_str[0] = '\0';
install_perf_counters();
}

Expand Down Expand Up @@ -266,6 +267,15 @@ void replica_stub::install_perf_counters()
"cold.backup.max.upload.file.size",
COUNTER_TYPE_NUMBER,
"current cold backup max upload file size");

_counter_recent_read_fail_count.init_app_counter("eon.replica_stub",
"recent.read.fail.count",
COUNTER_TYPE_VOLATILE_NUMBER,
"read fail count in the recent period");
_counter_recent_write_fail_count.init_app_counter("eon.replica_stub",
"recent.write.fail.count",
COUNTER_TYPE_VOLATILE_NUMBER,
"write fail count in the recent period");
}

void replica_stub::initialize(bool clear /* = false*/)
Expand All @@ -278,7 +288,8 @@ void replica_stub::initialize(bool clear /* = false*/)
void replica_stub::initialize(const replication_options &opts, bool clear /* = false*/)
{
_primary_address = dsn_primary_address();
ddebug("primary_address = %s", _primary_address.to_string());
strcpy(_primary_address_str, _primary_address.to_string());
ddebug("primary_address = %s", _primary_address_str);

set_options(opts);
std::ostringstream oss;
Expand Down Expand Up @@ -691,11 +702,19 @@ void replica_stub::on_client_write(gpid id, dsn::message_ex *request)
// ignore and do not reply
return;
}
if (_verbose_client_log && request) {
ddebug("%s@%s: client = %s, code = %s, timeout = %d",
id.to_string(),
_primary_address_str,
request->header->from_address.to_string(),
request->header->rpc_name,
request->header->client.timeout_ms);
}
replica_ptr rep = get_replica(id);
if (rep != nullptr) {
rep->on_client_write(request->rpc_code(), request);
} else {
response_client_error(id, false, request, ERR_OBJECT_NOT_FOUND);
response_client(id, false, request, partition_status::PS_INVALID, ERR_OBJECT_NOT_FOUND);
}
}

Expand All @@ -705,11 +724,19 @@ void replica_stub::on_client_read(gpid id, dsn::message_ex *request)
// ignore and do not reply
return;
}
if (_verbose_client_log && request) {
ddebug("%s@%s: client = %s, code = %s, timeout = %d",
id.to_string(),
_primary_address_str,
request->header->from_address.to_string(),
request->header->rpc_name,
request->header->client.timeout_ms);
}
replica_ptr rep = get_replica(id);
if (rep != nullptr) {
rep->on_client_read(request->rpc_code(), request);
} else {
response_client_error(id, true, request, ERR_OBJECT_NOT_FOUND);
response_client(id, true, request, partition_status::PS_INVALID, ERR_OBJECT_NOT_FOUND);
}
}

Expand All @@ -718,15 +745,15 @@ void replica_stub::on_config_proposal(const configuration_update_request &propos
if (!is_connected()) {
dwarn("%s@%s: received config proposal %s for %s: not connected, ignore",
proposal.config.pid.to_string(),
_primary_address.to_string(),
_primary_address_str,
enum_to_string(proposal.type),
proposal.node.to_string());
return;
}

ddebug("%s@%s: received config proposal %s for %s",
proposal.config.pid.to_string(),
_primary_address.to_string(),
_primary_address_str,
enum_to_string(proposal.type),
proposal.node.to_string());

Expand Down Expand Up @@ -884,14 +911,14 @@ void replica_stub::on_group_check(const group_check_request &request,
if (!is_connected()) {
dwarn("%s@%s: received group check: not connected, ignore",
request.config.pid.to_string(),
_primary_address.to_string());
_primary_address_str);
return;
}

ddebug("%s@%s: received group check, primary = %s, ballot = %" PRId64
", status = %s, last_committed_decree = %" PRId64,
request.config.pid.to_string(),
_primary_address.to_string(),
_primary_address_str,
request.config.primary.to_string(),
request.config.ballot,
enum_to_string(request.config.status),
Expand Down Expand Up @@ -958,15 +985,15 @@ void replica_stub::on_add_learner(const group_check_request &request)
if (!is_connected()) {
dwarn("%s@%s: received add learner: not connected, ignore",
request.config.pid.to_string(),
_primary_address.to_string(),
_primary_address_str,
request.config.primary.to_string());
return;
}

ddebug("%s@%s: received add learner, primary = %s, ballot = %" PRId64
", status = %s, last_committed_decree = %" PRId64,
request.config.pid.to_string(),
_primary_address.to_string(),
_primary_address_str,
request.config.primary.to_string(),
request.config.ballot,
enum_to_string(request.config.status),
Expand Down Expand Up @@ -1202,12 +1229,12 @@ void replica_stub::on_node_query_reply_scatter(replica_stub_ptr this_,
ddebug("%s@%s: replica not exists on replica server, which is primary, remove it "
"from meta server",
req.config.pid.to_string(),
_primary_address.to_string());
_primary_address_str);
remove_replica_on_meta_server(req.info, req.config);
} else {
ddebug("%s@%s: replica not exists on replica server, which is not primary, just ignore",
req.config.pid.to_string(),
_primary_address.to_string());
_primary_address_str);
}
}
}
Expand Down Expand Up @@ -1299,31 +1326,31 @@ void replica_stub::on_meta_server_disconnected_scatter(replica_stub_ptr this_, g
}
}

void replica_stub::response_client_error(gpid id,
bool is_read,
dsn::message_ex *request,
error_code error)
void replica_stub::response_client(gpid id,
bool is_read,
dsn::message_ex *request,
partition_status::type status,
error_code error)
{
if (nullptr == request) {
return;
}

if (error == ERR_OK) {
dinfo("%s@%s: reply client %s to %s, err = %s",
id.to_string(),
_primary_address.to_string(),
is_read ? "read" : "write",
request->header->from_address.to_string(),
error.to_string());
} else {
derror("%s@%s: reply client %s to %s, err = %s",
if (error != ERR_OK) {
if (is_read)
_counter_recent_read_fail_count->increment();
else
_counter_recent_write_fail_count->increment();
derror("%s@%s: %s fail: client = %s, code = %s, timeout = %d, status = %s, error = %s",
id.to_string(),
_primary_address.to_string(),
_primary_address_str,
is_read ? "read" : "write",
request->header->from_address.to_string(),
request == nullptr ? "null" : request->header->from_address.to_string(),
request == nullptr ? "null" : request->header->rpc_name,
request == nullptr ? 0 : request->header->client.timeout_ms,
enum_to_string(status),
error.to_string());
}
dsn_rpc_reply(request->create_response(), error);

if (request != nullptr) {
dsn_rpc_reply(request->create_response(), error);
}
}

void replica_stub::init_gc_for_test()
Expand Down Expand Up @@ -1701,7 +1728,7 @@ void replica_stub::open_replica(const app_info &app,
// process below
ddebug("%s@%s: start to load replica %s group check, dir = %s",
id.to_string(),
_primary_address.to_string(),
_primary_address_str,
req ? "with" : "without",
dir.c_str());
rep = replica::load(this, dir.c_str());
Expand Down Expand Up @@ -1733,7 +1760,7 @@ void replica_stub::open_replica(const app_info &app,
if (rep == nullptr) {
ddebug("%s@%s: open replica failed, erase from opening replicas",
id.to_string(),
_primary_address.to_string());
_primary_address_str);
zauto_write_lock l(_replicas_lock);
auto ret = _opening_replicas.erase(id);
dassert(ret > 0, "replica %s is not in _opening_replicas", id.to_string());
Expand Down Expand Up @@ -2084,7 +2111,7 @@ replica_stub::exec_command_on_replica(const std::vector<std::string> &args,
std::stringstream query_state;
query_state << processed << " processed, " << not_found << " not found";
for (auto &kv : results) {
query_state << "\n " << kv.first.to_string() << "@" << _primary_address.to_string();
query_state << "\n " << kv.first.to_string() << "@" << _primary_address_str;
if (kv.second.first != partition_status::PS_INVALID)
query_state << "@" << (kv.second.first == partition_status::PS_PRIMARY ? "P" : "S");
query_state << " : " << kv.second.second;
Expand Down
Loading