Skip to content

Commit

Permalink
feat(hotkey): add unit test of hotkey collector (#648)
Browse files Browse the repository at this point in the history
  • Loading branch information
Smityz authored Dec 9, 2020
1 parent 6a47a3b commit cea104d
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/server/hotkey_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_ty
_internal_fine_collector =
std::make_shared<hotkey_fine_data_collector>(this, now_hash_bucket_num);
_internal_empty_collector = std::make_shared<hotkey_empty_data_collector>(this);
_state.store(hotkey_collector_state::STOPPED);
}

inline void hotkey_collector::change_state_to_stopped()
Expand Down
2 changes: 2 additions & 0 deletions src/server/hotkey_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ class hotkey_collector : public dsn::replication::replica_base
std::shared_ptr<hotkey_empty_data_collector> _internal_empty_collector;
std::shared_ptr<hotkey_coarse_data_collector> _internal_coarse_collector;
std::shared_ptr<hotkey_fine_data_collector> _internal_fine_collector;

friend class hotkey_collector_test;
};

// Be sure every function in internal_collector_base should be thread safe
Expand Down
1 change: 1 addition & 0 deletions src/server/hotspot_partition_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
dsn::replication::detect_hotkey_request req;
req.type = hotkey_type;
req.action = action;
req.pid = dsn::gpid(app_id, partition_index);
auto error = _shell_context->ddl_client->detect_hotkey(target_address, req, resp);

ddebug_f("{} {} hotkey detection in {}.{}, server address: {}",
Expand Down
1 change: 1 addition & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ class pegasus_server_impl : public pegasus_read_service
friend class manual_compact_service_test;
friend class pegasus_compression_options_test;
friend class pegasus_server_impl_test;
friend class hotkey_collector_test;
FRIEND_TEST(pegasus_server_impl_test, default_data_version);
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_latest_options);
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_app_envs);
Expand Down
181 changes: 179 additions & 2 deletions src/server/test/hotkey_collector_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

#include <dsn/utility/rand.h>
#include <dsn/utility/flags.h>
#include <dsn/utility/defer.h>
#include <dsn/tool-api/task_tracker.h>
#include "server/test/message_utils.h"
#include "base/pegasus_key_schema.h"
#include "pegasus_server_test_base.h"

namespace pegasus {
Expand All @@ -44,7 +47,7 @@ static std::string generate_hash_key_by_random(bool is_hotkey, int probability =
return result;
}

TEST(hotkey_collector_test, get_bucket_id_test)
TEST(hotkey_collector_public_func_test, get_bucket_id_test)
{
int bucket_id = -1;
for (int i = 0; i < 1000000; i++) {
Expand All @@ -55,7 +58,7 @@ TEST(hotkey_collector_test, get_bucket_id_test)
}
}

TEST(hotkey_collector_test, find_outlier_index_test)
TEST(hotkey_collector_public_func_test, find_outlier_index_test)
{
int threshold = 3;
int hot_index;
Expand Down Expand Up @@ -195,5 +198,179 @@ TEST_F(fine_collector_test, fine_collector)
ASSERT_LT(now_queue_size(), max_queue_size * 2);
}

class hotkey_collector_test : public pegasus_server_test_base
{
public:
hotkey_collector_test() { start(); }

std::shared_ptr<pegasus::server::hotkey_collector> get_read_collector()
{
return _server->_read_hotkey_collector;
}
std::shared_ptr<pegasus::server::hotkey_collector> get_write_collector()
{
return _server->_write_hotkey_collector;
}
dsn::replication::hotkey_type::type
get_collector_type(std::shared_ptr<pegasus::server::hotkey_collector> c)
{
return c->_hotkey_type;
}
hotkey_collector_state get_collector_stat(std::shared_ptr<pegasus::server::hotkey_collector> c)
{
return c->_state;
}

detect_hotkey_result *get_result(std::shared_ptr<pegasus::server::hotkey_collector> c)
{
return &c->_result;
}

void on_detect_hotkey(const dsn::replication::detect_hotkey_request &req,
dsn::replication::detect_hotkey_response &resp)
{
_server->on_detect_hotkey(req, resp);
}

get_rpc generate_get_rpc(std::string hash_key)
{
dsn::blob raw_key;
pegasus_generate_key(raw_key, hash_key, std::string("sortkey"));
get_rpc rpc(dsn::make_unique<dsn::blob>(raw_key), dsn::apps::RPC_RRDB_RRDB_GET);
return rpc;
}

dsn::apps::update_request generate_set_req(std::string hash_key)
{
dsn::apps::update_request req;
dsn::blob raw_key;
pegasus_generate_key(raw_key, hash_key, std::string("sortkey"));
req.key = raw_key;
req.value.assign(hash_key.c_str(), 0, hash_key.length());
return req;
}

dsn::replication::detect_hotkey_request
generate_control_rpc(dsn::replication::hotkey_type::type type,
dsn::replication::detect_action::type action)
{
dsn::replication::detect_hotkey_request req;
req.type = type;
req.action = action;
req.pid = dsn::gpid(0, 2);
return req;
}

dsn::task_tracker _tracker;
};

TEST_F(hotkey_collector_test, hotkey_type)
{
ASSERT_EQ(get_collector_type(get_read_collector()), dsn::replication::hotkey_type::READ);
ASSERT_EQ(get_collector_type(get_write_collector()), dsn::replication::hotkey_type::WRITE);
}

TEST_F(hotkey_collector_test, state_transform)
{
auto collector = get_read_collector();
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::STOPPED);

dsn::replication::detect_hotkey_response resp;
on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::READ,
dsn::replication::detect_action::START),
resp);
ASSERT_EQ(resp.err, dsn::ERR_OK);
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::COARSE_DETECTING);

