diff --git a/src/dist/replication/common/replication_common.cpp b/src/dist/replication/common/replication_common.cpp index 108ff77d50..6e5c46685b 100644 --- a/src/dist/replication/common/replication_common.cpp +++ b/src/dist/replication/common/replication_common.cpp @@ -590,7 +590,8 @@ const std::string backup_restore_constant::BACKUP_ID("restore.backup_id"); const std::string backup_restore_constant::SKIP_BAD_PARTITION("restore.skip_bad_partition"); const std::string replica_envs::DENY_CLIENT_WRITE("replica.deny_client_write"); -const std::string replica_envs::WRITE_THROTTLING("replica.write_throttling"); +const std::string replica_envs::WRITE_QPS_THROTTLING("replica.write_throttling"); +const std::string replica_envs::WRITE_SIZE_THROTTLING("replica.write_throttling_by_size"); namespace cold_backup { std::string get_policy_path(const std::string &root, const std::string &policy_name) diff --git a/src/dist/replication/common/replication_common.h b/src/dist/replication/common/replication_common.h index 0e5b633a70..1630dcdad2 100644 --- a/src/dist/replication/common/replication_common.h +++ b/src/dist/replication/common/replication_common.h @@ -152,7 +152,8 @@ class replica_envs { public: static const std::string DENY_CLIENT_WRITE; - static const std::string WRITE_THROTTLING; + static const std::string WRITE_QPS_THROTTLING; + static const std::string WRITE_SIZE_THROTTLING; }; namespace cold_backup { diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index a22b5af9cf..28b67e52b0 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -136,7 +136,7 @@ replica::~replica(void) dinfo("%s: replica destroyed", name()); } -void replica::on_client_read(task_code code, dsn::message_ex *request) +void replica::on_client_read(dsn::message_ex *request) { if (status() == partition_status::PS_INACTIVE || status() == partition_status::PS_POTENTIAL_SECONDARY) { diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 60d0f87472..88a801c90e 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -99,8 +99,23 @@ class replica : public serverlet, public ref_counter, public replica_ba // // requests from clients // - void on_client_write(task_code code, dsn::message_ex *request, bool ignore_throttling = false); - void on_client_read(task_code code, dsn::message_ex *request); + void on_client_write(message_ex *request, bool ignore_throttling = false); + void on_client_read(message_ex *request); + + // + // Throttling + // + + /// throttle write requests + /// \return true if request is throttled. + /// \see replica::on_client_write + bool throttle_request(throttling_controller &c, message_ex *request, int32_t req_units); + /// update throttling controllers + /// \see replica::update_app_envs + void update_throttle_envs(const std::map &envs); + void update_throttle_env_internal(const std::map &envs, + const std::string &key, + throttling_controller &cntl); // // messages and tools from/for meta server @@ -343,6 +358,7 @@ class replica : public serverlet, public ref_counter, public replica_ba friend class ::dsn::replication::mutation_queue; friend class ::dsn::replication::replica_stub; friend class mock_replica; + friend class throttling_controller_test; friend class replica_learn_test; friend class replica_duplicator_manager; friend class load_mutation; @@ -416,7 +432,8 @@ class replica : public serverlet, public ref_counter, public replica_ba bool _inactive_is_transient; // upgrade to P/S is allowed only iff true bool _is_initializing; // when initializing, switching to primary need to update ballot bool _deny_client_write; // if deny all write requests - throttling_controller _write_throttling_controller; + throttling_controller _write_qps_throttling_controller; // throttling by requests-per-second + throttling_controller _write_size_throttling_controller; // throttling by bytes-per-second // duplication std::unique_ptr _duplication_mgr; @@ -439,5 +456,5 @@ class replica : public serverlet, public ref_counter, public replica_ba dsn::thread_access_checker _checker; }; typedef dsn::ref_ptr replica_ptr; -} -} // namespace +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/replica_2pc.cpp b/src/dist/replication/lib/replica_2pc.cpp index 336f0012fe..a95ca0bb73 100644 --- a/src/dist/replication/lib/replica_2pc.cpp +++ b/src/dist/replication/lib/replica_2pc.cpp @@ -24,15 +24,6 @@ * THE SOFTWARE. */ -/* - * Description: - * two-phase commit in replication - * - * Revision history: - * Mar., 2015, @imzhenyu (Zhenyu Guo), first version - * xxxx-xx-xx, author, fix bug about xxx - */ - #include "replica.h" #include "mutation.h" #include "mutation_log.h" @@ -42,7 +33,7 @@ namespace dsn { namespace replication { -void replica::on_client_write(task_code code, dsn::message_ex *request, bool ignore_throttling) +void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) { _checker.only_one_thread_access(); @@ -53,7 +44,7 @@ void replica::on_client_write(task_code code, dsn::message_ex *request, bool ign return; } - task_spec *spec = task_spec::get(code); + task_spec *spec = task_spec::get(request->rpc_code()); if (!_options->allow_non_idempotent_write && !spec->rpc_request_is_write_idempotent) { response_client_write(request, ERR_OPERATION_DISABLED); return; @@ -70,39 +61,17 @@ void replica::on_client_write(task_code code, dsn::message_ex *request, bool ign return; } - if (_write_throttling_controller.enabled() && !ignore_throttling) { - int64_t delay_ms = 0; - auto type = _write_throttling_controller.control(request, delay_ms); - if (type != throttling_controller::PASS) { - if (type == throttling_controller::DELAY) { - tasking::enqueue(LPC_WRITE_THROTTLING_DELAY, - &_tracker, - [ this, code, req = message_ptr(request) ]() { - on_client_write(code, req, true); - }, - get_gpid().thread_hash(), - std::chrono::milliseconds(delay_ms)); - _counter_recent_write_throttling_delay_count->increment(); - } else { // type == throttling_controller::REJECT - if (delay_ms > 0) { - tasking::enqueue(LPC_WRITE_THROTTLING_DELAY, - &_tracker, - [ this, req = message_ptr(request) ]() { - response_client_write(req, ERR_BUSY); - }, - get_gpid().thread_hash(), - std::chrono::milliseconds(delay_ms)); - } else { - response_client_write(request, ERR_BUSY); - } - _counter_recent_write_throttling_reject_count->increment(); - } + if (!ignore_throttling) { + if (throttle_request(_write_qps_throttling_controller, request, 1)) { + return; + } + if (throttle_request(_write_size_throttling_controller, request, request->body_size())) { return; } } dinfo("%s: got write request from %s", name(), request->header->from_address.to_string()); - auto mu = _primary_states.write_queue.add_work(code, request, this); + auto mu = _primary_states.write_queue.add_work(request->rpc_code(), request, this); if (mu) { init_prepare(mu, false); } @@ -696,5 +665,5 @@ void replica::cleanup_preparing_mutations(bool wait) } } } -} -} // namespace +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/replica_config.cpp b/src/dist/replication/lib/replica_config.cpp index 25ed005740..df1381daed 100644 --- a/src/dist/replication/lib/replica_config.cpp +++ b/src/dist/replication/lib/replica_config.cpp @@ -557,33 +557,7 @@ void replica::update_app_envs_internal(const std::map _deny_client_write = deny_client_write; } - // WRITE_THROTTLING - bool throttling_changed = false; - std::string old_throttling; - std::string parse_error; - find = envs.find(replica_envs::WRITE_THROTTLING); - if (find != envs.end()) { - if (!_write_throttling_controller.parse_from_env(find->second, - _app_info.partition_count, - parse_error, - throttling_changed, - old_throttling)) { - dwarn_replica("parse env failed, key = \"{}\", value = \"{}\", error = \"{}\"", - replica_envs::WRITE_THROTTLING, - find->second, - parse_error); - // reset if parse failed - _write_throttling_controller.reset(throttling_changed, old_throttling); - } - } else { - // reset if env not found - _write_throttling_controller.reset(throttling_changed, old_throttling); - } - if (throttling_changed) { - ddebug_replica("switch _write_throttling_controller from \"{}\" to \"{}\"", - old_throttling, - _write_throttling_controller.env_value()); - } + update_throttle_envs(envs); } void replica::query_app_envs(/*out*/ std::map &envs) @@ -1070,5 +1044,6 @@ void replica::replay_prepare_list() init_prepare(mu, true); } } -} -} // namespace + +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index c91fac2343..c6cc7bf3a1 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -745,7 +745,7 @@ void replica_stub::on_client_write(gpid id, dsn::message_ex *request) } replica_ptr rep = get_replica(id); if (rep != nullptr) { - rep->on_client_write(request->rpc_code(), request); + rep->on_client_write(request); } else { response_client(id, false, request, partition_status::PS_INVALID, ERR_OBJECT_NOT_FOUND); } @@ -767,7 +767,7 @@ void replica_stub::on_client_read(gpid id, dsn::message_ex *request) } replica_ptr rep = get_replica(id); if (rep != nullptr) { - rep->on_client_read(request->rpc_code(), request); + rep->on_client_read(request); } else { response_client(id, true, request, partition_status::PS_INVALID, ERR_OBJECT_NOT_FOUND); } diff --git a/src/dist/replication/lib/replica_throttle.cpp b/src/dist/replication/lib/replica_throttle.cpp new file mode 100644 index 0000000000..0ba6dc84e6 --- /dev/null +++ b/src/dist/replication/lib/replica_throttle.cpp @@ -0,0 +1,92 @@ +// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "replica.h" +#include "mutation.h" +#include "mutation_log.h" +#include "replica_stub.h" + +#include +#include + +namespace dsn { +namespace replication { + +bool replica::throttle_request(throttling_controller &controller, + message_ex *request, + int32_t request_units) +{ + if (!controller.enabled()) { + return false; + } + + int64_t delay_ms = 0; + auto type = controller.control(request, request_units, delay_ms); + if (type != throttling_controller::PASS) { + if (type == throttling_controller::DELAY) { + tasking::enqueue(LPC_WRITE_THROTTLING_DELAY, + &_tracker, + [ this, req = message_ptr(request) ]() { on_client_write(req, true); }, + get_gpid().thread_hash(), + std::chrono::milliseconds(delay_ms)); + _counter_recent_write_throttling_delay_count->increment(); + } else { // type == throttling_controller::REJECT + if (delay_ms > 0) { + tasking::enqueue(LPC_WRITE_THROTTLING_DELAY, + &_tracker, + [ this, req = message_ptr(request) ]() { + response_client_write(req, ERR_BUSY); + }, + get_gpid().thread_hash(), + std::chrono::milliseconds(delay_ms)); + } else { + response_client_write(request, ERR_BUSY); + } + _counter_recent_write_throttling_reject_count->increment(); + } + return true; + } + return false; +} + +void replica::update_throttle_envs(const std::map &envs) +{ + update_throttle_env_internal( + envs, replica_envs::WRITE_QPS_THROTTLING, _write_qps_throttling_controller); + update_throttle_env_internal( + envs, replica_envs::WRITE_SIZE_THROTTLING, _write_size_throttling_controller); +} + +void replica::update_throttle_env_internal(const std::map &envs, + const std::string &key, + throttling_controller &cntl) +{ + bool throttling_changed = false; + std::string old_throttling; + std::string parse_error; + auto find = envs.find(key); + if (find != envs.end()) { + if (!cntl.parse_from_env(find->second, + _app_info.partition_count, + parse_error, + throttling_changed, + old_throttling)) { + dwarn_replica("parse env failed, key = \"{}\", value = \"{}\", error = \"{}\"", + key, + find->second, + parse_error); + // reset if parse failed + cntl.reset(throttling_changed, old_throttling); + } + } else { + // reset if env not found + cntl.reset(throttling_changed, old_throttling); + } + if (throttling_changed) { + ddebug_replica("switch {} from \"{}\" to \"{}\"", key, old_throttling, cntl.env_value()); + } +} + +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/throttling_controller.cpp b/src/dist/replication/lib/throttling_controller.cpp index 7455adc20b..6ca3e3aa98 100644 --- a/src/dist/replication/lib/throttling_controller.cpp +++ b/src/dist/replication/lib/throttling_controller.cpp @@ -37,12 +37,12 @@ namespace replication { throttling_controller::throttling_controller() : _enabled(false), _partition_count(0), - _delay_qps(0), + _delay_units(0), _delay_ms(0), - _reject_qps(0), + _reject_units(0), _reject_delay_ms(0), _last_request_time(0), - _cur_request_count(0) + _cur_units(0) { } @@ -62,10 +62,10 @@ bool throttling_controller::parse_from_env(const std::string &env_value, return false; } bool delay_parsed = false; - int32_t delay_qps = 0; + int64_t delay_units = 0; int64_t delay_ms = 0; bool reject_parsed = false; - int32_t reject_qps = 0; + int64_t reject_units = 0; int64_t reject_delay_ms = 0; for (std::string &s : sargs) { std::vector sargs1; @@ -74,11 +74,25 @@ bool throttling_controller::parse_from_env(const std::string &env_value, parse_error = "invalid field count, should be 3"; return false; } - int32_t qps = 0; - if (!buf2int32(sargs1[0], qps) || qps < 0) { - parse_error = "invalid qps, should be non-negative int"; + + int64_t unit_multiplier = 1; + if (!sargs1[0].empty()) { + if (*sargs1[0].rbegin() == 'M') { + unit_multiplier = 1000 * 1000; + } else if (*sargs1[0].rbegin() == 'K') { + unit_multiplier = 1000; + } + if (unit_multiplier != 1) { + sargs1[0].pop_back(); + } + } + int64_t units = 0; + if (!buf2int64(sargs1[0], units) || units < 0) { + parse_error = "invalid units, should be non-negative int"; return false; } + units *= unit_multiplier; + int64_t ms = 0; if (!buf2int64(sargs1[2], ms) || ms < 0) { parse_error = "invalid delay ms, should be non-negative int"; @@ -90,7 +104,7 @@ bool throttling_controller::parse_from_env(const std::string &env_value, return false; } delay_parsed = true; - delay_qps = qps / partition_count + 1; + delay_units = units / partition_count + 1; delay_ms = ms; } else if (sargs1[1] == "reject") { if (reject_parsed) { @@ -98,7 +112,7 @@ bool throttling_controller::parse_from_env(const std::string &env_value, return false; } reject_parsed = true; - reject_qps = qps / partition_count + 1; + reject_units = units / partition_count + 1; reject_delay_ms = ms; } else { parse_error = "invalid throttling type"; @@ -110,9 +124,9 @@ bool throttling_controller::parse_from_env(const std::string &env_value, _enabled = true; _env_value = env_value; _partition_count = partition_count; - _delay_qps = delay_qps; + _delay_units = delay_units; _delay_ms = delay_ms; - _reject_qps = reject_qps; + _reject_units = reject_units; _reject_delay_ms = reject_delay_ms; return true; } @@ -125,28 +139,28 @@ void throttling_controller::reset(bool &changed, std::string &old_env_value) _enabled = false; _env_value.clear(); _partition_count = 0; - _delay_qps = 0; + _delay_units = 0; _delay_ms = 0; - _reject_qps = 0; + _reject_units = 0; _reject_delay_ms = 0; _last_request_time = 0; - _cur_request_count = 0; + _cur_units = 0; } else { changed = false; } } -throttling_controller::throttling_type throttling_controller::control(const message_ex *request, - int64_t &delay_ms) +throttling_controller::throttling_type +throttling_controller::control(const message_ex *request, int32_t request_units, int64_t &delay_ms) { int64_t now_s = dsn_now_s(); if (now_s != _last_request_time) { - _cur_request_count = 0; + _cur_units = 0; _last_request_time = now_s; } - _cur_request_count++; - if (_reject_qps > 0 && _cur_request_count > _reject_qps) { - _cur_request_count--; + _cur_units += request_units; + if (_reject_units > 0 && _cur_units > _reject_units) { + _cur_units -= request_units; int64_t client_timeout = request->header->client.timeout_ms; if (client_timeout > 0) { delay_ms = std::min(_reject_delay_ms, client_timeout / 2); @@ -155,7 +169,7 @@ throttling_controller::throttling_type throttling_controller::control(const mess } return REJECT; } - if (_delay_qps > 0 && _cur_request_count > _delay_qps) { + if (_delay_units > 0 && _cur_units > _delay_units) { int64_t client_timeout = request->header->client.timeout_ms; if (client_timeout > 0) { delay_ms = std::min(_delay_ms, client_timeout / 2); diff --git a/src/dist/replication/lib/throttling_controller.h b/src/dist/replication/lib/throttling_controller.h index 0058ee1244..c5b208aca8 100644 --- a/src/dist/replication/lib/throttling_controller.h +++ b/src/dist/replication/lib/throttling_controller.h @@ -35,7 +35,12 @@ class message_ex; namespace replication { -// used for replica throttling. +// Used for replica throttling. +// Different throttling strategies may use different 'request_units', which is +// the cost of each request. For QPS-based throttling, request_units=1. +// For size-based throttling, request_units is the bytes size of the incoming +// request. +// // not thread safe class throttling_controller { @@ -50,6 +55,10 @@ class throttling_controller public: throttling_controller(); + // Configures throttling strategy dynamically from app-envs. + // The result of `delay_units` and `reject_units` are ensured greater than 0. + // If user-given parameter is 0*delay*100, then delay_units=1, likewise for reject_units. + // // return true if parse succeed. // return false if parse failed for the reason of invalid env_value. // if return false, the original value will not be changed. @@ -73,18 +82,21 @@ class throttling_controller // do throttling control, return throttling type. // 'delay_ms' is set when the return type is not PASS. - throttling_type control(const message_ex *request, /*out*/ int64_t &delay_ms); + throttling_type + control(const message_ex *request, int32_t request_units, /*out*/ int64_t &delay_ms); private: + friend class throttling_controller_test; + bool _enabled; std::string _env_value; int32_t _partition_count; - int32_t _delay_qps; // should >= 0 + int64_t _delay_units; // should >= 0 int64_t _delay_ms; // should >= 0 - int32_t _reject_qps; // should >= 0 + int64_t _reject_units; // should >= 0 int64_t _reject_delay_ms; // should >= 0 int64_t _last_request_time; - int32_t _cur_request_count; + int64_t _cur_units; }; } // namespace replication diff --git a/src/dist/replication/test/replica_test/unit_test/throttling_controller_test.cpp b/src/dist/replication/test/replica_test/unit_test/throttling_controller_test.cpp new file mode 100644 index 0000000000..ff2dd09cb2 --- /dev/null +++ b/src/dist/replication/test/replica_test/unit_test/throttling_controller_test.cpp @@ -0,0 +1,118 @@ +// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "dist/replication/lib/throttling_controller.h" + +#include + +namespace dsn { +namespace replication { + +class throttling_controller_test : public ::testing::Test +{ +public: + void test_parse_env_basic() + { + throttling_controller cntl; + std::string parse_err; + bool env_changed = false; + std::string old_value; + ASSERT_TRUE(cntl.parse_from_env("20000*delay*100", 4, parse_err, env_changed, old_value)); + ASSERT_EQ(cntl._cur_units, 0); + ASSERT_EQ(cntl._enabled, true); + ASSERT_EQ(cntl._delay_ms, 100); + ASSERT_EQ(cntl._delay_units, 5000 + 1); + ASSERT_EQ(cntl._reject_delay_ms, 0); + ASSERT_EQ(cntl._reject_units, 0); + ASSERT_EQ(cntl._env_value, "20000*delay*100"); + ASSERT_EQ(cntl._partition_count, 4); + ASSERT_EQ(env_changed, true); + ASSERT_EQ(old_value, ""); + ASSERT_EQ(parse_err, ""); + + ASSERT_TRUE(cntl.parse_from_env( + "20000*delay*100,20000*reject*100", 4, parse_err, env_changed, old_value)); + ASSERT_EQ(cntl._cur_units, 0); + ASSERT_EQ(cntl._enabled, true); + ASSERT_EQ(cntl._delay_ms, 100); + ASSERT_EQ(cntl._delay_units, 5000 + 1); + ASSERT_EQ(cntl._reject_delay_ms, 100); + ASSERT_EQ(cntl._reject_units, 5000 + 1); + ASSERT_EQ(cntl._env_value, "20000*delay*100,20000*reject*100"); + ASSERT_EQ(cntl._partition_count, 4); + ASSERT_EQ(env_changed, true); + ASSERT_EQ(old_value, "20000*delay*100"); + ASSERT_EQ(parse_err, ""); + + // invalid argument + + ASSERT_FALSE(cntl.parse_from_env("*delay*100", 4, parse_err, env_changed, old_value)); + ASSERT_EQ(env_changed, false); + ASSERT_NE(parse_err, ""); + ASSERT_EQ(cntl._enabled, true); // ensure invalid env won't stop throttling + ASSERT_EQ(cntl._delay_ms, 100); + ASSERT_EQ(cntl._delay_units, 5000 + 1); + ASSERT_EQ(cntl._reject_delay_ms, 100); + ASSERT_EQ(cntl._reject_units, 5000 + 1); + + ASSERT_FALSE(cntl.parse_from_env("", 4, parse_err, env_changed, old_value)); + ASSERT_EQ(env_changed, false); + ASSERT_NE(parse_err, ""); + ASSERT_EQ(cntl._enabled, true); + } + + void test_parse_env_multiplier() + { + throttling_controller cntl; + std::string parse_err; + bool env_changed = false; + std::string old_value; + + struct test_case_1 + { + std::string env; + + int64_t delay_units; + int64_t delay_ms; + int64_t reject_units; + int64_t reject_ms; + } test_cases_1[] = { + {"20K*delay*100", 5000 + 1, 100, 0, 0}, + {"20M*delay*100", 5000 * 1000 + 1, 100, 0, 0}, + {"20M*delay*100,20M*reject*100", 5000 * 1000 + 1, 100, 5000 * 1000 + 1, 100}, + + // throttling size exceeds int32_t max value + {"80000M*delay*100", int64_t(20) * 1000 * 1000 * 1000 + 1, 100, 0, 0}, + }; + for (const auto &tc : test_cases_1) { + ASSERT_TRUE(cntl.parse_from_env(tc.env, 4, parse_err, env_changed, old_value)); + ASSERT_EQ(cntl._enabled, true); + ASSERT_EQ(cntl._delay_units, tc.delay_units) << tc.env; + ASSERT_EQ(cntl._delay_ms, tc.delay_ms) << tc.env; + ASSERT_EQ(cntl._reject_units, tc.reject_units) << tc.env; + ASSERT_EQ(cntl._reject_delay_ms, tc.reject_ms) << tc.env; + ASSERT_EQ(env_changed, true); + ASSERT_EQ(parse_err, ""); + } + + // invalid argument + + std::string test_cases_2[] = { + "20m*delay*100", "20B*delay*100", "20KB*delay*100", "20Mb*delay*100", "20MB*delay*100", + }; + for (const std::string &tc : test_cases_2) { + ASSERT_FALSE(cntl.parse_from_env(tc, 4, parse_err, env_changed, old_value)); + ASSERT_EQ(cntl._enabled, true); + ASSERT_EQ(env_changed, false); + ASSERT_NE(parse_err, ""); + } + } +}; + +TEST_F(throttling_controller_test, parse_env_basic) { test_parse_env_basic(); } + +TEST_F(throttling_controller_test, parse_env_multiplier) { test_parse_env_multiplier(); } + +} // namespace replication +} // namespace dsn