Skip to content

Commit

Permalink
feat(hotkey): build a fundamental framework of hotkey detection (#603)
Browse files Browse the repository at this point in the history
  • Loading branch information
Smityz authored Oct 12, 2020
1 parent 5a35429 commit 3b96b26
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 1 deletion.
2 changes: 1 addition & 1 deletion rdsn
37 changes: 37 additions & 0 deletions src/server/hotkey_collector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "hotkey_collector.h"

namespace pegasus {
namespace server {

// TODO: (Tangyanzhao) implement these functions
void hotkey_collector::handle_rpc(const dsn::replication::detect_hotkey_request &req,
dsn::replication::detect_hotkey_response &resp)
{
}

void hotkey_collector::capture_raw_key(const dsn::blob &raw_key, int64_t weight)
{
// TODO: (Tangyanzhao) Add a judgment sentence to check if it is a raw key
}

void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weight) {}

} // namespace server
} // namespace pegasus
75 changes: 75 additions & 0 deletions src/server/hotkey_collector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <dsn/utility/string_view.h>
#include <dsn/dist/replication/replication_types.h>

namespace pegasus {
namespace server {

// hotkey_collector is responsible to find the hot keys after the partition
// was detected to be hot. The two types of hotkey, READ & WRITE, are detected
// separately.
//
// +--------------------+ +----------------------------------------------------+
// | Replcia server | | Hotkey collector |
// | | | +-----------------------------------------------+ |
// | +----------------+ | | | Corase capture | |
// | | | |--> | +----------+ | |
// | | RPC received | || | | | Data | | |
// | | | || | | +-----+----+ | |
// | +-------+--------+ || | | | | |
// | | || | | +---------------+----v--+-------+---------+ | |
// | v || | | | |Hot | | | | | |
// | +-------+--------+ || | | |Bucket |Bucket |Bucket |Bucket |Bucket | | |
// | | Replication | || | | +-----------+-----------------------------+ | |
// | | (only on the | || | | | | |
// | | write path)) | || | +--------------|--------------------------------+ |
// | +-------+--------+ || | +--v---+ |
// | | || | | Data | |
// | v || | +------+ |
// | +-------+--------+ || | +-----|-------+-------------+ |
// | | | || | +------|-------------|-------------|---------+ |
// | | Capture data ---| | | Fine |capture | | | |
// | | | | | | | | | | |
// | +-------+--------+ | | | +----v----+ +----v----+ +----v----+ | |
// | | | | | | queue | | queue | | queue | | |
// | v | | | +----+----+ +----+----+ +----+----+ | |
// | +-------+--------+ | | | | | | | |
// | | | | | | +----v-------------v-------------v------+ | |
// | | Place data | | | | | Analsis pool | | |
// | | to the disk | | | | +-----------------|---------------------+ | |
// | | | | | +-------------------|------------------------+ |
// | +----------------+ | | v |
// | | | Hotkey |
// +--------------------+ +----------------------------------------------------+

class hotkey_collector
{
public:
// TODO: (Tangyanzhao) capture_*_key should be consistent with hotspot detection
// weight: calculate the weight according to the specific situation
void capture_raw_key(const dsn::blob &raw_key, int64_t weight);
void capture_hash_key(const dsn::blob &hash_key, int64_t weight);
void handle_rpc(const dsn::replication::detect_hotkey_request &req,
/*out*/ dsn::replication::detect_hotkey_response &resp);
};

} // namespace server
} // namespace pegasus
24 changes: 24 additions & 0 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "capacity_unit_calculator.h"
#include "pegasus_server_write.h"
#include "meta_store.h"
#include "hotkey_collector.h"

using namespace dsn::literals::chrono_literals;

Expand Down Expand Up @@ -2798,5 +2799,28 @@ void pegasus_server_impl::set_ingestion_status(dsn::replication::ingestion_statu
_ingestion_status = status;
}

void pegasus_server_impl::on_detect_hotkey(const dsn::replication::detect_hotkey_request &req,
dsn::replication::detect_hotkey_response &resp)
{

if (dsn_unlikely(req.action != dsn::replication::detect_action::START &&
req.action != dsn::replication::detect_action::STOP)) {
resp.err = dsn::ERR_INVALID_PARAMETERS;
resp.__set_err_hint("invalid detect_action");
return;
}

if (dsn_unlikely(req.type != dsn::replication::hotkey_type::READ &&
req.type != dsn::replication::hotkey_type::WRITE)) {
resp.err = dsn::ERR_INVALID_PARAMETERS;
resp.__set_err_hint("invalid hotkey_type");
return;
}

auto collector = req.type == dsn::replication::hotkey_type::READ ? _read_hotkey_collector
: _write_hotkey_collector;
collector->handle_rpc(req, resp);
}

} // namespace server
} // namespace pegasus
7 changes: 7 additions & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace server {
class meta_store;
class capacity_unit_calculator;
class pegasus_server_write;
class hotkey_collector;

class pegasus_server_impl : public pegasus_read_service
{
Expand Down Expand Up @@ -318,6 +319,9 @@ class pegasus_server_impl : public pegasus_read_service

::dsn::error_code flush_all_family_columns(bool wait);

void on_detect_hotkey(const dsn::replication::detect_hotkey_request &req,
dsn::replication::detect_hotkey_response &resp) override;

private:
static const std::chrono::seconds kServerStatUpdateTimeSec;
static const std::string COMPRESSION_HEADER;
Expand Down Expand Up @@ -382,6 +386,9 @@ class pegasus_server_impl : public pegasus_read_service

dsn::task_tracker _tracker;

std::shared_ptr<hotkey_collector> _read_hotkey_collector;
std::shared_ptr<hotkey_collector> _write_hotkey_collector;

// perf counters
::dsn::perf_counter_wrapper _pfc_get_qps;
::dsn::perf_counter_wrapper _pfc_multi_get_qps;
Expand Down
4 changes: 4 additions & 0 deletions src/server/pegasus_server_impl_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "meta_store.h"
#include "pegasus_event_listener.h"
#include "pegasus_server_write.h"
#include "hotkey_collector.h"

namespace pegasus {
namespace server {
Expand Down Expand Up @@ -42,6 +43,9 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_primary_address = dsn::rpc_address(dsn_primary_address()).to_string();
_gpid = get_gpid();

_read_hotkey_collector = std::make_shared<hotkey_collector>();
_write_hotkey_collector = std::make_shared<hotkey_collector>();

_verbose_log = dsn_config_get_value_bool("pegasus.server",
"rocksdb_verbose_log",
false,
Expand Down
1 change: 1 addition & 0 deletions src/server/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
"../pegasus_mutation_duplicator.cpp"
"../hotspot_partition_calculator.cpp"
"../meta_store.cpp"
"../hotkey_collector.cpp"
)

set(MY_SRC_SEARCH_MODE "GLOB")
Expand Down

0 comments on commit 3b96b26

Please sign in to comment.