Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hotkey): add thread_safe method to clear collector #640

Merged
merged 2 commits into from
Nov 10, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/server/hotkey_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,13 @@ void hotkey_coarse_data_collector::analyse_data(detect_hotkey_result &result)
}
}

void hotkey_coarse_data_collector::clear()
{
for (int i = 0; i < FLAGS_hotkey_buckets_num; i++) {
_hash_buckets[i].store(0);
}
}

hotkey_fine_data_collector::hotkey_fine_data_collector(replica_base *base,
int target_bucket_index,
int max_queue_size)
Expand Down Expand Up @@ -332,5 +339,12 @@ void hotkey_fine_data_collector::analyse_data(detect_hotkey_result &result)
}
}

void hotkey_fine_data_collector::clear()
{
std::pair<dsn::blob, uint64_t> key_weight_pair;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about change it like this?

    moodycamel::ConcurrentQueue<std::pair<dsn::blob, uint64_t>> temp;
    _capture_key_queue.swap(temp);

Copy link
Contributor Author

@Smityz Smityz Nov 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

concurrentqueue.h:890
	// Swaps this queue's state with the other's. Not thread-safe.
	// Swapping two queues does not invalidate their tokens, however
	// the tokens that were created for one queue must be used with
	// only the swapped queue (i.e. the tokens are tied to the
	// queue's movable state, not the object itself).

It's not a thread safe way

while (_capture_key_queue.try_dequeue(key_weight_pair)) {
};
Smityz marked this conversation as resolved.
Show resolved Hide resolved
}

} // namespace server
} // namespace pegasus
4 changes: 4 additions & 0 deletions src/server/hotkey_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class internal_collector_base : public dsn::replication::replica_base
explicit internal_collector_base(replica_base *base) : replica_base(base){};
virtual void capture_data(const dsn::blob &hash_key, uint64_t weight) = 0;
virtual void analyse_data(detect_hotkey_result &result) = 0;
virtual void clear() = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where you call this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

};

// used in hotkey_collector_state::STOPPED and hotkey_collector_state::FINISHED, avoid null pointers
Expand All @@ -111,6 +112,7 @@ class hotkey_empty_data_collector : public internal_collector_base
explicit hotkey_empty_data_collector(replica_base *base) : internal_collector_base(base) {}
void capture_data(const dsn::blob &hash_key, uint64_t weight) override {}
void analyse_data(detect_hotkey_result &result) override {}
void clear() override {}
};

// TODO: (Tangyanzhao) add a unit test of hotkey_coarse_data_collector
Expand All @@ -120,6 +122,7 @@ class hotkey_coarse_data_collector : public internal_collector_base
explicit hotkey_coarse_data_collector(replica_base *base);
void capture_data(const dsn::blob &hash_key, uint64_t weight) override;
void analyse_data(detect_hotkey_result &result) override;
void clear() override;

private:
hotkey_coarse_data_collector() = delete;
Expand All @@ -133,6 +136,7 @@ class hotkey_fine_data_collector : public internal_collector_base
hotkey_fine_data_collector(replica_base *base, int target_bucket_index, int max_queue_size);
void capture_data(const dsn::blob &hash_key, uint64_t weight) override;
void analyse_data(detect_hotkey_result &result) override;
void clear() override;

private:
hotkey_fine_data_collector() = delete;
Expand Down