Skip to content

Commit

Permalink
replica-server: support table level write throttling (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
qinzuoyan authored Dec 10, 2018
1 parent 9f98a00 commit 62ad5b7
Show file tree
Hide file tree
Showing 13 changed files with 360 additions and 13 deletions.
1 change: 1 addition & 0 deletions include/dsn/c/api_layer1.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ extern DSN_API uint64_t dsn_now_ns();

/* Current time in microseconds, obtained by scaling down dsn_now_ns(). */
__inline uint64_t dsn_now_us()
{
    const uint64_t ns_per_us = 1000;
    return dsn_now_ns() / ns_per_us;
}
/* Current time in milliseconds, obtained by scaling down dsn_now_ns(). */
__inline uint64_t dsn_now_ms()
{
    const uint64_t ns_per_ms = 1000000;
    return dsn_now_ns() / ns_per_ms;
}
/* Current time in whole seconds, obtained by scaling down dsn_now_ns(). */
__inline uint64_t dsn_now_s()
{
    const uint64_t ns_per_s = 1000000000;
    return dsn_now_ns() / ns_per_s;
}

/*@}*/

Expand Down
2 changes: 1 addition & 1 deletion include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ MAKE_EVENT_CODE(LPC_REPLICATION_INIT_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(RPC_REPLICATION_WRITE_EMPTY, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_PER_REPLICA_CHECKPOINT_TIMER, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_PER_REPLICA_COLLECT_INFO_TIMER, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_MUTATION_PENDING_TIMER, TASK_PRIORITY_HIGH)
// delayed task enqueued by replica::on_client_write() when write throttling
// decides to delay a request, or to reject it after a delay
MAKE_EVENT_CODE(LPC_WRITE_THROTTLING_DELAY, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_GROUP_CHECK, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_CM_DISCONNECTED_SCATTER, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE(LPC_QUERY_NODE_CONFIGURATION_SCATTER, TASK_PRIORITY_HIGH)
Expand Down
2 changes: 1 addition & 1 deletion include/dsn/perf_counter/perf_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class perf_counters : public dsn::utils::singleton<perf_counters>
std::vector<bool> *found);

// this function collects all counters that match any of the regular
// expressions in args into perf_counter_info and returns the json
// representation of perf_counter_info
std::string list_snapshot_by_regexp(const std::vector<std::string> &args);

Expand Down
2 changes: 2 additions & 0 deletions src/core/core/partition_resolver_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ void partition_resolver_simple::on_access_failure(int partition_index, error_cod
// provider
&&
err != ERR_OPERATION_DISABLED // operation disabled
&&
err != ERR_BUSY // busy (rpc busy or throttling busy)
) {
ddebug("clear partition configuration cache %d.%d due to access failure %s",
_app_id,
Expand Down
1 change: 1 addition & 0 deletions src/core/perf_counter/perf_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

#include <dsn/tool-api/command_manager.h>
#include <dsn/tool-api/task.h>
#include <dsn/utility/string_view.h>

#include "perf_counter_atomic.h"
#include "builtin_counters.h"
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ const std::string backup_restore_constant::BACKUP_ID("restore.backup_id");
const std::string backup_restore_constant::SKIP_BAD_PARTITION("restore.skip_bad_partition");

// app env key whose value replica::update_app_envs_internal() parses as a bool
// to switch replica::_deny_client_write
const std::string replica_envs::DENY_CLIENT_WRITE("replica.deny_client_write");
// app env key whose value replica::update_app_envs_internal() hands to the
// replica's throttling_controller via parse_from_env()
const std::string replica_envs::WRITE_THROTTLING("replica.write_throttling");

namespace cold_backup {
std::string get_policy_path(const std::string &root, const std::string &policy_name)
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ class replica_envs
{
public:
static const std::string DENY_CLIENT_WRITE;
static const std::string WRITE_THROTTLING;
};

namespace cold_backup {
Expand Down
18 changes: 14 additions & 4 deletions src/dist/replication/lib/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/rand.h>
#include <dsn/utility/string_conv.h>
#include <dsn/utility/strings.h>

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -61,11 +63,18 @@ replica::replica(
init_state();
_config.pid = gpid;

std::stringstream ss;
ss << "private.log.size(MB)"
<< "@" << gpid.get_app_id() << "." << gpid.get_partition_index();
std::string counter_str = fmt::format("private.log.size(MB)@{}", gpid);
_counter_private_log_size.init_app_counter(
"eon.replica", ss.str().c_str(), COUNTER_TYPE_NUMBER, "private log size(MB)");
"eon.replica", counter_str.c_str(), COUNTER_TYPE_NUMBER, counter_str.c_str());

counter_str = fmt::format("recent.write.throttling.delay.count@{}", gpid);
_counter_recent_write_throttling_delay_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());

counter_str = fmt::format("recent.write.throttling.reject.count@{}", gpid);
_counter_recent_write_throttling_reject_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());

if (need_restore) {
// add an extra env for restore
_extra_envs.insert(
Expand Down Expand Up @@ -97,6 +106,7 @@ void replica::init_state()
{
_inactive_is_transient = false;
_is_initializing = false;
_deny_client_write = false;
_prepare_list =
new prepare_list(this,
0,
Expand Down
8 changes: 6 additions & 2 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "mutation_log.h"
#include "prepare_list.h"
#include "replica_context.h"
#include "throttling_controller.h"

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -93,7 +94,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
//
// requests from clients
//
void on_client_write(task_code code, dsn::message_ex *request);
void on_client_write(task_code code, dsn::message_ex *request, bool ignore_throttling = false);
void on_client_read(task_code code, dsn::message_ex *request);

//
Expand Down Expand Up @@ -373,10 +374,13 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

bool _inactive_is_transient; // upgrade to P/S is allowed only iff true
bool _is_initializing; // when initializing, switching to primary need to update ballot
volatile bool _deny_client_write = false;
bool _deny_client_write; // if deny all write requests
throttling_controller _write_throttling_controller;

// perf counters
perf_counter_wrapper _counter_private_log_size;
perf_counter_wrapper _counter_recent_write_throttling_delay_count;
perf_counter_wrapper _counter_recent_write_throttling_reject_count;

dsn::task_tracker _tracker;
// the thread access checker
Expand Down
33 changes: 32 additions & 1 deletion src/dist/replication/lib/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
namespace dsn {
namespace replication {

void replica::on_client_write(task_code code, dsn::message_ex *request)
void replica::on_client_write(task_code code, dsn::message_ex *request, bool ignore_throttling)
{
_checker.only_one_thread_access();

Expand Down Expand Up @@ -70,6 +70,37 @@ void replica::on_client_write(task_code code, dsn::message_ex *request)
return;
}

if (_write_throttling_controller.enabled() && !ignore_throttling) {
int64_t delay_ms = 0;
auto type = _write_throttling_controller.control(request, delay_ms);
if (type != throttling_controller::PASS) {
if (type == throttling_controller::DELAY) {
tasking::enqueue(LPC_WRITE_THROTTLING_DELAY,
&_tracker,
[ this, code, req = message_ptr(request) ]() {
on_client_write(code, req, true);
},
get_gpid().thread_hash(),
std::chrono::milliseconds(delay_ms));
_counter_recent_write_throttling_delay_count->increment();
} else { // type == throttling_controller::REJECT
if (delay_ms > 0) {
tasking::enqueue(LPC_WRITE_THROTTLING_DELAY,
&_tracker,
[ this, req = message_ptr(request) ]() {
response_client_message(false, req, ERR_BUSY);
},
get_gpid().thread_hash(),
std::chrono::milliseconds(delay_ms));
} else {
response_client_message(false, request, ERR_BUSY);
}
_counter_recent_write_throttling_reject_count->increment();
}
return;
}
}

dinfo("%s: got write request from %s", name(), request->header->from_address.to_string());
auto mu = _primary_states.write_queue.add_work(code, request, this);
if (mu) {
Expand Down
42 changes: 38 additions & 4 deletions src/dist/replication/lib/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,14 +544,48 @@ bool replica::update_app_envs(const std::map<std::string, std::string> &envs)

// Applies the app envs in `envs` to this replica's runtime switches.
//
// Two keys are consulted:
//  - replica_envs::DENY_CLIENT_WRITE: parsed with buf2bool(). A missing key
//    leaves the local default (false) in effect; an unparsable value is
//    logged as a warning (NOTE(review): this relies on buf2bool() not
//    touching its output on failure, so that the default stays false --
//    confirm).
//  - replica_envs::WRITE_THROTTLING: handed to the write throttling
//    controller. A missing key or an unparsable value resets the controller.
//
// Every actual state switch is logged with the old and the new value.
void replica::update_app_envs_internal(const std::map<std::string, std::string> &envs)
{
    // DENY_CLIENT_WRITE
    bool deny_client_write = false;
    auto find = envs.find(replica_envs::DENY_CLIENT_WRITE);
    if (find != envs.end()) {
        if (!buf2bool(find->second, deny_client_write)) {
            dwarn_replica(
                "invalid value of env {}: \"{}\"", replica_envs::DENY_CLIENT_WRITE, find->second);
        }
    }
    if (deny_client_write != _deny_client_write) {
        // log before assigning so that both the old and the new state are visible
        ddebug_replica(
            "switch _deny_client_write from {} to {}", _deny_client_write, deny_client_write);
        _deny_client_write = deny_client_write;
    }

    // WRITE_THROTTLING
    // NOTE(review): throttling_changed / old_throttling / parse_error look like
    // output parameters filled in by parse_from_env() and reset() -- confirm
    // against throttling_controller.
    bool throttling_changed = false;
    std::string old_throttling;
    std::string parse_error;
    find = envs.find(replica_envs::WRITE_THROTTLING);
    if (find != envs.end()) {
        if (!_write_throttling_controller.parse_from_env(find->second,
                                                         _app_info.partition_count,
                                                         parse_error,
                                                         throttling_changed,
                                                         old_throttling)) {
            dwarn_replica("parse env failed, key = \"{}\", value = \"{}\", error = \"{}\"",
                          replica_envs::WRITE_THROTTLING,
                          find->second,
                          parse_error);
            // reset if parse failed
            _write_throttling_controller.reset(throttling_changed, old_throttling);
        }
    } else {
        // reset if env not found
        _write_throttling_controller.reset(throttling_changed, old_throttling);
    }
    if (throttling_changed) {
        ddebug_replica("switch _write_throttling_controller from \"{}\" to \"{}\"",
                       old_throttling,
                       _write_throttling_controller.env_value());
    }
}

bool replica::query_app_envs(/*out*/ std::map<std::string, std::string> &envs)
Expand Down
Loading

0 comments on commit 62ad5b7

Please sign in to comment.