Skip to content

Commit

Permalink
feat(collector): add statistics for partition hotspot (#444)
Browse files Browse the repository at this point in the history
  • Loading branch information
Smityz authored Feb 17, 2020
1 parent ce8f53f commit 695b366
Show file tree
Hide file tree
Showing 10 changed files with 372 additions and 86 deletions.
25 changes: 25 additions & 0 deletions src/server/hotspot_partition_data.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#pragma once

#include "shell/commands.h"

namespace pegasus {
namespace server {

struct hotspot_partition_data
{
hotspot_partition_data(const row_data &row)
: total_qps(row.get_total_qps()),
total_cu(row.get_total_cu()),
partition_name(row.row_name){};
hotspot_partition_data() {}
double total_qps;
double total_cu;
std::string partition_name;
};

} // namespace server
} // namespace pegasus
123 changes: 42 additions & 81 deletions src/server/info_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ info_collector::~info_collector()
for (auto kv : _app_stat_counters) {
delete kv.second;
}
for (auto store : _hotspot_calculator_store) {
delete store.second;
}
}

void info_collector::start()
Expand Down Expand Up @@ -125,100 +128,46 @@ void info_collector::stop() { _tracker.cancel_outstanding_tasks(); }
void info_collector::on_app_stat()
{
ddebug("start to stat apps");
std::vector<row_data> rows;
if (!get_app_stat(&_shell_context, "", rows)) {
std::map<std::string, std::vector<row_data>> all_rows;
if (!get_app_partition_stat(&_shell_context, all_rows)) {
derror("call get_app_stat() failed");
return;
}
std::vector<double> read_qps;
std::vector<double> write_qps;
rows.resize(rows.size() + 1);
read_qps.resize(rows.size());
write_qps.resize(rows.size());
row_data &all = rows.back();
all.row_name = "_all_";
for (int i = 0; i < rows.size() - 1; ++i) {
row_data &row = rows[i];
all.get_qps += row.get_qps;
all.multi_get_qps += row.multi_get_qps;
all.put_qps += row.put_qps;
all.multi_put_qps += row.multi_put_qps;
all.remove_qps += row.remove_qps;
all.multi_remove_qps += row.multi_remove_qps;
all.incr_qps += row.incr_qps;
all.check_and_set_qps += row.check_and_set_qps;
all.check_and_mutate_qps += row.check_and_mutate_qps;
all.scan_qps += row.scan_qps;
all.recent_read_cu += row.recent_read_cu;
all.recent_write_cu += row.recent_write_cu;
all.recent_expire_count += row.recent_expire_count;
all.recent_filter_count += row.recent_filter_count;
all.recent_abnormal_count += row.recent_abnormal_count;
all.recent_write_throttling_delay_count += row.recent_write_throttling_delay_count;
all.recent_write_throttling_reject_count += row.recent_write_throttling_reject_count;
all.storage_mb += row.storage_mb;
all.storage_count += row.storage_count;
all.rdb_block_cache_hit_count += row.rdb_block_cache_hit_count;
all.rdb_block_cache_total_count += row.rdb_block_cache_total_count;
all.rdb_index_and_filter_blocks_mem_usage += row.rdb_index_and_filter_blocks_mem_usage;
all.rdb_memtable_mem_usage += row.rdb_memtable_mem_usage;
all.rdb_estimate_num_keys += row.rdb_estimate_num_keys;
read_qps[i] = row.get_qps + row.multi_get_qps + row.scan_qps;
write_qps[i] = row.put_qps + row.multi_put_qps + row.remove_qps + row.multi_remove_qps +
row.incr_qps + row.check_and_set_qps + row.check_and_mutate_qps;
}
read_qps[read_qps.size() - 1] = all.get_qps + all.multi_get_qps + all.scan_qps;
write_qps[read_qps.size() - 1] = all.put_qps + all.multi_put_qps + all.remove_qps +
all.multi_remove_qps + all.incr_qps + all.check_and_set_qps +
all.check_and_mutate_qps;
for (int i = 0; i < rows.size(); ++i) {
row_data &row = rows[i];
AppStatCounters *counters = get_app_counters(row.row_name);
counters->get_qps->set(row.get_qps);
counters->multi_get_qps->set(row.multi_get_qps);
counters->put_qps->set(row.put_qps);
counters->multi_put_qps->set(row.multi_put_qps);
counters->remove_qps->set(row.remove_qps);
counters->multi_remove_qps->set(row.multi_remove_qps);
counters->incr_qps->set(row.incr_qps);
counters->check_and_set_qps->set(row.check_and_set_qps);
counters->check_and_mutate_qps->set(row.check_and_mutate_qps);
counters->scan_qps->set(row.scan_qps);
counters->recent_read_cu->set(row.recent_read_cu);
counters->recent_write_cu->set(row.recent_write_cu);
counters->recent_expire_count->set(row.recent_expire_count);
counters->recent_filter_count->set(row.recent_filter_count);
counters->recent_abnormal_count->set(row.recent_abnormal_count);
counters->recent_write_throttling_delay_count->set(row.recent_write_throttling_delay_count);
counters->recent_write_throttling_reject_count->set(
row.recent_write_throttling_reject_count);
counters->storage_mb->set(row.storage_mb);
counters->storage_count->set(row.storage_count);
counters->rdb_block_cache_hit_rate->set(
std::abs(row.rdb_block_cache_total_count) < 1e-6
? 0
: row.rdb_block_cache_hit_count / row.rdb_block_cache_total_count * 1000000);
counters->rdb_index_and_filter_blocks_mem_usage->set(
row.rdb_index_and_filter_blocks_mem_usage);
counters->rdb_memtable_mem_usage->set(row.rdb_memtable_mem_usage);
counters->rdb_estimate_num_keys->set(row.rdb_estimate_num_keys);
counters->read_qps->set(read_qps[i]);
counters->write_qps->set(write_qps[i]);

table_stats all_stats("_all_");
for (const auto &app_rows : all_rows) {
// get statistics data for app
table_stats app_stats(app_rows.first);
for (auto partition_row : app_rows.second) {
app_stats.aggregate(partition_row);
}
get_app_counters(app_stats.app_name)->set(app_stats);
// get row data statistics for all of the apps
all_stats.merge(app_stats);

// hotspot_calculator is to detect hotspots
hotspot_calculator *hotspot_calculator =
get_hotspot_calculator(app_rows.first, app_rows.second.size());
hotspot_calculator->aggregate(app_rows.second);
// new policy can be designed by strategy pattern in hotspot_partition_data.h
hotspot_calculator->start_alg();
}
get_app_counters(all_stats.app_name)->set(all_stats);

ddebug("stat apps succeed, app_count = %d, total_read_qps = %.2f, total_write_qps = %.2f",
(int)(rows.size() - 1),
read_qps[read_qps.size() - 1],
write_qps[read_qps.size() - 1]);
(int)(all_rows.size() - 1),
all_stats.get_total_read_qps(),
all_stats.get_total_write_qps());
}

info_collector::AppStatCounters *info_collector::get_app_counters(const std::string &app_name)
info_collector::app_stat_counters *info_collector::get_app_counters(const std::string &app_name)
{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_app_stat_counter_lock);
auto find = _app_stat_counters.find(app_name);
if (find != _app_stat_counters.end()) {
return find->second;
}
AppStatCounters *counters = new AppStatCounters();
app_stat_counters *counters = new app_stat_counters();

char counter_name[1024];
char counter_desc[1024];
Expand Down Expand Up @@ -329,5 +278,17 @@ void info_collector::on_storage_size_stat(int remaining_retry_count)
_result_writer->set_result(st_stat.timestamp, "ss", st_stat.dump_to_json());
}

