Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: improve capacity unit calculation #339

Merged
merged 10 commits into from
Jun 18, 2019
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/server/config-server.ini
Original file line number Diff line number Diff line change
Expand Up @@ -274,16 +274,19 @@ falcon_path = /v1/push

[pegasus.collector]
cluster = onebox

available_detect_app = @APP_NAME@
available_detect_alert_script_dir = ./package/bin
available_detect_alert_email_address =
available_detect_interval_seconds = 3
available_detect_alert_fail_count = 30
available_detect_timeout = 5000

app_stat_interval_seconds = 10

cu_stat_app = stat
cu_fetch_interval_seconds = 8
usage_stat_app = @APP_NAME@
capacity_unit_fetch_interval_seconds = 8
storage_size_fetch_interval_seconds = 3600

[pegasus.clusters]
onebox = @LOCAL_IP@:34601,@LOCAL_IP@:34602,@LOCAL_IP@:34603
Expand Down
7 changes: 5 additions & 2 deletions src/server/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -285,16 +285,19 @@

[pegasus.collector]
cluster = %{cluster.name}

available_detect_app = temp
available_detect_alert_script_dir = ./package/bin
available_detect_alert_email_address =
available_detect_interval_seconds = 3
available_detect_alert_fail_count = 30
available_detect_timeout = 5000

app_stat_interval_seconds = 10

cu_stat_app = stat
cu_fetch_interval_seconds = 8
usage_stat_app = stat
capacity_unit_fetch_interval_seconds = 8
storage_size_fetch_interval_seconds = 3600

