diff --git a/rdsn b/rdsn index 69102a786f..ccecd5996a 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 69102a786f3b888155bc18b8b6c58031c7d2fd98 +Subproject commit ccecd5996a7b5c5b3281860186b30048a2ee4763 diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp new file mode 100644 index 0000000000..c05e33c1ac --- /dev/null +++ b/src/server/hotkey_collector.cpp @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "hotkey_collector.h" + +namespace pegasus { +namespace server { + +// TODO: (Tangyanzhao) implement these functions +void hotkey_collector::handle_rpc(const dsn::replication::detect_hotkey_request &req, + dsn::replication::detect_hotkey_response &resp) +{ +} + +void hotkey_collector::capture_raw_key(const dsn::blob &raw_key, int64_t weight) +{ + // TODO: (Tangyanzhao) Add a judgment sentence to check if it is a raw key +} + +void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weight) {} + +} // namespace server +} // namespace pegasus diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h new file mode 100644 index 0000000000..57c7482aa0 --- /dev/null +++ b/src/server/hotkey_collector.h @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +namespace pegasus { +namespace server { + +// hotkey_collector is responsible to find the hot keys after the partition +// was detected to be hot. The two types of hotkey, READ & WRITE, are detected +// separately. +// +// +--------------------+ +----------------------------------------------------+ +// | Replcia server | | Hotkey collector | +// | | | +-----------------------------------------------+ | +// | +----------------+ | | | Corase capture | | +// | | | |--> | +----------+ | | +// | | RPC received | || | | | Data | | | +// | | | || | | +-----+----+ | | +// | +-------+--------+ || | | | | | +// | | || | | +---------------+----v--+-------+---------+ | | +// | v || | | | |Hot | | | | | | +// | +-------+--------+ || | | |Bucket |Bucket |Bucket |Bucket |Bucket | | | +// | | Replication | || | | +-----------+-----------------------------+ | | +// | | (only on the | || | | | | | +// | | write path)) | || | +--------------|--------------------------------+ | +// | +-------+--------+ || | +--v---+ | +// | | || | | Data | | +// | v || | +------+ | +// | +-------+--------+ || | +-----|-------+-------------+ | +// | | | || | +------|-------------|-------------|---------+ | +// | | Capture data ---| | | Fine |capture | | | | +// | | | | | | | | | | | +// | +-------+--------+ | | | +----v----+ +----v----+ +----v----+ | | +// | | | | | | queue | | queue | | queue | | | +// | v | | | +----+----+ +----+----+ +----+----+ | | +// | +-------+--------+ | | | | | | | | +// | | | | | | +----v-------------v-------------v------+ | | +// | | Place data | | | | | Analsis pool | | | +// | | to the disk | | | | +-----------------|---------------------+ | | +// | | | | | +-------------------|------------------------+ | +// | +----------------+ | | v | +// | | | Hotkey | +// +--------------------+ +----------------------------------------------------+ + +class hotkey_collector +{ +public: + // TODO: (Tangyanzhao) capture_*_key should be consistent with hotspot detection + // weight: calculate the weight according to the specific situation + void capture_raw_key(const dsn::blob &raw_key, int64_t weight); + void capture_hash_key(const dsn::blob &hash_key, int64_t weight); + void handle_rpc(const dsn::replication::detect_hotkey_request &req, + /*out*/ dsn::replication::detect_hotkey_response &resp); +}; + +} // namespace server +} // namespace pegasus diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 577c907abe..122c638537 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -22,6 +22,7 @@ #include "capacity_unit_calculator.h" #include "pegasus_server_write.h" #include "meta_store.h" +#include "hotkey_collector.h" using namespace dsn::literals::chrono_literals; @@ -2798,5 +2799,28 @@ void pegasus_server_impl::set_ingestion_status(dsn::replication::ingestion_statu _ingestion_status = status; } +void pegasus_server_impl::on_detect_hotkey(const dsn::replication::detect_hotkey_request &req, + dsn::replication::detect_hotkey_response &resp) +{ + + if (dsn_unlikely(req.action != dsn::replication::detect_action::START && + req.action != dsn::replication::detect_action::STOP)) { + resp.err = dsn::ERR_INVALID_PARAMETERS; + resp.__set_err_hint("invalid detect_action"); + return; + } + + if (dsn_unlikely(req.type != dsn::replication::hotkey_type::READ && + req.type != dsn::replication::hotkey_type::WRITE)) { + resp.err = dsn::ERR_INVALID_PARAMETERS; + resp.__set_err_hint("invalid hotkey_type"); + return; + } + + auto collector = req.type == dsn::replication::hotkey_type::READ ? _read_hotkey_collector + : _write_hotkey_collector; + collector->handle_rpc(req, resp); +} + } // namespace server } // namespace pegasus diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 1db2077f6b..15cf80a4f8 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -28,6 +28,7 @@ namespace server { class meta_store; class capacity_unit_calculator; class pegasus_server_write; +class hotkey_collector; class pegasus_server_impl : public pegasus_read_service { @@ -318,6 +319,9 @@ class pegasus_server_impl : public pegasus_read_service ::dsn::error_code flush_all_family_columns(bool wait); + void on_detect_hotkey(const dsn::replication::detect_hotkey_request &req, + dsn::replication::detect_hotkey_response &resp) override; + private: static const std::chrono::seconds kServerStatUpdateTimeSec; static const std::string COMPRESSION_HEADER; @@ -382,6 +386,9 @@ class pegasus_server_impl : public pegasus_read_service dsn::task_tracker _tracker; + std::shared_ptr _read_hotkey_collector; + std::shared_ptr _write_hotkey_collector; + // perf counters ::dsn::perf_counter_wrapper _pfc_get_qps; ::dsn::perf_counter_wrapper _pfc_multi_get_qps; diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp index b97d66cdf7..71c6979a9c 100644 --- a/src/server/pegasus_server_impl_init.cpp +++ b/src/server/pegasus_server_impl_init.cpp @@ -12,6 +12,7 @@ #include "meta_store.h" #include "pegasus_event_listener.h" #include "pegasus_server_write.h" +#include "hotkey_collector.h" namespace pegasus { namespace server { @@ -42,6 +43,9 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) _primary_address = dsn::rpc_address(dsn_primary_address()).to_string(); _gpid = get_gpid(); + _read_hotkey_collector = std::make_shared(); + _write_hotkey_collector = std::make_shared(); + _verbose_log = dsn_config_get_value_bool("pegasus.server", "rocksdb_verbose_log", false, diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt index 0184dbf115..20f290bdec 100644 --- a/src/server/test/CMakeLists.txt +++ b/src/server/test/CMakeLists.txt @@ -10,6 +10,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp" "../pegasus_mutation_duplicator.cpp" "../hotspot_partition_calculator.cpp" "../meta_store.cpp" + "../hotkey_collector.cpp" ) set(MY_SRC_SEARCH_MODE "GLOB")