Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(throttle): support size-based write throttling (#298)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored and qinzuoyan committed Sep 18, 2019
1 parent 4049f32 commit f3d0031
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 107 deletions.
3 changes: 2 additions & 1 deletion src/dist/replication/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,8 @@ const std::string backup_restore_constant::BACKUP_ID("restore.backup_id");
const std::string backup_restore_constant::SKIP_BAD_PARTITION("restore.skip_bad_partition");

const std::string replica_envs::DENY_CLIENT_WRITE("replica.deny_client_write");
const std::string replica_envs::WRITE_THROTTLING("replica.write_throttling");
const std::string replica_envs::WRITE_QPS_THROTTLING("replica.write_throttling");
const std::string replica_envs::WRITE_SIZE_THROTTLING("replica.write_throttling_by_size");

namespace cold_backup {
std::string get_policy_path(const std::string &root, const std::string &policy_name)
Expand Down
3 changes: 2 additions & 1 deletion src/dist/replication/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ class replica_envs
{
public:
static const std::string DENY_CLIENT_WRITE;
static const std::string WRITE_THROTTLING;
static const std::string WRITE_QPS_THROTTLING;
static const std::string WRITE_SIZE_THROTTLING;
};

namespace cold_backup {
Expand Down
2 changes: 1 addition & 1 deletion src/dist/replication/lib/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ replica::~replica(void)
dinfo("%s: replica destroyed", name());
}

void replica::on_client_read(task_code code, dsn::message_ex *request)
void replica::on_client_read(dsn::message_ex *request)
{
if (status() == partition_status::PS_INACTIVE ||
status() == partition_status::PS_POTENTIAL_SECONDARY) {
Expand Down
27 changes: 22 additions & 5 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,23 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
//
// requests from clients
//
void on_client_write(task_code code, dsn::message_ex *request, bool ignore_throttling = false);
void on_client_read(task_code code, dsn::message_ex *request);
void on_client_write(message_ex *request, bool ignore_throttling = false);
void on_client_read(message_ex *request);

//
// Throttling
//

/// throttle write requests
/// \return true if request is throttled.
/// \see replica::on_client_write
bool throttle_request(throttling_controller &c, message_ex *request, int32_t req_units);
/// update throttling controllers
/// \see replica::update_app_envs
void update_throttle_envs(const std::map<std::string, std::string> &envs);
void update_throttle_env_internal(const std::map<std::string, std::string> &envs,
const std::string &key,
throttling_controller &cntl);

//
// messages and tools from/for meta server
Expand Down Expand Up @@ -343,6 +358,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
friend class ::dsn::replication::mutation_queue;
friend class ::dsn::replication::replica_stub;
friend class mock_replica;
friend class throttling_controller_test;
friend class replica_learn_test;
friend class replica_duplicator_manager;
friend class load_mutation;
Expand Down Expand Up @@ -416,7 +432,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
bool _inactive_is_transient; // upgrade to P/S is allowed only iff true
bool _is_initializing; // when initializing, switching to primary need to update ballot
bool _deny_client_write; // if deny all write requests
throttling_controller _write_throttling_controller;
throttling_controller _write_qps_throttling_controller; // throttling by requests-per-second
throttling_controller _write_size_throttling_controller; // throttling by bytes-per-second

// duplication
std::unique_ptr<replica_duplicator_manager> _duplication_mgr;
Expand All @@ -439,5 +456,5 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
dsn::thread_access_checker _checker;
};
typedef dsn::ref_ptr<replica> replica_ptr;
}
} // namespace
} // namespace replication
} // namespace dsn
51 changes: 10 additions & 41 deletions src/dist/replication/lib/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@
* THE SOFTWARE.
*/

