Skip to content

Commit

Permalink
feat(hotkey): add a function test of hotkey detection (#665)
Browse files Browse the repository at this point in the history
  • Loading branch information
Smityz authored Jan 9, 2021
1 parent 33ef638 commit 685bbd2
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 25 deletions.
1 change: 1 addition & 0 deletions src/server/config.min.ini
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
available_detect_app = @APP_NAME@
available_detect_alert_script_dir = ./package/bin
usage_stat_app = stat
enable_detect_hotkey = false

[pegasus.clusters]
onebox = @LOCAL_IP@:34601,@LOCAL_IP@:34602,@LOCAL_IP@:34603
Expand Down
36 changes: 19 additions & 17 deletions src/server/hotkey_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,21 @@
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>
#include "base/pegasus_key_schema.h"
#include "base/pegasus_utils.h"

namespace pegasus {
namespace server {

DSN_DEFINE_uint32(
"pegasus.server",
hot_bucket_variance_threshold,
3,
7,
"the variance threshold to detect hot bucket during coarse analysis of hotkey detection");

DSN_DEFINE_uint32(
"pegasus.server",
hot_key_variance_threshold,
3,
5,
"the variance threshold to detect hot key during fine analysis of hotkey detection");

DSN_DEFINE_uint32("pegasus.server",
Expand Down Expand Up @@ -168,7 +169,8 @@ inline void hotkey_collector::change_state_by_result()
case hotkey_collector_state::FINE_DETECTING:
if (!_result.hot_hash_key.empty()) {
change_state_to_finished();
derror_replica("Find the hotkey: {}", _result.hot_hash_key);
derror_replica("Find the hotkey: {}",
pegasus::utils::c_escape_string(_result.hot_hash_key));
}
break;
default:
Expand Down Expand Up @@ -239,32 +241,32 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
switch (now_state) {
case hotkey_collector_state::COARSE_DETECTING:
case hotkey_collector_state::FINE_DETECTING:
resp.err = dsn::ERR_INVALID_STATE;
resp.err = dsn::ERR_BUSY;
hint = fmt::format("still detecting {} hotkey, state is {}",
dsn::enum_to_string(_hotkey_type),
enum_to_string(now_state));
dwarn_replica(hint);
return;
break;
case hotkey_collector_state::FINISHED:
resp.err = dsn::ERR_INVALID_STATE;
hint = fmt::format(
"{} hotkey result has been found, you can send a stop rpc to restart hotkey detection",
dsn::enum_to_string(_hotkey_type));
dwarn_replica(hint);
return;
resp.err = dsn::ERR_BUSY;
hint = fmt::format("{} hotkey result has been found: {}, you can send a stop rpc to "
"restart hotkey detection",
dsn::enum_to_string(_hotkey_type),
pegasus::utils::c_escape_string(_result.hot_hash_key));
break;
case hotkey_collector_state::STOPPED:
change_state_to_coarse_detecting();
resp.err = dsn::ERR_OK;
hint = fmt::format("starting to detect {} hotkey", dsn::enum_to_string(_hotkey_type));
ddebug_replica(hint);
return;
break;
default:
hint = "invalid collector state";
resp.err = dsn::ERR_INVALID_STATE;
resp.__set_err_hint(hint);
derror_replica(hint);
dassert(false, "invalid collector state");
}
resp.__set_err_hint(hint);
dwarn_replica(hint);
}

void hotkey_collector::on_stop_detect(dsn::replication::detect_hotkey_response &resp)
Expand All @@ -280,13 +282,13 @@ void hotkey_collector::query_result(dsn::replication::detect_hotkey_response &re
{
if (_state != hotkey_collector_state::FINISHED) {
resp.err = dsn::ERR_BUSY;
std::string hint = fmt::format("hotkey is detecting now, now state: {}",
dsn::enum_to_string(_hotkey_type));
std::string hint =
fmt::format("Can't get hotkey now, now state: {}", enum_to_string(_state.load()));
resp.__set_err_hint(hint);
ddebug_replica(hint);
} else {
resp.err = dsn::ERR_OK;
resp.__set_hotkey_result(_result.hot_hash_key);
resp.__set_hotkey_result(pegasus::utils::c_escape_string(_result.hot_hash_key));
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/server/hotspot_partition_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ DSN_DEFINE_int32("pegasus.collector",

DSN_DEFINE_int32("pegasus.collector",
occurrence_threshold,
100,
3,
"hot paritiotion occurrence times' threshold to send rpc to detect hotkey");

void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partition_stats)
Expand Down Expand Up @@ -170,16 +170,16 @@ void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
}
}

/*static*/ void hotspot_partition_calculator::send_detect_hotkey_request(
void hotspot_partition_calculator::send_detect_hotkey_request(
const std::string &app_name,
const uint64_t partition_index,
const dsn::replication::hotkey_type::type hotkey_type,
const dsn::replication::detect_action::type action)
{
FAIL_POINT_INJECT_F("send_detect_hotkey_request", [](dsn::string_view) {});

int app_id;
int partition_count;
int app_id = -1;
int partition_count = -1;
std::vector<dsn::partition_configuration> partitions;
_shell_context->ddl_client->list_app(app_name, app_id, partition_count, partitions);

Expand Down
5 changes: 4 additions & 1 deletion src/server/hotspot_partition_calculator.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ class hotspot_partition_calculator
hotspot_partition_calculator(const std::string &app_name,
int partition_count,
std::shared_ptr<shell_context> context)
: _app_name(app_name), _hot_points(partition_count), _hotpartition_counter(partition_count)
: _app_name(app_name),
_hot_points(partition_count),
_shell_context(context),
_hotpartition_counter(partition_count)
{
init_perf_counter(partition_count);
}
Expand Down
12 changes: 9 additions & 3 deletions src/server/test/hotspot_partition_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace pegasus {
namespace server {

DSN_DECLARE_int32(occurrence_threshold);
DSN_DECLARE_bool(enable_detect_hotkey);

class hotspot_partition_test : public pegasus_server_test_base
{
Expand All @@ -33,8 +34,13 @@ class hotspot_partition_test : public pegasus_server_test_base
{
dsn::fail::setup();
dsn::fail::cfg("send_detect_hotkey_request", "return()");
FLAGS_enable_detect_hotkey = true;
};
~hotspot_partition_test() { dsn::fail::teardown(); }
~hotspot_partition_test()
{
FLAGS_enable_detect_hotkey = false;
dsn::fail::teardown();
}

hotspot_partition_calculator calculator;

Expand Down Expand Up @@ -136,8 +142,8 @@ TEST_F(hotspot_partition_test, send_detect_hotkey_request)
expect_result[WRITE_HOT_PARTITION][1] = FLAGS_occurrence_threshold;
aggregate_analyse_data(test_rows, expect_result, FLAGS_occurrence_threshold);
const int back_to_normal = 30;
expect_result[READ_HOT_PARTITION][0] = FLAGS_occurrence_threshold - back_to_normal;
expect_result[WRITE_HOT_PARTITION][1] = FLAGS_occurrence_threshold - back_to_normal;
expect_result[READ_HOT_PARTITION][0] = 0;
expect_result[WRITE_HOT_PARTITION][1] = 0;
aggregate_analyse_data(generate_row_data(), expect_result, back_to_normal);
}

Expand Down
2 changes: 2 additions & 0 deletions src/shell/commands/detect_hotkey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ bool generate_hotkey_request(dsn::replication::detect_hotkey_request &req,
req.action = dsn::replication::detect_action::START;
} else if (!strcasecmp(hotkey_action.c_str(), "stop")) {
req.action = dsn::replication::detect_action::STOP;
} else if (!strcasecmp(hotkey_type.c_str(), "query")) {
req.action = dsn::replication::detect_action::QUERY;
} else {
err_info =
fmt::format("\"{}\" is an invalid hotkey detect action (should be 'start' or 'stop')\n",
Expand Down
2 changes: 2 additions & 0 deletions src/test/function_test/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,6 @@ if [ $on_travis == "NO" ]; then
exit_if_fail $? "run test recovery failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/bulk_load.xml" GTEST_FILTER="bulk_load_test.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test bulk load failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/test_detect_hotspot.xml" GTEST_FILTER="test_detect_hotspot.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test test_detect_hotspot load failed: $test_case $config_file $table_name"
fi
Loading

0 comments on commit 685bbd2

Please sign in to comment.