diff --git a/include/dsn/dist/replication/replica_envs.h b/include/dsn/dist/replication/replica_envs.h index 3b195fa3e0..a1b8e4afef 100644 --- a/include/dsn/dist/replication/replica_envs.h +++ b/include/dsn/dist/replication/replica_envs.h @@ -55,6 +55,8 @@ class replica_envs static const std::string MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION; static const std::string BUSINESS_INFO; static const std::string REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS; + static const std::string READ_QPS_THROTTLING; + static const std::string READ_SIZE_THROTTLING; }; } // namespace replication diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index 39c378b7e4..554424091a 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -123,7 +123,7 @@ MAKE_EVENT_CODE(LPC_REPLICATION_INIT_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(RPC_REPLICATION_WRITE_EMPTY, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_PER_REPLICA_CHECKPOINT_TIMER, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_PER_REPLICA_COLLECT_INFO_TIMER, TASK_PRIORITY_COMMON) -MAKE_EVENT_CODE(LPC_WRITE_THROTTLING_DELAY, TASK_PRIORITY_COMMON) +MAKE_EVENT_CODE(LPC_write_THROTTLING_DELAY, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_GROUP_CHECK, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_CM_DISCONNECTED_SCATTER, TASK_PRIORITY_HIGH) MAKE_EVENT_CODE(LPC_QUERY_NODE_CONFIGURATION_SCATTER, TASK_PRIORITY_HIGH) @@ -167,6 +167,7 @@ MAKE_EVENT_CODE(LPC_REPLICATION_HIGH, TASK_PRIORITY_HIGH) // THREAD_POOL_LOCAL_APP #define CURRENT_THREAD_POOL THREAD_POOL_LOCAL_APP MAKE_EVENT_CODE(LPC_WRITE, TASK_PRIORITY_COMMON) +MAKE_EVENT_CODE(LPC_read_THROTTLING_DELAY, TASK_PRIORITY_COMMON) #undef CURRENT_THREAD_POOL // THREAD_POOL_REPLICATION_LONG diff --git a/src/common/replication_common.cpp b/src/common/replication_common.cpp index 298a2e6c91..e711046f6d 100644 --- a/src/common/replication_common.cpp +++ b/src/common/replication_common.cpp @@ -626,6 +626,8 @@ const std::string replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS( const std::string replica_envs::BUSINESS_INFO("business.info"); const std::string replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS( "replica_access_controller.allowed_users"); +const std::string replica_envs::READ_QPS_THROTTLING("replica.read_throttling"); +const std::string replica_envs::READ_SIZE_THROTTLING("replica.read_throttling_by_size"); const std::string bulk_load_constant::BULK_LOAD_INFO("bulk_load_info"); const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10; diff --git a/src/meta/app_env_validator.cpp b/src/meta/app_env_validator.cpp index 6fea80937c..c7021c6fff 100644 --- a/src/meta/app_env_validator.cpp +++ b/src/meta/app_env_validator.cpp @@ -63,7 +63,7 @@ bool check_rocksdb_iteration(const std::string &env_value, std::string &hint_mes return true; } -bool check_write_throttling(const std::string &env_value, std::string &hint_message) +bool check_throttling(const std::string &env_value, std::string &hint_message) { std::vector sargs; utils::split_args(env_value.c_str(), sargs, ','); @@ -148,9 +148,9 @@ void app_env_validator::register_all_validators() {replica_envs::SLOW_QUERY_THRESHOLD, std::bind(&check_slow_query, std::placeholders::_1, std::placeholders::_2)}, {replica_envs::WRITE_QPS_THROTTLING, - std::bind(&check_write_throttling, std::placeholders::_1, std::placeholders::_2)}, + std::bind(&check_throttling, std::placeholders::_1, std::placeholders::_2)}, {replica_envs::WRITE_SIZE_THROTTLING, - std::bind(&check_write_throttling, std::placeholders::_1, std::placeholders::_2)}, + std::bind(&check_throttling, std::placeholders::_1, std::placeholders::_2)}, {replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS, std::bind(&check_rocksdb_iteration, std::placeholders::_1, std::placeholders::_2)}, // TODO(zhaoliwei): not implemented @@ -168,7 +168,11 @@ void app_env_validator::register_all_validators() {replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME, nullptr}, {replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL, nullptr}, {replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION, nullptr}, - {replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr}}; + {replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr}, + {replica_envs::READ_QPS_THROTTLING, + std::bind(&check_throttling, std::placeholders::_1, std::placeholders::_2)}, + {replica_envs::READ_SIZE_THROTTLING, + std::bind(&check_throttling, std::placeholders::_1, std::placeholders::_2)}}; } } // namespace replication diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index dd41e1de71..9f8b27966d 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -90,6 +90,14 @@ replica::replica( _counter_recent_write_throttling_reject_count.init_app_counter( "eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str()); + counter_str = fmt::format("recent.read.throttling.delay.count@{}", gpid); + _counter_recent_read_throttling_delay_count.init_app_counter( + "eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str()); + + counter_str = fmt::format("recent.read.throttling.reject.count@{}", gpid); + _counter_recent_read_throttling_reject_count.init_app_counter( + "eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str()); + counter_str = fmt::format("dup.disabled_non_idempotent_write_count@{}", _app_info.app_name); _counter_dup_disabled_non_idempotent_write_count.init_app_counter( "eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str()); @@ -162,7 +170,7 @@ replica::~replica(void) dinfo("%s: replica destroyed", name()); } -void replica::on_client_read(dsn::message_ex *request) +void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling) { if (!_access_controller->allowed(request)) { response_client_read(request, ERR_ACL_DENY); @@ -174,6 +182,10 @@ void replica::on_client_read(dsn::message_ex *request) return; } + if (!ignore_throttling && throttle_read_request(request)) { + return; + } + if (!request->is_backup_request()) { // only backup request is allowed to read from a stale replica diff --git a/src/replica/replica.h b/src/replica/replica.h index c86f4e68f0..61526a7df2 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -119,24 +119,7 @@ class replica : public serverlet, public ref_counter, public replica_ba // requests from clients // void on_client_write(message_ex *request, bool ignore_throttling = false); - void on_client_read(message_ex *request); - - // - // Throttling - // - - /// throttle write requests - /// \return true if request is throttled. - /// \see replica::on_client_write - bool throttle_request(throttling_controller &c, message_ex *request, int32_t req_units); - /// update throttling controllers - /// \see replica::update_app_envs - void update_throttle_envs(const std::map &envs); - void update_throttle_env_internal(const std::map &envs, - const std::string &key, - throttling_controller &cntl); - // update allowed users for access controller - void update_ac_allowed_users(const std::map &envs); + void on_client_read(message_ex *request, bool ignore_throttling = false); // // messages and tools from/for meta server @@ -414,6 +397,23 @@ class replica : public serverlet, public ref_counter, public replica_ba uint32_t query_data_version() const; + // + // Throttling + // + + /// return true if request is throttled. + bool throttle_write_request(message_ex *request); + bool throttle_read_request(message_ex *request); + /// update throttling controllers + /// \see replica::update_app_envs + void update_throttle_envs(const std::map &envs); + void update_throttle_env_internal(const std::map &envs, + const std::string &key, + throttling_controller &cntl); + + // update allowed users for access controller + void update_ac_allowed_users(const std::map &envs); + private: friend class ::dsn::replication::test::test_checker; friend class ::dsn::replication::mutation_queue; @@ -501,6 +501,8 @@ class replica : public serverlet, public ref_counter, public replica_ba bool _deny_client_write; // if deny all write requests throttling_controller _write_qps_throttling_controller; // throttling by requests-per-second throttling_controller _write_size_throttling_controller; // throttling by bytes-per-second + throttling_controller _read_qps_throttling_controller; + throttling_controller _read_size_throttling_controller; // duplication std::unique_ptr _duplication_mgr; @@ -525,6 +527,8 @@ class replica : public serverlet, public ref_counter, public replica_ba perf_counter_wrapper _counter_private_log_size; perf_counter_wrapper _counter_recent_write_throttling_delay_count; perf_counter_wrapper _counter_recent_write_throttling_reject_count; + perf_counter_wrapper _counter_recent_read_throttling_delay_count; + perf_counter_wrapper _counter_recent_read_throttling_reject_count; std::vector _counters_table_level_latency; perf_counter_wrapper _counter_dup_disabled_non_idempotent_write_count; perf_counter_wrapper _counter_backup_request_qps; diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index b4c3eb4b04..5670300c44 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -120,13 +120,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) return; } - if (!ignore_throttling) { - if (throttle_request(_write_qps_throttling_controller, request, 1)) { - return; - } - if (throttle_request(_write_size_throttling_controller, request, request->body_size())) { - return; - } + if (!ignore_throttling && throttle_write_request(request)) { + return; } dinfo("%s: got write request from %s", name(), request->header->from_address.to_string()); diff --git a/src/replica/replica_throttle.cpp b/src/replica/replica_throttle.cpp index 47fb3aaca4..6bb5f506eb 100644 --- a/src/replica/replica_throttle.cpp +++ b/src/replica/replica_throttle.cpp @@ -14,40 +14,49 @@ namespace dsn { namespace replication { -bool replica::throttle_request(throttling_controller &controller, - message_ex *request, - int32_t request_units) +#define THROTTLE_REQUEST(op_type, throttling_type, request, request_units) \ + do { \ + int64_t delay_ms = 0; \ + auto type = _##op_type##_##throttling_type##_throttling_controller.control( \ + request->header->client.timeout_ms, request_units, delay_ms); \ + if (type != throttling_controller::PASS) { \ + if (type == throttling_controller::DELAY) { \ + tasking::enqueue( \ + LPC_##op_type##_THROTTLING_DELAY, \ + &_tracker, \ + [ this, req = message_ptr(request) ]() { on_client_##op_type(req, true); }, \ + get_gpid().thread_hash(), \ + std::chrono::milliseconds(delay_ms)); \ + _counter_recent_##op_type##_throttling_delay_count->increment(); \ + } else { /** type == throttling_controller::REJECT **/ \ + if (delay_ms > 0) { \ + tasking::enqueue(LPC_##op_type##_THROTTLING_DELAY, \ + &_tracker, \ + [ this, req = message_ptr(request) ]() { \ + response_client_##op_type(req, ERR_BUSY); \ + }, \ + get_gpid().thread_hash(), \ + std::chrono::milliseconds(delay_ms)); \ + } else { \ + response_client_##op_type(request, ERR_BUSY); \ + } \ + _counter_recent_##op_type##_throttling_reject_count->increment(); \ + } \ + return true; \ + } \ + } while (0) + +bool replica::throttle_write_request(message_ex *request) { - if (!controller.enabled()) { - return false; - } + THROTTLE_REQUEST(write, qps, request, 1); + THROTTLE_REQUEST(write, size, request, request->body_size()); + return false; +} - int64_t delay_ms = 0; - auto type = controller.control(request->header->client.timeout_ms, request_units, delay_ms); - if (type != throttling_controller::PASS) { - if (type == throttling_controller::DELAY) { - tasking::enqueue(LPC_WRITE_THROTTLING_DELAY, - &_tracker, - [ this, req = message_ptr(request) ]() { on_client_write(req, true); }, - get_gpid().thread_hash(), - std::chrono::milliseconds(delay_ms)); - _counter_recent_write_throttling_delay_count->increment(); - } else { // type == throttling_controller::REJECT - if (delay_ms > 0) { - tasking::enqueue(LPC_WRITE_THROTTLING_DELAY, - &_tracker, - [ this, req = message_ptr(request) ]() { - response_client_write(req, ERR_BUSY); - }, - get_gpid().thread_hash(), - std::chrono::milliseconds(delay_ms)); - } else { - response_client_write(request, ERR_BUSY); - } - _counter_recent_write_throttling_reject_count->increment(); - } - return true; - } +bool replica::throttle_read_request(message_ex *request) +{ + THROTTLE_REQUEST(read, qps, request, 1); + THROTTLE_REQUEST(read, size, request, request->body_size()); return false; } @@ -57,6 +66,10 @@ void replica::update_throttle_envs(const std::map &env envs, replica_envs::WRITE_QPS_THROTTLING, _write_qps_throttling_controller); update_throttle_env_internal( envs, replica_envs::WRITE_SIZE_THROTTLING, _write_size_throttling_controller); + update_throttle_env_internal( + envs, replica_envs::READ_QPS_THROTTLING, _read_qps_throttling_controller); + update_throttle_env_internal( + envs, replica_envs::READ_SIZE_THROTTLING, _read_size_throttling_controller); } void replica::update_throttle_env_internal(const std::map &envs, diff --git a/src/utils/throttling_controller.cpp b/src/utils/throttling_controller.cpp index 7015bf9e20..df0e32852a 100644 --- a/src/utils/throttling_controller.cpp +++ b/src/utils/throttling_controller.cpp @@ -152,6 +152,11 @@ void throttling_controller::reset(bool &changed, std::string &old_env_value) throttling_controller::throttling_type throttling_controller::control( const int64_t client_timeout_ms, int32_t request_units, int64_t &delay_ms) { + // return PASS if throttling controller is not enabled + if (!_enabled) { + return PASS; + } + int64_t now_s = dsn_now_s(); if (now_s != _last_request_time) { _cur_units = 0; diff --git a/src/utils/throttling_controller.h b/src/utils/throttling_controller.h index 77799c45fa..a740621f01 100644 --- a/src/utils/throttling_controller.h +++ b/src/utils/throttling_controller.h @@ -72,9 +72,6 @@ class throttling_controller // reset to no throttling. void reset(/*out*/ bool &changed, /*out*/ std::string &old_env_value); - // if throttling is enabled. - bool enabled() const { return _enabled; } - // return the current env value. const std::string &env_value() const { return _env_value; }