diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp index 053d72cbfd..c326e72c60 100644 --- a/src/server/hotkey_collector.cpp +++ b/src/server/hotkey_collector.cpp @@ -255,6 +255,13 @@ void hotkey_coarse_data_collector::analyse_data(detect_hotkey_result &result) } } +void hotkey_coarse_data_collector::clear() +{ + for (int i = 0; i < FLAGS_hotkey_buckets_num; i++) { + _hash_buckets[i].store(0); + } +} + hotkey_fine_data_collector::hotkey_fine_data_collector(replica_base *base, int target_bucket_index, int max_queue_size) @@ -332,5 +339,12 @@ void hotkey_fine_data_collector::analyse_data(detect_hotkey_result &result) } } +void hotkey_fine_data_collector::clear() +{ + std::pair key_weight_pair; + while (_capture_key_queue.try_dequeue(key_weight_pair)) { + } +} + } // namespace server } // namespace pegasus diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h index 937d07da25..9d6c3f1ad4 100644 --- a/src/server/hotkey_collector.h +++ b/src/server/hotkey_collector.h @@ -102,6 +102,7 @@ class internal_collector_base : public dsn::replication::replica_base explicit internal_collector_base(replica_base *base) : replica_base(base){}; virtual void capture_data(const dsn::blob &hash_key, uint64_t weight) = 0; virtual void analyse_data(detect_hotkey_result &result) = 0; + virtual void clear() = 0; }; // used in hotkey_collector_state::STOPPED and hotkey_collector_state::FINISHED, avoid null pointers @@ -111,6 +112,7 @@ class hotkey_empty_data_collector : public internal_collector_base explicit hotkey_empty_data_collector(replica_base *base) : internal_collector_base(base) {} void capture_data(const dsn::blob &hash_key, uint64_t weight) override {} void analyse_data(detect_hotkey_result &result) override {} + void clear() override {} }; // TODO: (Tangyanzhao) add a unit test of hotkey_coarse_data_collector @@ -120,6 +122,7 @@ class hotkey_coarse_data_collector : public internal_collector_base explicit hotkey_coarse_data_collector(replica_base *base); void capture_data(const dsn::blob &hash_key, uint64_t weight) override; void analyse_data(detect_hotkey_result &result) override; + void clear() override; private: hotkey_coarse_data_collector() = delete; @@ -133,6 +136,7 @@ class hotkey_fine_data_collector : public internal_collector_base hotkey_fine_data_collector(replica_base *base, int target_bucket_index, int max_queue_size); void capture_data(const dsn::blob &hash_key, uint64_t weight) override; void analyse_data(detect_hotkey_result &result) override; + void clear() override; private: hotkey_fine_data_collector() = delete;