[pegasus.clusters]
%{cluster.name} = %{meta.server.list}
Expand Down
118 changes: 93 additions & 25 deletions src/server/info_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ namespace pegasus {
namespace server {

DEFINE_TASK_CODE(LPC_PEGASUS_APP_STAT_TIMER, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
DEFINE_TASK_CODE(LPC_PEGASUS_CU_STAT_TIMER, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
DEFINE_TASK_CODE(LPC_PEGASUS_CAPACITY_UNIT_STAT_TIMER,
TASK_PRIORITY_COMMON,
::dsn::THREAD_POOL_DEFAULT)
DEFINE_TASK_CODE(LPC_PEGASUS_STORAGE_SIZE_STAT_TIMER,
TASK_PRIORITY_COMMON,
::dsn::THREAD_POOL_DEFAULT)

info_collector::info_collector()
{
Expand All @@ -49,22 +54,40 @@ info_collector::info_collector()
10, // default value 10s
"app stat interval seconds");

_cu_stat_app = dsn_config_get_value_string(
"pegasus.collector", "cu_stat_app", "", "app for recording capacity unit info");
dassert(!_cu_stat_app.empty(), "");
_usage_stat_app = dsn_config_get_value_string(
"pegasus.collector", "usage_stat_app", "", "app for recording usage statistics");
dassert(!_usage_stat_app.empty(), "");
// initialize the _client.
if (!pegasus_client_factory::initialize(nullptr)) {
dassert(false, "Initialize the pegasus client failed");
}
_client = pegasus_client_factory::get_client(_cluster_name.c_str(), _cu_stat_app.c_str());
_client = pegasus_client_factory::get_client(_cluster_name.c_str(), _usage_stat_app.c_str());
dassert(_client != nullptr, "Initialize the client failed");
_result_writer = dsn::make_unique<result_writer>(_client);

_cu_fetch_interval_seconds =
_capacity_unit_fetch_interval_seconds =
(uint32_t)dsn_config_get_value_uint64("pegasus.collector",
"cu_fetch_interval_seconds",
"capacity_unit_fetch_interval_seconds",
8, // default value 8s
"capacity unit fetch interval seconds");
// _capacity_unit_retry_wait_seconds is in range of [1, 10]
_capacity_unit_retry_wait_seconds =
std::min(10u, std::max(1u, _capacity_unit_fetch_interval_seconds / 10));
// _capacity_unit_retry_max_count is in range of [0, 3]
_capacity_unit_retry_max_count =
std::min(3u, _capacity_unit_fetch_interval_seconds / _capacity_unit_retry_wait_seconds);

_storage_size_fetch_interval_seconds =
(uint32_t)dsn_config_get_value_uint64("pegasus.collector",
"storage_size_fetch_interval_seconds",
3600, // default value 1h
"storage size fetch interval seconds");
// _storage_size_retry_wait_seconds is in range of [1, 60]
_storage_size_retry_wait_seconds =
std::min(60u, std::max(1u, _storage_size_fetch_interval_seconds / 10));
// _storage_size_retry_max_count is in range of [0, 3]
_storage_size_retry_max_count =
std::min(3u, _storage_size_fetch_interval_seconds / _storage_size_retry_wait_seconds);
}

info_collector::~info_collector()
Expand All @@ -85,13 +108,21 @@ void info_collector::start()
0,
std::chrono::minutes(1));

_cu_stat_timer_task =
::dsn::tasking::enqueue_timer(LPC_PEGASUS_CU_STAT_TIMER,
&_tracker,
[this] { on_capacity_unit_stat(); },
std::chrono::seconds(_cu_fetch_interval_seconds),
0,
std::chrono::minutes(1));
_capacity_unit_stat_timer_task = ::dsn::tasking::enqueue_timer(
LPC_PEGASUS_CAPACITY_UNIT_STAT_TIMER,
&_tracker,
[this] { on_capacity_unit_stat(_capacity_unit_retry_max_count); },
std::chrono::seconds(_capacity_unit_fetch_interval_seconds),
0,
std::chrono::minutes(1));

_storage_size_stat_timer_task = ::dsn::tasking::enqueue_timer(
LPC_PEGASUS_STORAGE_SIZE_STAT_TIMER,
&_tracker,
[this] { on_storage_size_stat(_storage_size_retry_max_count); },
std::chrono::seconds(_storage_size_fetch_interval_seconds),
0,
std::chrono::minutes(1));
}

void info_collector::stop() { _tracker.cancel_outstanding_tasks(); }
Expand Down Expand Up @@ -230,38 +261,75 @@ info_collector::AppStatCounters *info_collector::get_app_counters(const std::str
return counters;
}

void info_collector::on_capacity_unit_stat()
void info_collector::on_capacity_unit_stat(int remaining_retry_count)
{
ddebug("start to stat capacity unit");
ddebug("start to stat capacity unit, remaining_retry_count = %d", remaining_retry_count);
std::vector<node_capacity_unit_stat> nodes_stat;
if (!get_capacity_unit_stat(&_shell_context, nodes_stat)) {
derror("get capacity unit stat failed");
if (remaining_retry_count > 0) {
derror("get capacity unit stat failed, remaining_retry_count = %d, "
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
"wait %u seconds to retry",
remaining_retry_count,
_capacity_unit_retry_wait_seconds);
::dsn::tasking::enqueue(LPC_PEGASUS_CAPACITY_UNIT_STAT_TIMER,
&_tracker,
[=] { on_capacity_unit_stat(remaining_retry_count - 1); },
0,
std::chrono::seconds(_capacity_unit_retry_wait_seconds));
} else {
derror("get capacity unit stat failed, remaining_retry_count = 0, no retry anymore");
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
}
return;
}
for (auto elem : nodes_stat) {
if (!has_capacity_unit_updated(elem.node_address, elem.timestamp)) {
for (node_capacity_unit_stat &elem : nodes_stat) {
if (elem.node_address.empty() || elem.timestamp.empty() ||
!has_capacity_unit_updated(elem.node_address, elem.timestamp)) {
dinfo("recent read/write capacity unit value of node %s has not updated",
elem.node_address.c_str());
continue;
}
_result_writer->set_result(elem.timestamp, elem.node_address, elem.dump_to_json());
_result_writer->set_result(elem.timestamp, "cu@" + elem.node_address, elem.dump_to_json());
}
}

bool info_collector::has_capacity_unit_updated(const std::string &node_address,
const std::string &timestamp)
{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_cu_update_info_lock);
auto find = _cu_update_info.find(node_address);
if (find == _cu_update_info.end()) {
_cu_update_info[node_address] = timestamp;
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_capacity_unit_update_info_lock);
auto find = _capacity_unit_update_info.find(node_address);
if (find == _capacity_unit_update_info.end()) {
_capacity_unit_update_info[node_address] = timestamp;
return true;
}
if (timestamp > find->second) {
_cu_update_info[node_address] = timestamp;
find->second = timestamp;
return true;
}
return false;
}

void info_collector::on_storage_size_stat(int remaining_retry_count)
{
ddebug("start to stat storage size, remaining_retry_count = %d", remaining_retry_count);
app_storage_size_stat st_stat;
if (!get_storage_size_stat(&_shell_context, st_stat)) {
if (remaining_retry_count > 0) {
derror("get storage size stat failed, remaining_retry_count = %d, "
"wait %u seconds to retry",
remaining_retry_count,
_storage_size_retry_wait_seconds);
::dsn::tasking::enqueue(LPC_PEGASUS_STORAGE_SIZE_STAT_TIMER,
&_tracker,
[=] { on_storage_size_stat(remaining_retry_count - 1); },
0,
std::chrono::seconds(_storage_size_retry_wait_seconds));
} else {
derror("get storage size stat failed, remaining_retry_count = 0, no retry anymore");
}
return;
}
_result_writer->set_result(st_stat.timestamp, "ss", st_stat.dump_to_json());
}

} // namespace server
} // namespace pegasus
22 changes: 15 additions & 7 deletions src/server/info_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ class info_collector
void on_app_stat();
AppStatCounters *get_app_counters(const std::string &app_name);

