Skip to content

Commit

Permalink
12.16
Browse files Browse the repository at this point in the history
  • Loading branch information
Smityz committed Dec 16, 2019
1 parent 0ae00a4 commit 41fccbb
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 68 deletions.
2 changes: 1 addition & 1 deletion src/client_lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ file("GLOB"

add_definitions(-fPIC)

add_library(pegasus_client_impl_objects OBJECT ${source_files})
add_library(pegasus_client_impl_objects OBJECT ${source_files} ../server/TableHotspotPolicy.cpp)
target_include_directories(pegasus_client_impl_objects PUBLIC $<TARGET_PROPERTY:RocksDB::rocksdb,INTERFACE_INCLUDE_DIRECTORIES>)

# both shared & static version of pegasus client depends on boost_system,boost_filesystem,aio,dl
Expand Down
35 changes: 35 additions & 0 deletions src/server/TableHotspotPolicy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#include "TableHotspotPolicy.h"

namespace pegasus {
namespace server {

Hotpot_calculator::Hotpot_calculator(const std::string &name, const int &app_size)
: data_stores(app_size), _name(name)
{
}

void Hotpot_calculator::aggregate(std::vector<row_data> partitions)
{
for (int i = 0; i < partitions.size(); i++) {
while (data_stores[i].size() > MAX_STORE_SIZE)
data_stores[i].pop();
Data_store temp;
temp.aggregate(partitions[i]);
data_stores[i].emplace(temp);
}
while (data_stores[partitions.size()].size() > MAX_STORE_SIZE)
data_stores[partitions.size()].pop();
Data_store temp_all;
for (int i = 0; i < partitions.size(); i++) {
temp_all.merge(data_stores[i].back());
}
data_stores[partitions.size()].emplace(temp_all);
}

void Hotpot_calculator::start_alg()
{
_policy = new Algo1(&data_stores, &_hotpot_points);
_policy->detect_hotspot_policy();
}
}
}
106 changes: 55 additions & 51 deletions src/server/TableHotspotPolicy.h
Original file line number Diff line number Diff line change
@@ -1,56 +1,60 @@
#include "data_store.h"
#include <dsn/perf_counter/perf_counter.h>

namespace pegasus {
namespace server {
class Hotspot_policy
{
public:
Hotspot_policy(Data_store *data_store){
_data_store = data_store;
}
virtual double detect_hotspot_policy() = 0;
private:
Data_store *_data_store;
};

class Algo1 : public Hotspot_policy
{
public:
explicit Algo1(Data_store *data_store):Hotspot_policy(data_store){};
double detect_hotspot_policy()
{
return 0;
}
};

class Algo2 : public Hotspot_policy
{
public:
explicit Algo2(Data_store *data_store):Hotspot_policy(data_store){};
double detect_hotspot_policy()
{
return 0;
}
};


class Hotpot_calculator
{
private:
std::string _name;

#define MAX_STORE_SIZE 100

public:
std::vector< std::vector<Data_store> > data_stores;

Hotpot_calculator(const std::string &name,int app_size) : _name(name), data_stores(app_size){}

void aggregate(const std::vector<row_data> *partitions) {
for (int i=0;i<partitions->size();i++){
data_stores[i].push_back(partitions[i]);
}
}

};
namespace pegasus {
namespace server {
class Hotspot_policy
{
public:
Hotspot_policy(std::vector<std::queue<Data_store>> *data_stores,
std::vector<dsn::perf_counter> *hot_points)
: _data_stores(data_stores), _hot_points(hot_points)
{
}
virtual void detect_hotspot_policy() = 0;

protected:
std::vector<std::queue<Data_store>> *_data_stores;
std::vector<dsn::perf_counter> *_hot_points;
};

class Algo1 : public Hotspot_policy
{
public:
explicit Algo1(std::vector<std::queue<Data_store>> *data_stores,
std::vector<dsn::perf_counter> *hot_points)
: Hotspot_policy(data_stores, hot_points){};
void detect_hotspot_policy()
{
for (int i = 0; i < (_data_stores->size()); i++)
(*_hot_points)[i].set((*_data_stores)[i].front().total_multi_get_qps);
return;
}
};

class Algo2 : public Hotspot_policy
{
public:
explicit Algo2(std::vector<std::queue<Data_store>> *data_stores,
std::vector<dsn::perf_counter> *hot_points)
: Hotspot_policy(data_stores, hot_points){};
void detect_hotspot_policy() { return; }
};

class Hotpot_calculator
{
public:
std::vector<std::queue<Data_store>> data_stores;
Hotpot_calculator(const std::string &name, const int &app_size);
void aggregate(std::vector<row_data> partitions);
void start_alg();

private:
std::string _name;
Hotspot_policy *_policy;
std::vector<dsn::perf_counter> _hotpot_points;
};
}
}
27 changes: 21 additions & 6 deletions src/server/data_store.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#include "shell/commands.h"

#define MAX_STORE_SIZE 100

namespace pegasus {
namespace server {

class Data_store {
class Data_store
{
public:
Data_store() {};
Data_store(){};
std::string store_name;
double total_get_qps = 0;
double total_multi_get_qps = 0;
Expand All @@ -23,7 +22,8 @@ class Data_store {
double total_recent_write_cu = 0;
std::string name;

void aggregate(const row_data &row) {
void aggregate(const row_data &row)
{
total_get_qps = row.get_qps;
total_multi_get_qps = row.multi_get_qps;
total_put_qps = row.put_qps;
Expand All @@ -38,7 +38,22 @@ class Data_store {
total_recent_write_cu = row.recent_write_cu;
name = row.row_name;
}
};

void merge(const Data_store &sub_data_store)
{
total_get_qps += sub_data_store.total_get_qps;
total_multi_get_qps += sub_data_store.total_multi_get_qps;
total_put_qps += sub_data_store.total_put_qps;
total_multi_put_qps += sub_data_store.total_multi_put_qps;
total_remove_qps += sub_data_store.total_remove_qps;
total_multi_remove_qps += sub_data_store.total_multi_remove_qps;
total_incr_qps += sub_data_store.total_incr_qps;
total_check_and_set_qps += sub_data_store.total_check_and_set_qps;
total_check_and_mutate_qps += sub_data_store.total_check_and_mutate_qps;
total_scan_qps += sub_data_store.total_scan_qps;
total_recent_read_cu += sub_data_store.total_recent_read_cu;
total_recent_write_cu += sub_data_store.total_recent_write_cu;
}
};
}
}
7 changes: 4 additions & 3 deletions src/server/info_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ info_collector::~info_collector()
for (auto kv : _app_stat_counters) {
delete kv.second;
}
for (auto store: _calculator_store) {
for (auto store : _calculator_store) {
delete store.second;
}
}
Expand Down Expand Up @@ -140,8 +140,9 @@ void info_collector::on_app_stat()
}
for (auto app_rows : all_rows) {
Hotpot_calculator *app_store = nullptr;
get_store_handler(app_rows.first,app_rows.second.size(),app_store);
app_store->aggregate(&app_rows.second);
get_store_handler(app_rows.first, app_rows.second.size(), app_store);
app_store->aggregate(app_rows.second);
app_store->start_alg();
}
}

Expand Down
14 changes: 7 additions & 7 deletions src/server/info_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ class info_collector
::dsn::perf_counter_wrapper cu_max_min_scale;
};


info_collector();
~info_collector();

Expand Down Expand Up @@ -157,16 +156,17 @@ class info_collector
// mapping 'node address' --> 'last updated timestamp'
std::map<std::string, string> _capacity_unit_update_info;
std::map<std::string, Hotpot_calculator *> _calculator_store;
void get_store_handler(const std::string app_name,const int app_size,Hotpot_calculator * store){
auto iter=_calculator_store.find(app_name);
if (iter!=_calculator_store.end()){
void get_store_handler(const std::string app_name, const int app_size, Hotpot_calculator *store)
{
auto iter = _calculator_store.find(app_name);
if (iter != _calculator_store.end()) {
store = iter->second;
return ;
return;
}
Hotpot_calculator *handler = new Hotpot_calculator(app_name,app_size);
Hotpot_calculator *handler = new Hotpot_calculator(app_name, app_size);
_calculator_store[app_name] = handler;
store = handler;
return ;
return;
}
};
} // namespace server
Expand Down

0 comments on commit 41fccbb

Please sign in to comment.