Skip to content

Commit

Permalink
server: support table level write throttling (apache#230)
Browse files Browse the repository at this point in the history
Former-commit-id: a550784710524779af004ab3147c9919dae9ee93 [formerly 568e0b8]
Former-commit-id: bf1d458b8815a0ac263b4fc223e201019b762275
  • Loading branch information
qinzuoyan authored Dec 10, 2018
1 parent ee571da commit d1fdf42
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 10 deletions.
20 changes: 20 additions & 0 deletions scripts/falcon_screen.json
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,26 @@
"method": "",
"timespan": 86400
},
{
"title": "Delay数据条数(统计各表最近10秒write throttling delay的数据条数)",
"endpoints": ["cluster=${cluster.name} job=collector service=pegasus"],
"counters": [
"collector*app.pegasus*app.stat.recent_throttling_delay_count#${for.each.table}/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus"
],
"graph_type": "a",
"method": "",
"timespan": 86400
},
{
"title": "Reject数据条数(统计各表最近10秒write throttling reject的数据条数)",
"endpoints": ["cluster=${cluster.name} job=collector service=pegasus"],
"counters": [
"collector*app.pegasus*app.stat.recent_throttling_reject_count#${for.each.table}/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus"
],
"graph_type": "a",
"method": "",
"timespan": 86400
},
{
"title": "【${for.each.table}】单表QPS",
"endpoints": ["cluster=${cluster.name} job=collector service=pegasus"],
Expand Down
2 changes: 2 additions & 0 deletions src/client_lib/pegasus_client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1297,9 +1297,11 @@ const char *pegasus_client_impl::get_error_string(int error_code) const
_server_error_to_client[::dsn::ERR_NETWORK_FAILURE] = PERR_NETWORK_FAILURE;
_server_error_to_client[::dsn::ERR_HANDLER_NOT_FOUND] = PERR_HANDLER_NOT_FOUND;
_server_error_to_client[::dsn::ERR_OPERATION_DISABLED] = PERR_OPERATION_DISABLED;
_server_error_to_client[::dsn::ERR_NOT_ENOUGH_MEMBER] = PERR_NOT_ENOUGH_MEMBER;

_server_error_to_client[::dsn::ERR_APP_NOT_EXIST] = PERR_APP_NOT_EXIST;
_server_error_to_client[::dsn::ERR_APP_EXIST] = PERR_APP_EXIST;
_server_error_to_client[::dsn::ERR_BUSY] = PERR_APP_BUSY;

// rocksdb error;
for (int i = 1001; i < 1013; i++) {
Expand Down
2 changes: 2 additions & 0 deletions src/include/pegasus/error_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ PEGASUS_ERR_CODE(PERR_OBJECT_NOT_FOUND, -3, "object not found");
PEGASUS_ERR_CODE(PERR_NETWORK_FAILURE, -4, "network failure");
PEGASUS_ERR_CODE(PERR_HANDLER_NOT_FOUND, -5, "handler not found");
PEGASUS_ERR_CODE(PERR_OPERATION_DISABLED, -6, "operation disabled");
PEGASUS_ERR_CODE(PERR_NOT_ENOUGH_MEMBER, -7, "no enough member");
PEGASUS_ERR_CODE(PERR_SCAN_COMPLETE, 1, "scan complete");

// SERVER ERROR
PEGASUS_ERR_CODE(PERR_APP_NOT_EXIST, -101, "app not exist");
PEGASUS_ERR_CODE(PERR_APP_EXIST, -102, "app already exist");
PEGASUS_ERR_CODE(PERR_SERVER_INTERNAL_ERROR, -103, "server internal error");
PEGASUS_ERR_CODE(PERR_SERVER_CHANGED, -104, "server changed");
PEGASUS_ERR_CODE(PERR_APP_BUSY, -105, "app busy");

// CLIENT ERROR
PEGASUS_ERR_CODE(PERR_INVALID_APP_NAME,
Expand Down
5 changes: 1 addition & 4 deletions src/reporter/pegasus_counter_reporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,12 @@ void pegasus_counter_reporter::http_request_done(struct evhttp_request *req, voi
} break;

default:
derror("http post request receive ERROR: %u", req->response_code);

struct evbuffer *buf = evhttp_request_get_input_buffer(req);
size_t len = evbuffer_get_length(buf);
char *tmp = (char *)malloc(len + 1);
char *tmp = (char *)alloca(len + 1);
memcpy(tmp, evbuffer_pullup(buf, -1), len);
tmp[len] = '\0';
derror("http post request receive ERROR: %u, %s", req->response_code, tmp);
free(tmp);
event_base_loopexit(event, 0);
return;
}
Expand Down
3 changes: 0 additions & 3 deletions src/server/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,6 @@
[task.LPC_PER_REPLICA_COLLECT_INFO_TIMER]
;is_profile = true

[task.LPC_MUTATION_PENDING_TIMER]
;is_profile = true

[task.LPC_GROUP_CHECK]
;is_profile = true

Expand Down
8 changes: 8 additions & 0 deletions src/server/info_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ void info_collector::on_app_stat()
all.recent_expire_count += row.recent_expire_count;
all.recent_filter_count += row.recent_filter_count;
all.recent_abnormal_count += row.recent_abnormal_count;
all.recent_write_throttling_delay_count += row.recent_write_throttling_delay_count;
all.recent_write_throttling_reject_count += row.recent_write_throttling_reject_count;
all.storage_mb += row.storage_mb;
all.storage_count += row.storage_count;
all.rdb_block_cache_hit_count += row.rdb_block_cache_hit_count;
Expand Down Expand Up @@ -126,6 +128,10 @@ void info_collector::on_app_stat()
counters->recent_expire_count->set(row.recent_expire_count);
counters->recent_filter_count->set(row.recent_filter_count);
counters->recent_abnormal_count->set(row.recent_abnormal_count);
counters->recent_write_throttling_delay_count->set(
row.recent_write_throttling_delay_count);
counters->recent_write_throttling_reject_count->set(
row.recent_write_throttling_reject_count);
counters->storage_mb->set(row.storage_mb);
counters->storage_count->set(row.storage_count);
counters->rdb_block_cache_hit_rate->set(row.rdb_block_cache_hit_count /
Expand Down Expand Up @@ -175,6 +181,8 @@ info_collector::AppStatCounters *info_collector::get_app_counters(const std::str
INIT_COUNER(recent_expire_count);
INIT_COUNER(recent_filter_count);
INIT_COUNER(recent_abnormal_count);
INIT_COUNER(recent_write_throttling_delay_count);
INIT_COUNER(recent_write_throttling_reject_count);
INIT_COUNER(storage_mb);
INIT_COUNER(storage_count);
INIT_COUNER(rdb_block_cache_hit_rate);
Expand Down
2 changes: 2 additions & 0 deletions src/server/info_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class info_collector
::dsn::perf_counter_wrapper recent_expire_count;
::dsn::perf_counter_wrapper recent_filter_count;
::dsn::perf_counter_wrapper recent_abnormal_count;
::dsn::perf_counter_wrapper recent_write_throttling_delay_count;
::dsn::perf_counter_wrapper recent_write_throttling_reject_count;
::dsn::perf_counter_wrapper storage_mb;
::dsn::perf_counter_wrapper storage_count;
::dsn::perf_counter_wrapper rdb_block_cache_hit_rate;
Expand Down
10 changes: 8 additions & 2 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ struct row_data
double recent_expire_count = 0;
double recent_filter_count = 0;
double recent_abnormal_count = 0;
double recent_write_throttling_delay_count = 0;
double recent_write_throttling_reject_count = 0;
double storage_mb = 0;
double storage_count = 0;
double rdb_block_cache_hit_count = 0;
Expand Down Expand Up @@ -432,6 +434,10 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name,
row.recent_filter_count += value;
else if (counter_name == "recent.abnormal.count")
row.recent_abnormal_count += value;
else if (counter_name == "recent.write.throttling.delay.count")
row.recent_write_throttling_delay_count += value;
else if (counter_name == "recent.write.throttling.reject.count")
row.recent_write_throttling_reject_count += value;
else if (counter_name == "disk.storage.sst(MB)")
row.storage_mb += value;
else if (counter_name == "disk.storage.sst.count")
Expand Down Expand Up @@ -484,9 +490,9 @@ get_app_stat(shell_context *sc, const std::string &app_name, std::vector<row_dat
command.cmd = "perf-counters";
char tmp[256];
if (app_name.empty()) {
sprintf(tmp, ".*\\*app\\.pegasus\\*.*@.*");
sprintf(tmp, ".*@.*");
} else {
sprintf(tmp, ".*\\*app\\.pegasus\\*.*@%d\\..*", app_info->app_id);
sprintf(tmp, ".*@%d\\..*", app_info->app_id);
}
command.arguments.push_back(tmp);
std::vector<std::pair<bool, std::string>> results;
Expand Down
6 changes: 6 additions & 0 deletions src/shell/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -3654,6 +3654,8 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args)
sum.recent_expire_count += row.recent_expire_count;
sum.recent_filter_count += row.recent_filter_count;
sum.recent_abnormal_count += row.recent_abnormal_count;
sum.recent_write_throttling_delay_count += row.recent_write_throttling_delay_count;
sum.recent_write_throttling_reject_count += row.recent_write_throttling_reject_count;
sum.storage_mb += row.storage_mb;
sum.storage_count += row.storage_count;
sum.rdb_block_cache_hit_count += row.rdb_block_cache_hit_count;
Expand Down Expand Up @@ -3690,6 +3692,8 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args)
tp.add_column("expired");
tp.add_column("filtered");
tp.add_column("abnormal");
tp.add_column("delayed");
tp.add_column("rejected");
tp.add_column("file_mb");
tp.add_column("file_num");
tp.add_column("hit_rate");
Expand All @@ -3712,6 +3716,8 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args)
tp.append_data(row.recent_expire_count);
tp.append_data(row.recent_filter_count);
tp.append_data(row.recent_abnormal_count);
tp.append_data(row.recent_write_throttling_delay_count);
tp.append_data(row.recent_write_throttling_reject_count);
tp.append_data(row.storage_mb);
tp.append_data((uint64_t)row.storage_count);
double block_cache_hit_rate =
Expand Down

0 comments on commit d1fdf42

Please sign in to comment.