hotspot_calculator *info_collector::get_hotspot_calculator(const std::string &app_name,
const int partition_num)
{
auto iter = _hotspot_calculator_store.find(app_name);
if (iter != _hotspot_calculator_store.end()) {
return iter->second;
}
hotspot_calculator *calculator_address = new hotspot_calculator(app_name, partition_num);
_hotspot_calculator_store[app_name] = calculator_address;
return calculator_address;
}

} // namespace server
} // namespace pegasus
50 changes: 46 additions & 4 deletions src/server/info_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
#include <event2/event.h>
#include <event2/http.h>
#include <event2/bufferevent.h>
#include <fstream>

#include "../shell/commands.h"
#include "table_stats.h"
#include "table_hotspot_policy.h"

namespace pegasus {
namespace server {
Expand All @@ -28,8 +29,44 @@ class result_writer;
class info_collector
{
public:
struct AppStatCounters
struct app_stat_counters
{
void set(const table_stats &row_stats)
{
get_qps->set(row_stats.total_get_qps);
multi_get_qps->set(row_stats.total_multi_get_qps);
put_qps->set(row_stats.total_put_qps);
multi_put_qps->set(row_stats.total_multi_put_qps);
remove_qps->set(row_stats.total_remove_qps);
multi_remove_qps->set(row_stats.total_multi_remove_qps);
incr_qps->set(row_stats.total_incr_qps);
check_and_set_qps->set(row_stats.total_check_and_set_qps);
check_and_mutate_qps->set(row_stats.total_check_and_mutate_qps);
scan_qps->set(row_stats.total_scan_qps);
recent_read_cu->set(row_stats.total_recent_read_cu);
recent_write_cu->set(row_stats.total_recent_write_cu);
recent_expire_count->set(row_stats.total_recent_expire_count);
recent_filter_count->set(row_stats.total_recent_filter_count);
recent_abnormal_count->set(row_stats.total_recent_abnormal_count);
recent_write_throttling_delay_count->set(
row_stats.total_recent_write_throttling_delay_count);
recent_write_throttling_reject_count->set(
row_stats.total_recent_write_throttling_reject_count);
storage_mb->set(row_stats.total_storage_mb);
storage_count->set(row_stats.total_storage_count);
rdb_block_cache_hit_rate->set(
std::abs(row_stats.total_rdb_block_cache_total_count) < 1e-6
? 0
: row_stats.total_rdb_block_cache_hit_count /
row_stats.total_rdb_block_cache_total_count * 1000000);
rdb_index_and_filter_blocks_mem_usage->set(
row_stats.total_rdb_index_and_filter_blocks_mem_usage);
rdb_memtable_mem_usage->set(row_stats.total_rdb_memtable_mem_usage);
rdb_estimate_num_keys->set(row_stats.total_rdb_estimate_num_keys);
read_qps->set(row_stats.get_total_read_qps());
write_qps->set(row_stats.get_total_write_qps());
}

::dsn::perf_counter_wrapper get_qps;
::dsn::perf_counter_wrapper multi_get_qps;
::dsn::perf_counter_wrapper put_qps;
Expand Down Expand Up @@ -65,7 +102,7 @@ class info_collector
void stop();

void on_app_stat();
AppStatCounters *get_app_counters(const std::string &app_name);
app_stat_counters *get_app_counters(const std::string &app_name);

void on_capacity_unit_stat(int remaining_retry_count);
bool has_capacity_unit_updated(const std::string &node_address, const std::string &timestamp);
Expand All @@ -80,7 +117,7 @@ class info_collector
uint32_t _app_stat_interval_seconds;
::dsn::task_ptr _app_stat_timer_task;
::dsn::utils::ex_lock_nr _app_stat_counter_lock;
std::map<std::string, AppStatCounters *> _app_stat_counters;
std::map<std::string, app_stat_counters *> _app_stat_counters;

// app for recording usage statistics, including read/write capacity unit and storage size.
std::string _usage_stat_app;
Expand All @@ -99,6 +136,11 @@ class info_collector
::dsn::utils::ex_lock_nr _capacity_unit_update_info_lock;
// mapping 'node address' --> 'last updated timestamp'
std::map<std::string, string> _capacity_unit_update_info;
std::map<std::string, hotspot_calculator *> _hotspot_calculator_store;

hotspot_calculator *get_hotspot_calculator(const std::string &app_name,
const int partition_num);
};

} // namespace server
} // namespace pegasus
40 changes: 40 additions & 0 deletions src/server/table_hotspot_policy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "table_hotspot_policy.h"

#include <dsn/dist/fmt_logging.h>

namespace pegasus {
namespace server {

void hotspot_calculator::aggregate(const std::vector<row_data> &partitions)
{
while (_app_data.size() > kMaxQueueSize - 1) {
_app_data.pop();
}
std::vector<hotspot_partition_data> temp(partitions.size());
for (int i = 0; i < partitions.size(); i++) {
temp[i] = std::move(hotspot_partition_data(partitions[i]));
}
_app_data.emplace(temp);
}

void hotspot_calculator::init_perf_counter(const int perf_counter_count)
{
std::string counter_name;
std::string counter_desc;
for (int i = 0; i < perf_counter_count; i++) {
string paritition_desc = _app_name + '.' + std::to_string(i);
counter_name = fmt::format("app.stat.hotspots@{}", paritition_desc);
counter_desc = fmt::format("statistic the hotspots of app {}", paritition_desc);
_points[i].init_app_counter(
"app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str());
}
}

void hotspot_calculator::start_alg() { _policy->analysis(_app_data, _points); }

} // namespace server
} // namespace pegasus
68 changes: 68 additions & 0 deletions src/server/table_hotspot_policy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#pragma once

#include "hotspot_partition_data.h"

#include <algorithm>
#include <gtest/gtest_prod.h>
#include <dsn/perf_counter/perf_counter.h>

namespace pegasus {
namespace server {
class hotspot_policy
{
public:
// hotspot_app_data store the historical data which related to hotspot
// it uses rolling queue to save one app's data
// vector is used to save the partitions' data of this app
// hotspot_partition_data is used to save data of one partition
virtual void analysis(const std::queue<std::vector<hotspot_partition_data>> &hotspot_app_data,
std::vector<::dsn::perf_counter_wrapper> &hot_points) = 0;
};

class hotspot_algo_qps_skew : public hotspot_policy
{
public:
void analysis(const std::queue<std::vector<hotspot_partition_data>> &hotspot_app_data,
std::vector<::dsn::perf_counter_wrapper> &hot_points)
{
const auto &anly_data = hotspot_app_data.back();
double min_total_qps = INT_MAX;
for (auto partition_anly_data : anly_data) {
min_total_qps = std::min(min_total_qps, partition_anly_data.total_qps);
}
min_total_qps = std::max(1.0, min_total_qps);
dassert(anly_data.size() == hot_points.size(), "partition counts error, please check");
for (int i = 0; i < hot_points.size(); i++) {
hot_points[i]->set(anly_data[i].total_qps / min_total_qps);
}
}
};

// hotspot_calculator is used to find the hotspot in Pegasus
class hotspot_calculator
{
public:
hotspot_calculator(const std::string &app_name, const int partition_num)
: _app_name(app_name), _points(partition_num), _policy(new hotspot_algo_qps_skew())
{
init_perf_counter(partition_num);
}
void aggregate(const std::vector<row_data> &partitions);
void start_alg();
void init_perf_counter(const int perf_counter_count);

private:
const std::string _app_name;
std::vector<::dsn::perf_counter_wrapper> _points;
std::queue<std::vector<hotspot_partition_data>> _app_data;
std::unique_ptr<hotspot_policy> _policy;
static const int kMaxQueueSize = 100;

FRIEND_TEST(table_hotspot_policy, hotspot_algo_qps_skew);
};
} // namespace server
} // namespace pegasus
Loading

0 comments on commit 695b366

Please sign in to comment.