Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
meta: support white list of replica-servers & add remote cmd to ctrl …
Browse files Browse the repository at this point in the history
…live_percentage (#226)
  • Loading branch information
HuangWei authored and qinzuoyan committed Mar 7, 2019
1 parent 0397602 commit ddaa1ad
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 8 deletions.
13 changes: 11 additions & 2 deletions include/dsn/dist/failure_detector/failure_detector.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@
*/
#pragma once

#include <dsn/tool-api/zlocks.h>
#include <dsn/dist/failure_detector/fd.client.h>
#include <dsn/dist/failure_detector/fd.server.h>
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/tool-api/zlocks.h>

namespace dsn {
namespace fd {
Expand Down Expand Up @@ -93,12 +93,15 @@ class failure_detector : public failure_detector_service,
{
public:
failure_detector();
virtual ~failure_detector() {}
virtual ~failure_detector() { unregister_ctrl_commands(); }

virtual void on_ping(const beacon_msg &beacon, ::dsn::rpc_replier<beacon_ack> &reply);

virtual void end_ping(::dsn::error_code err, const beacon_ack &ack, void *context);

virtual void register_ctrl_commands();
virtual void unregister_ctrl_commands();

public:
error_code start(uint32_t check_interval_seconds,
uint32_t beacon_interval_seconds,
Expand Down Expand Up @@ -136,6 +139,8 @@ class failure_detector : public failure_detector_service,

bool remove_from_allow_list(::dsn::rpc_address node);

void set_allow_list(const std::vector<std::string> &replica_addrs);

int worker_count() const { return static_cast<int>(_workers.size()); }

int master_count() const { return static_cast<int>(_masters.size()); }
Expand All @@ -153,6 +158,8 @@ class failure_detector : public failure_detector_service,
private:
void check_all_records();

std::string get_allow_list(const std::vector<std::string> &args) const;

private:
class master_record
{
Expand Down Expand Up @@ -214,6 +221,8 @@ class failure_detector : public failure_detector_service,

perf_counter_wrapper _recent_beacon_fail_count;

dsn_handle_t _get_allow_list = nullptr;

protected:
mutable zlock _lock;
dsn::task_tracker _tracker;
Expand Down
49 changes: 48 additions & 1 deletion src/dist/failure_detector/failure_detector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
*/

#include <dsn/dist/failure_detector.h>
#include <dsn/tool-api/command_manager.h>
#include <chrono>
#include <ctime>

Expand All @@ -55,6 +56,33 @@ failure_detector::failure_detector()
_is_started = false;
}

void failure_detector::register_ctrl_commands()
{
_get_allow_list = dsn::command_manager::instance().register_app_command(
{"fd.allow_list"},
"fd.allow_list",
"show allow list of replica",
[this](const std::vector<std::string> &args) { return get_allow_list(args); });
}

// Releases the "fd.allow_list" command handle obtained in register_ctrl_commands().
// Invoked from the destructor. NOTE(review): presumably UNREGISTER_VALID_HANDLER
// skips a null handle, so calling this without a prior registration is harmless —
// confirm against the macro definition.
void failure_detector::unregister_ctrl_commands() { UNREGISTER_VALID_HANDLER(_get_allow_list); }

// Handler behind the "fd.allow_list" remote command.
//
// Returns an error string when the detector has not been started. Otherwise
// returns "get ok: allow list enabled. list: " or "get ok: allow list disabled."
// followed by the comma-separated entries of _allow_list (the entries are
// appended in both cases). `args` is accepted to match the command-handler
// signature and is not inspected.
std::string failure_detector::get_allow_list(const std::vector<std::string> &args) const
{
    if (!_is_started) {
        return "error: fd is not started";
    }

    // _lock is held while _use_allow_list and _allow_list are read.
    dsn::zauto_lock guard(_lock);

    std::stringstream out;
    out << "get ok: allow list ";
    if (_use_allow_list) {
        out << "enabled. list: ";
    } else {
        out << "disabled.";
    }

    // Empty before the first entry, "," before every following one.
    const char *separator = "";
    for (const auto &node : _allow_list) {
        out << separator << node.to_string();
        separator = ",";
    }
    return out.str();
}

error_code failure_detector::start(uint32_t check_interval_seconds,
uint32_t beacon_interval_seconds,
uint32_t lease_seconds,
Expand Down Expand Up @@ -215,7 +243,7 @@ void failure_detector::check_all_records()
return;
}

std::vector<::dsn::rpc_address> expire;
std::vector<rpc_address> expire;
uint64_t now = dsn_now_ms();

{
Expand Down Expand Up @@ -291,6 +319,25 @@ bool failure_detector::remove_from_allow_list(::dsn::rpc_address node)
return _allow_list.erase(node) > 0;
}

void failure_detector::set_allow_list(const std::vector<std::string> &replica_addrs)
{
dassert(_is_started, "FD is already started, the allow list should really not be modified");

std::vector<rpc_address> nodes;
for (auto &addr : replica_addrs) {
rpc_address node;
if (!node.from_string_ipv4(addr.c_str())) {
dwarn("replica_white_list has invalid ip %s, the allow list won't be modified",
addr.c_str());
return;
}
nodes.push_back(node);
}

for (auto &node : nodes)
add_allow_list(node);
}

void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon_ack &ack)
{
ack.time = beacon.time;
Expand Down
10 changes: 10 additions & 0 deletions src/dist/replication/meta_server/meta_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ void meta_options::initialize()

cold_backup_disabled = dsn_config_get_value_bool(
"meta_server", "cold_backup_disabled", true, "whether to disable cold backup");

enable_white_list =
dsn_config_get_value_bool("meta_server",
"enable_white_list",
false,
"whether to enable white list of replica servers");

const char *replica_white_list_raw = dsn_config_get_value_string(
"meta_server", "replica_white_list", "", "white list of replica-servers in meta-server");
utils::split_args(replica_white_list_raw, replica_white_list, ',');
}
}
}
3 changes: 3 additions & 0 deletions src/dist/replication/meta_server/meta_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class meta_options

bool cold_backup_disabled;

bool enable_white_list;
std::vector<std::string> replica_white_list;

public:
void initialize();

Expand Down
58 changes: 53 additions & 5 deletions src/dist/replication/meta_server/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
#include <fmt/format.h>

#include <dsn/utility/factory_store.h>
#include <dsn/utility/extensible_object.h>
#include <dsn/utility/string_conv.h>
#include <dsn/dist/meta_state_service.h>
#include <dsn/tool-api/command_manager.h>
#include <algorithm> // for std::remove_if
Expand All @@ -56,6 +58,8 @@ meta_service::meta_service()
{
_opts.initialize();
_meta_opts.initialize();
_node_live_percentage_threshold_for_update =
_meta_opts.node_live_percentage_threshold_for_update;
_state.reset(new server_state());
_function_level.store(_meta_opts.meta_function_level_on_start);
if (_meta_opts.recover_from_replica_server) {
Expand All @@ -76,15 +80,19 @@ meta_service::meta_service()
"eon.meta_service", "unalive_nodes", COUNTER_TYPE_NUMBER, "current count of unalive nodes");
}

// Tears the service down: cancels the tasks still tracked by _tracker, then
// releases the remote-command handles registered by register_ctrl_commands(),
// whose handlers capture `this`.
//
// The stale one-line definition that only cancelled tasks is dropped; keeping
// both would define the destructor twice.
meta_service::~meta_service()
{
    _tracker.cancel_outstanding_tasks();
    unregister_ctrl_commands();
}

// Evaluates the two freeze conditions while holding the failure detector's lock.
//
// @return true when the number of alive nodes is below
//         _meta_opts.min_live_node_count_for_unfreeze, or when the alive
//         percentage (alive / (alive + dead)) is below
//         _node_live_percentage_threshold_for_update; false otherwise.
bool meta_service::check_freeze() const
{
    zauto_lock l(_failure_detector->_lock);
    if (_alive_set.size() < _meta_opts.min_live_node_count_for_unfreeze)
        return true;
    int total = _alive_set.size() + _dead_set.size();
    // Use the runtime-adjustable threshold (initialised from _meta_opts and
    // changed by the "live_percentage" command). The previous return on the
    // static _meta_opts value came first and made this one unreachable.
    return _alive_set.size() * 100 < _node_live_percentage_threshold_for_update * total;
}

error_code meta_service::remote_storage_initialize()
Expand Down Expand Up @@ -159,6 +167,39 @@ void meta_service::get_node_state(/*out*/ std::map<rpc_address, bool> &all_nodes

void meta_service::balancer_run() { _state->check_all_partitions(); }

// Registers the "live_percentage [num | DEFAULT]" remote command and stores the
// returned handle for unregister_ctrl_commands().
//
// Command behaviour:
//   - no argument      : returns the current threshold as a decimal string
//   - "DEFAULT"        : resets the threshold to the configured
//                        _meta_opts.node_live_percentage_threshold_for_update, returns "OK"
//   - non-negative int : stores it as the new threshold, returns "OK"
//   - anything else    : returns "ERR: invalid arguments", threshold unchanged
void meta_service::register_ctrl_commands()
{
    auto on_live_percentage = [this](const std::vector<std::string> &args) -> std::string {
        if (args.empty()) {
            return std::to_string(_node_live_percentage_threshold_for_update);
        }

        const std::string &request = args[0];
        if (request == "DEFAULT") {
            _node_live_percentage_threshold_for_update =
                _meta_opts.node_live_percentage_threshold_for_update;
            return std::string("OK");
        }

        int32_t parsed = 0;
        const bool valid = dsn::buf2int32(request, parsed) && parsed >= 0;
        if (!valid) {
            return std::string("ERR: invalid arguments");
        }

        _node_live_percentage_threshold_for_update = parsed;
        return std::string("OK");
    };

    _ctrl_node_live_percentage_threshold_for_update =
        dsn::command_manager::instance().register_app_command(
            {"live_percentage"},
            "live_percentage [num | DEFAULT]",
            "node live percentage threshold for update",
            on_live_percentage);
}

// Releases the "live_percentage" command handle obtained in
// register_ctrl_commands(). Called from the destructor.
// NOTE(review): presumably UNREGISTER_VALID_HANDLER ignores a null handle, which
// would make this safe when start() was never called — confirm against the macro.
void meta_service::unregister_ctrl_commands()
{
UNREGISTER_VALID_HANDLER(_ctrl_node_live_percentage_threshold_for_update);
}

void meta_service::start_service()
{
zauto_lock l(_failure_detector->_lock);
Expand Down Expand Up @@ -208,22 +249,29 @@ void meta_service::start_service()
error_code meta_service::start()
{
dassert(!_started, "meta service is already started");
register_ctrl_commands();

error_code err;

err = remote_storage_initialize();
dreturn_not_ok_logged(err, "init remote storage failed, err = %s", err.to_string());
ddebug("remote storage is successfully initialized");

// start failure detector, and try to acqure the leader lock
// start failure detector, and try to acquire the leader lock
_failure_detector.reset(new meta_server_failure_detector(this));
if (_meta_opts.enable_white_list)
_failure_detector->set_allow_list(_meta_opts.replica_white_list);
_failure_detector->register_ctrl_commands();

err = _failure_detector->start(_opts.fd_check_interval_seconds,
_opts.fd_beacon_interval_seconds,
_opts.fd_lease_seconds,
_opts.fd_grace_seconds,
false);
_meta_opts.enable_white_list);

dreturn_not_ok_logged(err, "start failure_detector failed, err = %s", err.to_string());
ddebug("meta service failure detector is successfully started");
ddebug("meta service failure detector is successfully started %s",
_meta_opts.enable_white_list ? "with whitelist enabled" : "");

// should register rpc handlers before acquiring leader lock, so that this meta service
// can tell others who is the current leader
Expand Down
4 changes: 4 additions & 0 deletions src/dist/replication/meta_server/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class meta_service : public serverlet<meta_service>

private:
void register_rpc_handlers();
void register_ctrl_commands();
void unregister_ctrl_commands();

// client => meta server
// query partition configuration
Expand Down Expand Up @@ -184,6 +186,8 @@ class meta_service : public serverlet<meta_service>

replication_options _opts;
meta_options _meta_opts;
uint64_t _node_live_percentage_threshold_for_update;
dsn_handle_t _ctrl_node_live_percentage_threshold_for_update = nullptr;

std::shared_ptr<server_state> _state;
std::shared_ptr<meta_server_failure_detector> _failure_detector;
Expand Down

0 comments on commit ddaa1ad

Please sign in to comment.