Skip to content

Commit

Permalink
*:support capacity unit read/write statistics (apache#318)
Browse files Browse the repository at this point in the history
Former-commit-id: 31812d21797dbf8d4ab622e4633f093f24d749b9 [formerly fa8ae10]
Former-commit-id: d202d192c7d62378bb943bcf6cebbb6eb7624255
  • Loading branch information
zhangyifan27 authored and Wu Tao committed May 30, 2019
1 parent 46415ac commit 519b883
Show file tree
Hide file tree
Showing 20 changed files with 1,212 additions and 278 deletions.
2 changes: 1 addition & 1 deletion rdsn
11 changes: 11 additions & 0 deletions scripts/falcon_screen.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@
"method": "",
"timespan": 86400
},
{
"title": "集群读写吞吐量(统计最近10s内的读写吞吐量,单位:Capacity Unit)",
"endpoints": ["cluster=${cluster.name} job=collector service=pegasus"],
"counters": [
"collector*app.pegasus*app.stat.recent_read_cu#_all_/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus",
"collector*app.pegasus*app.stat.recent_write_cu#_all_/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus"
],
"graph_type": "a",
"method": "",
"timespan": 86400
},
{
"title": "集群Load-Balance状态(待执行的balance操作数、已执行的balance操作数等)",
"endpoints": ["cluster=${cluster.name} job=meta service=pegasus"],
Expand Down
96 changes: 15 additions & 81 deletions src/server/available_detector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
#include <sstream>

#include "base/pegasus_key_schema.h"
#include "result_writer.h"

namespace pegasus {
namespace server {

DEFINE_TASK_CODE(LPC_DETECT_AVAILABLE, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)

available_detector::available_detector()
: _client(nullptr),
_ddl_client(nullptr),
Expand Down Expand Up @@ -64,6 +67,7 @@ available_detector::available_detector()
}
_client = pegasus_client_factory::get_client(_cluster_name.c_str(), _app_name.c_str());
dassert(_client != nullptr, "Initialize the _client failed");
_result_writer = dsn::make_unique<result_writer>(_client);
_ddl_client.reset(new replication_ddl_client(_meta_list));
dassert(_ddl_client != nullptr, "Initialize the _ddl_client failed");
if (!_alert_email_address.empty()) {
Expand Down Expand Up @@ -118,45 +122,28 @@ available_detector::available_detector()
_pfc_available_minute->set(1000000); // init to 100%
}

// Destructor (explicit form): clears the _client pointer without deleting it,
// then calls stop().
available_detector::~available_detector()
{
// don't delete _client, just set _client to nullptr.
_client = nullptr;
stop();
}
// Destructor (defaulted form): defined out of line as `= default`, so it runs
// no explicit cleanup statements of its own.
available_detector::~available_detector() = default;

// Starts the detector: enqueues detect_available() as an LPC_DETECT_AVAILABLE
// task with a one-minute delay, stores the returned task in _detect_timer, and
// then calls report_availability_info().
void available_detector::start()
{
// Delay the first detection by 60s to give pegasus time to finish its initialization.
_detect_timer = ::dsn::tasking::enqueue(LPC_DETECT_AVAILABLE,
nullptr,
&_tracker,
std::bind(&available_detector::detect_available, this),
0,
std::chrono::minutes(1));
report_availability_info();
}

// Stops the detector (explicit form): calls cancel(true) on every non-null task
// in _detect_tasks, then on _detect_timer and _report_task when they are set.
void available_detector::stop()
{
for (auto &tptr : _detect_tasks) {
if (tptr != nullptr)
tptr->cancel(true);
}

if (_detect_timer != nullptr)
_detect_timer->cancel(true);

if (_report_task != nullptr)
_report_task->cancel(true);
}
// Stops the detector (tracker form): delegates to _tracker.cancel_outstanding_tasks().
void available_detector::stop() { _tracker.cancel_outstanding_tasks(); }

void available_detector::detect_available()
{
if (!generate_hash_keys()) {
derror("initialize hash_keys failed, do not detect available, retry after 60 seconds");
_detect_timer =
::dsn::tasking::enqueue(LPC_DETECT_AVAILABLE,
nullptr,
&_tracker,
std::bind(&available_detector::detect_available, this),
0,
std::chrono::minutes(1));
Expand All @@ -172,7 +159,7 @@ void available_detector::detect_available()
auto call_func = std::bind(&available_detector::on_detect, this, i);
_detect_tasks[i] =
::dsn::tasking::enqueue_timer(LPC_DETECT_AVAILABLE,
nullptr,
&_tracker,
std::move(call_func),
std::chrono::seconds(_detect_interval_seconds));
}
Expand Down Expand Up @@ -225,7 +212,7 @@ void available_detector::report_availability_info()
};
_report_task = ::dsn::tasking::enqueue_timer(
LPC_DETECT_AVAILABLE,
nullptr,
&_tracker,
std::move(call_func),
std::chrono::minutes(1),
0,
Expand Down Expand Up @@ -426,9 +413,7 @@ void available_detector::on_day_report()
}
}

// set try_count to 3000 (keep on retrying for 3000 minutes) to avoid losing the detect result
// if the result table is also unavailable for a long time.
set_detect_result(hash_key, sort_key, value, 3000);
_result_writer->set_result(hash_key, sort_key, value);
}