/*
* Description:
* two-phase commit in replication
*
* Revision history:
* Mar., 2015, @imzhenyu (Zhenyu Guo), first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#include "replica.h"
#include "mutation.h"
#include "mutation_log.h"
Expand All @@ -42,7 +33,7 @@
namespace dsn {
namespace replication {

void replica::on_client_write(task_code code, dsn::message_ex *request, bool ignore_throttling)
void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
{
_checker.only_one_thread_access();

Expand All @@ -53,7 +44,7 @@ void replica::on_client_write(task_code code, dsn::message_ex *request, bool ign
return;
}

task_spec *spec = task_spec::get(code);
task_spec *spec = task_spec::get(request->rpc_code());
if (!_options->allow_non_idempotent_write && !spec->rpc_request_is_write_idempotent) {
response_client_write(request, ERR_OPERATION_DISABLED);
return;
Expand All @@ -70,39 +61,17 @@ void replica::on_client_write(task_code code, dsn::message_ex *request, bool ign
return;
}

if (_write_throttling_controller.enabled() && !ignore_throttling) {
int64_t delay_ms = 0;
auto type = _write_throttling_controller.control(request, delay_ms);
if (type != throttling_controller::PASS) {
if (type == throttling_controller::DELAY) {
tasking::enqueue(LPC_WRITE_THROTTLING_DELAY,
&_tracker,
[ this, code, req = message_ptr(request) ]() {
on_client_write(code, req, true);
},
get_gpid().thread_hash(),
std::chrono::milliseconds(delay_ms));
_counter_recent_write_throttling_delay_count->increment();
} else { // type == throttling_controller::REJECT
if (delay_ms > 0) {
tasking::enqueue(LPC_WRITE_THROTTLING_DELAY,
&_tracker,
[ this, req = message_ptr(request) ]() {
response_client_write(req, ERR_BUSY);
},
get_gpid().thread_hash(),
std::chrono::milliseconds(delay_ms));
} else {
response_client_write(request, ERR_BUSY);
}
_counter_recent_write_throttling_reject_count->increment();
}
if (!ignore_throttling) {
if (throttle_request(_write_qps_throttling_controller, request, 1)) {
return;
}
if (throttle_request(_write_size_throttling_controller, request, request->body_size())) {
return;
}
}

dinfo("%s: got write request from %s", name(), request->header->from_address.to_string());
auto mu = _primary_states.write_queue.add_work(code, request, this);
auto mu = _primary_states.write_queue.add_work(request->rpc_code(), request, this);
if (mu) {
init_prepare(mu, false);
}
Expand Down Expand Up @@ -696,5 +665,5 @@ void replica::cleanup_preparing_mutations(bool wait)
}
}
}
}
} // namespace
} // namespace replication
} // namespace dsn
33 changes: 4 additions & 29 deletions src/dist/replication/lib/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,33 +557,7 @@ void replica::update_app_envs_internal(const std::map<std::string, std::string>
_deny_client_write = deny_client_write;
}

// WRITE_THROTTLING
bool throttling_changed = false;
std::string old_throttling;
std::string parse_error;
find = envs.find(replica_envs::WRITE_THROTTLING);
if (find != envs.end()) {
if (!_write_throttling_controller.parse_from_env(find->second,
_app_info.partition_count,
parse_error,
throttling_changed,
old_throttling)) {
dwarn_replica("parse env failed, key = \"{}\", value = \"{}\", error = \"{}\"",
replica_envs::WRITE_THROTTLING,
find->second,
parse_error);
// reset if parse failed
_write_throttling_controller.reset(throttling_changed, old_throttling);
}
} else {
// reset if env not found
_write_throttling_controller.reset(throttling_changed, old_throttling);
}
if (throttling_changed) {
ddebug_replica("switch _write_throttling_controller from \"{}\" to \"{}\"",
old_throttling,
_write_throttling_controller.env_value());
}
update_throttle_envs(envs);
}

void replica::query_app_envs(/*out*/ std::map<std::string, std::string> &envs)
Expand Down Expand Up @@ -1070,5 +1044,6 @@ void replica::replay_prepare_list()
init_prepare(mu, true);
}
}
}
} // namespace

} // namespace replication
} // namespace dsn
4 changes: 2 additions & 2 deletions src/dist/replication/lib/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ void replica_stub::on_client_write(gpid id, dsn::message_ex *request)
}
replica_ptr rep = get_replica(id);
if (rep != nullptr) {
rep->on_client_write(request->rpc_code(), request);
rep->on_client_write(request);
} else {
response_client(id, false, request, partition_status::PS_INVALID, ERR_OBJECT_NOT_FOUND);
}
Expand All @@ -767,7 +767,7 @@ void replica_stub::on_client_read(gpid id, dsn::message_ex *request)
}
replica_ptr rep = get_replica(id);
if (rep != nullptr) {
rep->on_client_read(request->rpc_code(), request);
rep->on_client_read(request);
} else {
response_client(id, true, request, partition_status::PS_INVALID, ERR_OBJECT_NOT_FOUND);
}
Expand Down
92 changes: 92 additions & 0 deletions src/dist/replication/lib/replica_throttle.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "replica.h"
#include "mutation.h"
#include "mutation_log.h"
#include "replica_stub.h"

#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/dist/fmt_logging.h>

namespace dsn {
namespace replication {

bool replica::throttle_request(throttling_controller &controller,
message_ex *request,
int32_t request_units)
{
if (!controller.enabled()) {
return false;
}

int64_t delay_ms = 0;
auto type = controller.control(request, request_units, delay_ms);
if (type != throttling_controller::PASS) {
if (type == throttling_controller::DELAY) {
tasking::enqueue(LPC_WRITE_THROTTLING_DELAY,
&_tracker,
[ this, req = message_ptr(request) ]() { on_client_write(req, true); },
get_gpid().thread_hash(),
std::chrono::milliseconds(delay_ms));
_counter_recent_write_throttling_delay_count->increment();
} else { // type == throttling_controller::REJECT
if (delay_ms > 0) {
tasking::enqueue(LPC_WRITE_THROTTLING_DELAY,
&_tracker,
[ this, req = message_ptr(request) ]() {
response_client_write(req, ERR_BUSY);
},
get_gpid().thread_hash(),
std::chrono::milliseconds(delay_ms));
} else {
response_client_write(request, ERR_BUSY);
}
_counter_recent_write_throttling_reject_count->increment();
}
return true;
}
return false;
}

void replica::update_throttle_envs(const std::map<std::string, std::string> &envs)
{
update_throttle_env_internal(
envs, replica_envs::WRITE_QPS_THROTTLING, _write_qps_throttling_controller);
update_throttle_env_internal(
envs, replica_envs::WRITE_SIZE_THROTTLING, _write_size_throttling_controller);
}

void replica::update_throttle_env_internal(const std::map<std::string, std::string> &envs,
const std::string &key,
throttling_controller &cntl)
{
bool throttling_changed = false;
std::string old_throttling;
std::string parse_error;
auto find = envs.find(key);
if (find != envs.end()) {
if (!cntl.parse_from_env(find->second,
_app_info.partition_count,
parse_error,
throttling_changed,
old_throttling)) {
dwarn_replica("parse env failed, key = \"{}\", value = \"{}\", error = \"{}\"",
key,
find->second,
parse_error);
// reset if parse failed
cntl.reset(throttling_changed, old_throttling);
}
} else {
// reset if env not found
cntl.reset(throttling_changed, old_throttling);
}
if (throttling_changed) {
ddebug_replica("switch {} from \"{}\" to \"{}\"", key, old_throttling, cntl.env_value());
}
}

} // namespace replication
} // namespace dsn
Loading

0 comments on commit f3d0031

Please sign in to comment.