void on_capacity_unit_stat();
void on_capacity_unit_stat(int remaining_retry_count);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

改成 retry_count 吧, 函数本身没有remaining这层概念

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉得有remaining语义更清楚,表示还允许多少次重试。不然只是retry_count,那么retry_count=3可能被理解为这次调用是第3次重试。

bool has_capacity_unit_updated(const std::string &node_address, const std::string &timestamp);

void on_storage_size_stat(int remaining_retry_count);

private:
dsn::task_tracker _tracker;
::dsn::rpc_address _meta_servers;
Expand All @@ -79,17 +81,23 @@ class info_collector
::dsn::utils::ex_lock_nr _app_stat_counter_lock;
std::map<std::string, AppStatCounters *> _app_stat_counters;

// app for recording read/write cu.
std::string _cu_stat_app;
// app for recording usage statistics, including read/write capacity unit and storage size.
std::string _usage_stat_app;
// client to access server.
pegasus_client *_client;
// for writing cu stat result
std::unique_ptr<result_writer> _result_writer;
uint32_t _cu_fetch_interval_seconds;
::dsn::task_ptr _cu_stat_timer_task;
::dsn::utils::ex_lock_nr _cu_update_info_lock;
uint32_t _capacity_unit_fetch_interval_seconds;
uint32_t _capacity_unit_retry_wait_seconds;
uint32_t _capacity_unit_retry_max_count;
::dsn::task_ptr _capacity_unit_stat_timer_task;
uint32_t _storage_size_fetch_interval_seconds;
uint32_t _storage_size_retry_wait_seconds;
uint32_t _storage_size_retry_max_count;
::dsn::task_ptr _storage_size_stat_timer_task;
::dsn::utils::ex_lock_nr _capacity_unit_update_info_lock;
// mapping 'node address' --> 'last updated timestamp'
std::map<std::string, string> _cu_update_info;
std::map<std::string, string> _capacity_unit_update_info;
};
} // namespace server
} // namespace pegasus
Loading