Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: add throttling for read size and read qps #728

Merged
merged 17 commits into from
Jan 26, 2021
2 changes: 2 additions & 0 deletions include/dsn/dist/replication/replica_envs.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class replica_envs
static const std::string MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION;
static const std::string BUSINESS_INFO;
static const std::string REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS;
static const std::string READ_QPS_THROTTLING;
static const std::string READ_SIZE_THROTTLING;
};

} // namespace replication
Expand Down
3 changes: 2 additions & 1 deletion include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ MAKE_EVENT_CODE(LPC_REPLICATION_INIT_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(RPC_REPLICATION_WRITE_EMPTY, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_PER_REPLICA_CHECKPOINT_TIMER, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_PER_REPLICA_COLLECT_INFO_TIMER, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_WRITE_THROTTLING_DELAY, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_write_THROTTLING_DELAY, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_GROUP_CHECK, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_CM_DISCONNECTED_SCATTER, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE(LPC_QUERY_NODE_CONFIGURATION_SCATTER, TASK_PRIORITY_HIGH)
Expand Down Expand Up @@ -167,6 +167,7 @@ MAKE_EVENT_CODE(LPC_REPLICATION_HIGH, TASK_PRIORITY_HIGH)
// THREAD_POOL_LOCAL_APP
#define CURRENT_THREAD_POOL THREAD_POOL_LOCAL_APP
MAKE_EVENT_CODE(LPC_WRITE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_read_THROTTLING_DELAY, TASK_PRIORITY_COMMON)
#undef CURRENT_THREAD_POOL

// THREAD_POOL_REPLICATION_LONG
Expand Down
2 changes: 2 additions & 0 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,8 @@ const std::string replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS(
const std::string replica_envs::BUSINESS_INFO("business.info");
const std::string replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS(
"replica_access_controller.allowed_users");
const std::string replica_envs::READ_QPS_THROTTLING("replica.read_throttling");
const std::string replica_envs::READ_SIZE_THROTTLING("replica.read_throttling_by_size");

const std::string bulk_load_constant::BULK_LOAD_INFO("bulk_load_info");
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10;
Expand Down
12 changes: 8 additions & 4 deletions src/meta/app_env_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ bool check_rocksdb_iteration(const std::string &env_value, std::string &hint_mes
return true;
}

bool check_write_throttling(const std::string &env_value, std::string &hint_message)
bool check_throttling(const std::string &env_value, std::string &hint_message)
{
std::vector<std::string> sargs;
utils::split_args(env_value.c_str(), sargs, ',');
Expand Down Expand Up @@ -148,9 +148,9 @@ void app_env_validator::register_all_validators()
{replica_envs::SLOW_QUERY_THRESHOLD,
std::bind(&check_slow_query, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::WRITE_QPS_THROTTLING,
std::bind(&check_write_throttling, std::placeholders::_1, std::placeholders::_2)},
std::bind(&check_throttling, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::WRITE_SIZE_THROTTLING,
std::bind(&check_write_throttling, std::placeholders::_1, std::placeholders::_2)},
std::bind(&check_throttling, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS,
std::bind(&check_rocksdb_iteration, std::placeholders::_1, std::placeholders::_2)},
// TODO(zhaoliwei): not implemented
Expand All @@ -168,7 +168,11 @@ void app_env_validator::register_all_validators()
{replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME, nullptr},
{replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL, nullptr},
{replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION, nullptr},
{replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr}};
{replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr},
{replica_envs::READ_QPS_THROTTLING,
std::bind(&check_throttling, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::READ_SIZE_THROTTLING,
std::bind(&check_throttling, std::placeholders::_1, std::placeholders::_2)}};
}

} // namespace replication
Expand Down
14 changes: 13 additions & 1 deletion src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ replica::replica(
_counter_recent_write_throttling_reject_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());

counter_str = fmt::format("recent.read.throttling.delay.count@{}", gpid);
_counter_recent_read_throttling_delay_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());

counter_str = fmt::format("recent.read.throttling.reject.count@{}", gpid);
_counter_recent_read_throttling_reject_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());