void available_detector::on_hour_report()
Expand All @@ -452,9 +437,7 @@ void available_detector::on_hour_report()
_pfc_fail_times_hour->set(fail_times);
_pfc_available_hour->set(available);

// set try_count to 3000 (keep on retrying for 3000 minutes) to avoid losing the detect result
// if the result table is also unavailable for a long time.
set_detect_result(hash_key, sort_key, value, 3000);
_result_writer->set_result(hash_key, sort_key, value);
}

void available_detector::on_minute_report()
Expand All @@ -478,56 +461,7 @@ void available_detector::on_minute_report()
_pfc_fail_times_minute->set(fail_times);
_pfc_available_minute->set(available);

// set try_count to 3000 (keep on retrying for 3000 minutes) to avoid losing the detect result
// if the result table is also unavailable for a long time.
set_detect_result(hash_key, sort_key, value, 3000);
}

// Writes one detect result through _client->async_set() and retries on failure.
//
// hash_key / sort_key / value: the record passed to async_set().
// try_count: remaining attempts, including the current one.
//
// When the write fails and (try_count - 1) is still positive, the failure is
// logged and the same write is enqueued again with a one-minute delay and the
// decremented count. When no attempts are left, the failure is logged and
// nothing more is scheduled. A successful write is logged with ddebug.
void available_detector::set_detect_result(const std::string &hash_key,
const std::string &sort_key,
const std::string &value,
int try_count)
{
_client->async_set(
hash_key,
sort_key,
value,
// The key/value strings are captured by value, so the callback works on its own copies.
[this, hash_key, sort_key, value, try_count](int err,
pegasus_client::internal_info &&info) {
if (err != PERR_OK) {
// This attempt is used up; new_try_count is what remains for later retries.
int new_try_count = try_count - 1;
if (new_try_count > 0) {
derror("set_detect_result fail, hash_key = %s, sort_key = %s, value = %s, "
"error = %s, left_try_count = %d, try again after 1 minute",
hash_key.c_str(),
sort_key.c_str(),
value.c_str(),
_client->get_error_string(err),
new_try_count);
// NOTE(review): the retry task is enqueued with a nullptr tracker argument while
// its lambda captures `this`; presumably stop() does not cancel it -- confirm
// the detector outlives any pending retry.
::dsn::tasking::enqueue(LPC_DETECT_AVAILABLE,
nullptr,
[this, hash_key, sort_key, value, new_try_count]() {
set_detect_result(
hash_key, sort_key, value, new_try_count);
},
0,
std::chrono::minutes(1));
} else {
// No attempts left: report the final failure and give up.
derror("set_detect_result fail, hash_key = %s, sort_key = %s, value = %s, "
"error = %s, left_try_count = %d, do not try again",
hash_key.c_str(),
sort_key.c_str(),
value.c_str(),
_client->get_error_string(err),
new_try_count);
}
} else {
ddebug("set_detect_result succeed, hash_key = %s, sort_key = %s, value = %s",
hash_key.c_str(),
sort_key.c_str(),
value.c_str());
}
});
}
_result_writer->set_result(hash_key, sort_key, value);
}
} // namespace
} // namespace server
} // namespace pegasus
15 changes: 7 additions & 8 deletions src/server/available_detector.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,18 @@ namespace server {

using ::dsn::replication::replication_ddl_client;

DEFINE_TASK_CODE(LPC_DETECT_AVAILABLE, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
class result_writer;

class available_detector
{
public:
available_detector();
virtual ~available_detector();
~available_detector();

void start();
void stop();

private:
void set_detect_result(const std::string &hash_key,
const std::string &sort_key,
const std::string &value,
int try_count);
// generate hash_keys that can access every partition.
bool generate_hash_keys();
void on_detect(int32_t idx);
Expand All @@ -41,8 +37,11 @@ class available_detector
void on_minute_report();

private:
dsn::task_tracker _tracker;
std::string _cluster_name;
std::string _app_name;
// for writing detect result
std::unique_ptr<result_writer> _result_writer;
// client to access server.
pegasus_client *_client;
std::shared_ptr<replication_ddl_client> _ddl_client;
Expand Down Expand Up @@ -87,5 +86,5 @@ class available_detector
::dsn::perf_counter_wrapper _pfc_fail_times_minute;
::dsn::perf_counter_wrapper _pfc_available_minute;
};
}
}
} // namespace server
} // namespace pegasus
Loading

0 comments on commit 519b883

Please sign in to comment.