for (int i = 0; i < 100; i++) {
dsn::tasking::enqueue(LPC_WRITE, &_tracker, [&] {
_server->on_get(generate_get_rpc(generate_hash_key_by_random(true, 80)));
});
}
_tracker.wait_outstanding_tasks();
collector->analyse_data();
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::FINE_DETECTING);

for (int i = 0; i < 100; i++) {
dsn::tasking::enqueue(LPC_WRITE, &_tracker, [&] {
_server->on_get(generate_get_rpc(generate_hash_key_by_random(true, 80)));
});
}
_tracker.wait_outstanding_tasks();
collector->analyse_data();
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::FINISHED);

auto result = get_result(collector);
ASSERT_TRUE(result->if_find_result);
ASSERT_EQ(result->hot_hash_key, "ThisisahotkeyThisisahotkey");

on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::READ,
dsn::replication::detect_action::STOP),
resp);
ASSERT_EQ(resp.err, dsn::ERR_OK);
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::STOPPED);

on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::READ,
dsn::replication::detect_action::START),
resp);
ASSERT_EQ(resp.err, dsn::ERR_OK);
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::COARSE_DETECTING);

for (int i = 0; i < 1000; i++) {
dsn::tasking::enqueue(LPC_WRITE, &_tracker, [&] {
_server->on_get(generate_get_rpc(generate_hash_key_by_random(false)));
});
}
collector->analyse_data();
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::COARSE_DETECTING);

on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::READ,
dsn::replication::detect_action::STOP),
resp);
ASSERT_EQ(resp.err, dsn::ERR_OK);
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::STOPPED);
_tracker.wait_outstanding_tasks();
}

TEST_F(hotkey_collector_test, data_completeness)
{
dsn::replication::detect_hotkey_response resp;
on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::READ,
dsn::replication::detect_action::START),
resp);
ASSERT_EQ(resp.err, dsn::ERR_OK);
on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::WRITE,
dsn::replication::detect_action::START),
resp);
ASSERT_EQ(resp.err, dsn::ERR_OK);

auto writes = new dsn::message_ex *[1000];
auto cleanup = dsn::defer([=]() {
for (int i = 0; i < 1000; i++) {
writes[i]->release_ref();
}
delete[] writes;
});
for (int i = 0; i < 1000; i++) {
writes[i] = create_put_request(generate_set_req(std::to_string(i)));
}
_server->on_batched_write_requests(int64_t(0), uint64_t(0), writes, 1000);

for (int i = 0; i < 1000; i++) {
auto rpc = generate_get_rpc(std::to_string(i));
_server->on_get(rpc);
auto value = rpc.response().value.to_string();
ASSERT_EQ(value, std::to_string(i));
}

on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::READ,
dsn::replication::detect_action::STOP),
resp);
ASSERT_EQ(resp.err, dsn::ERR_OK);
on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::WRITE,
dsn::replication::detect_action::STOP),
resp);
}

} // namespace server
} // namespace pegasus
7 changes: 6 additions & 1 deletion src/server/test/pegasus_server_write_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ class pegasus_server_write_test : public pegasus_server_test_base
for (int i = put_rpc_cnt; i < total_rpc_cnt; i++) {
writes[i] = pegasus::create_remove_request(key);
}
auto cleanup = dsn::defer([=]() { delete[] writes; });
auto cleanup = dsn::defer([=]() {
for (int i = 0; i < total_rpc_cnt; i++) {
writes[i]->release_ref();
}
delete[] writes;
});

int err =
_server_write->on_batched_write_requests(writes, total_rpc_cnt, decree, 0);
Expand Down

0 comments on commit cea104d

Please sign in to comment.