counter_str = fmt::format("dup.disabled_non_idempotent_write_count@{}", _app_info.app_name);
_counter_dup_disabled_non_idempotent_write_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());
Expand Down Expand Up @@ -162,7 +170,7 @@ replica::~replica(void)
dinfo("%s: replica destroyed", name());
}

void replica::on_client_read(dsn::message_ex *request)
void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)
{
if (!_access_controller->allowed(request)) {
response_client_read(request, ERR_ACL_DENY);
Expand All @@ -174,6 +182,10 @@ void replica::on_client_read(dsn::message_ex *request)
return;
}

if (!ignore_throttling && throttle_read_request(request)) {
return;
}

if (!request->is_backup_request()) {
// only backup request is allowed to read from a stale replica

Expand Down
40 changes: 22 additions & 18 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,24 +119,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// requests from clients
//
void on_client_write(message_ex *request, bool ignore_throttling = false);
void on_client_read(message_ex *request);

//
// Throttling
//

/// throttle write requests
/// \return true if request is throttled.
/// \see replica::on_client_write
bool throttle_request(throttling_controller &c, message_ex *request, int32_t req_units);
/// update throttling controllers
/// \see replica::update_app_envs
void update_throttle_envs(const std::map<std::string, std::string> &envs);
void update_throttle_env_internal(const std::map<std::string, std::string> &envs,
const std::string &key,
throttling_controller &cntl);
// update allowed users for access controller
void update_ac_allowed_users(const std::map<std::string, std::string> &envs);
void on_client_read(message_ex *request, bool ignore_throttling = false);

//
// messages and tools from/for meta server
Expand Down Expand Up @@ -414,6 +397,23 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

uint32_t query_data_version() const;

//
// Throttling
//

/// return true if request is throttled.
bool throttle_write_request(message_ex *request);
bool throttle_read_request(message_ex *request);
/// update throttling controllers
/// \see replica::update_app_envs
void update_throttle_envs(const std::map<std::string, std::string> &envs);
void update_throttle_env_internal(const std::map<std::string, std::string> &envs,
const std::string &key,
throttling_controller &cntl);

// update allowed users for access controller
void update_ac_allowed_users(const std::map<std::string, std::string> &envs);

private:
friend class ::dsn::replication::test::test_checker;
friend class ::dsn::replication::mutation_queue;
Expand Down Expand Up @@ -501,6 +501,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
bool _deny_client_write; // if deny all write requests
throttling_controller _write_qps_throttling_controller; // throttling by requests-per-second
throttling_controller _write_size_throttling_controller; // throttling by bytes-per-second
throttling_controller _read_qps_throttling_controller;
throttling_controller _read_size_throttling_controller;

// duplication
std::unique_ptr<replica_duplicator_manager> _duplication_mgr;
Expand All @@ -525,6 +527,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
perf_counter_wrapper _counter_private_log_size;
perf_counter_wrapper _counter_recent_write_throttling_delay_count;
perf_counter_wrapper _counter_recent_write_throttling_reject_count;
perf_counter_wrapper _counter_recent_read_throttling_delay_count;
perf_counter_wrapper _counter_recent_read_throttling_reject_count;
std::vector<perf_counter *> _counters_table_level_latency;
perf_counter_wrapper _counter_dup_disabled_non_idempotent_write_count;
perf_counter_wrapper _counter_backup_request_qps;
Expand Down
9 changes: 2 additions & 7 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}

if (!ignore_throttling) {
if (throttle_request(_write_qps_throttling_controller, request, 1)) {
return;
}
if (throttle_request(_write_size_throttling_controller, request, request->body_size())) {
return;
}
if (!ignore_throttling && throttle_write_request(request)) {
return;
}

dinfo("%s: got write request from %s", name(), request->header->from_address.to_string());
Expand Down
77 changes: 45 additions & 32 deletions src/replica/replica_throttle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,49 @@
namespace dsn {
namespace replication {

bool replica::throttle_request(throttling_controller &controller,
message_ex *request,
int32_t request_units)
#define THROTTLE_REQUEST(op_type, throttling_type, request, request_units) \
do { \
int64_t delay_ms = 0; \
auto type = _##op_type##_##throttling_type##_throttling_controller.control( \
request->header->client.timeout_ms, request_units, delay_ms); \
if (type != throttling_controller::PASS) { \
if (type == throttling_controller::DELAY) { \
tasking::enqueue( \
LPC_##op_type##_THROTTLING_DELAY, \
&_tracker, \
[ this, req = message_ptr(request) ]() { on_client_##op_type(req, true); }, \
get_gpid().thread_hash(), \
std::chrono::milliseconds(delay_ms)); \
_counter_recent_##op_type##_throttling_delay_count->increment(); \
} else { /** type == throttling_controller::REJECT **/ \
if (delay_ms > 0) { \
tasking::enqueue(LPC_##op_type##_THROTTLING_DELAY, \
&_tracker, \
[ this, req = message_ptr(request) ]() { \
response_client_##op_type(req, ERR_BUSY); \
}, \
get_gpid().thread_hash(), \
std::chrono::milliseconds(delay_ms)); \
} else { \
response_client_##op_type(request, ERR_BUSY); \
hycdong marked this conversation as resolved.
Show resolved Hide resolved
} \
_counter_recent_##op_type##_throttling_reject_count->increment(); \
} \
return true; \
} \
} while (0)

bool replica::throttle_write_request(message_ex *request)
{
if (!controller.enabled()) {
hycdong marked this conversation as resolved.
Show resolved Hide resolved
return false;
}
THROTTLE_REQUEST(write, qps, request, 1);
THROTTLE_REQUEST(write, size, request, request->body_size());
return false;
}

int64_t delay_ms = 0;
auto type = controller.control(request->header->client.timeout_ms, request_units, delay_ms);
if (type != throttling_controller::PASS) {
if (type == throttling_controller::DELAY) {
tasking::enqueue(LPC_WRITE_THROTTLING_DELAY,
&_tracker,
[ this, req = message_ptr(request) ]() { on_client_write(req, true); },
get_gpid().thread_hash(),
std::chrono::milliseconds(delay_ms));
_counter_recent_write_throttling_delay_count->increment();
} else { // type == throttling_controller::REJECT
if (delay_ms > 0) {
tasking::enqueue(LPC_WRITE_THROTTLING_DELAY,
&_tracker,
[ this, req = message_ptr(request) ]() {
response_client_write(req, ERR_BUSY);
},
get_gpid().thread_hash(),
std::chrono::milliseconds(delay_ms));
} else {
response_client_write(request, ERR_BUSY);
}
_counter_recent_write_throttling_reject_count->increment();
}
return true;
}
bool replica::throttle_read_request(message_ex *request)
{
THROTTLE_REQUEST(read, qps, request, 1);
THROTTLE_REQUEST(read, size, request, request->body_size());
return false;
}

Expand All @@ -57,6 +66,10 @@ void replica::update_throttle_envs(const std::map<std::string, std::string> &env
envs, replica_envs::WRITE_QPS_THROTTLING, _write_qps_throttling_controller);
update_throttle_env_internal(
envs, replica_envs::WRITE_SIZE_THROTTLING, _write_size_throttling_controller);
update_throttle_env_internal(
envs, replica_envs::READ_QPS_THROTTLING, _read_qps_throttling_controller);
update_throttle_env_internal(
envs, replica_envs::READ_SIZE_THROTTLING, _read_size_throttling_controller);
}

void replica::update_throttle_env_internal(const std::map<std::string, std::string> &envs,
Expand Down
5 changes: 5 additions & 0 deletions src/utils/throttling_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ void throttling_controller::reset(bool &changed, std::string &old_env_value)
throttling_controller::throttling_type throttling_controller::control(
const int64_t client_timeout_ms, int32_t request_units, int64_t &delay_ms)
{
// return PASS if throttling controller is not enabled
if (!_enabled) {
return PASS;
}

int64_t now_s = dsn_now_s();
if (now_s != _last_request_time) {
_cur_units = 0;
Expand Down
3 changes: 0 additions & 3 deletions src/utils/throttling_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ class throttling_controller
// reset to no throttling.
void reset(/*out*/ bool &changed, /*out*/ std::string &old_env_value);

// if throttling is enabled.
bool enabled() const { return _enabled; }

// return the current env value.
const std::string &env_value() const { return _env_value; }

Expand Down