Skip to content

Commit

Permalink
Put both the map and the mutex into one shared_ptr and make sure to o…
Browse files Browse the repository at this point in the history
…nly access them safe.
  • Loading branch information
achirkin committed Feb 8, 2023
1 parent 61b66a2 commit 1a416fa
Showing 1 changed file with 18 additions and 12 deletions.
30 changes: 18 additions & 12 deletions cpp/include/raft/core/interruptible.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,8 @@ class interruptible {
auto operator=(interruptible&&) -> interruptible& = delete;

private:
/** Protect the access to the registry. */
static inline std::mutex mutex_;

using registry_t = std::unordered_map<std::thread::id, std::weak_ptr<interruptible>>;
using registry_t =
std::tuple<std::mutex, std::unordered_map<std::thread::id, std::weak_ptr<interruptible>>>;
/** Global registry of thread-local cancellation stores. */
static inline std::shared_ptr<registry_t> registry_{new registry_t{}};

Expand All @@ -203,26 +201,34 @@ class interruptible {
template <bool Claim>
static auto get_token_impl(std::thread::id thread_id) -> std::shared_ptr<interruptible>
{
std::lock_guard<std::mutex> guard_get(mutex_);
// the following constructs an empty shared_ptr if the key does not exist.
auto& weak_store = (*registry_)[thread_id];
// Make a local copy of the shared pointer to make sure the registry is not destroyed,
// if, for any reason, this function is called at program exit.
std::shared_ptr<registry_t> shared_registry = registry_;
// If the registry is not available, create a lone token that cannot be accessed from
// the outside of the thread.
if (!shared_registry) { return std::shared_ptr<interruptible>{new interruptible()}; }
// Otherwise, proceed with the normal logic
std::lock_guard<std::mutex> guard_get(std::get<0>(*shared_registry));
// the following two lines construct an empty shared_ptr if the key does not exist.
auto& weak_store = std::get<1>(*shared_registry)[thread_id];
auto thread_store = weak_store.lock();
if (!thread_store || (Claim && thread_store->claimed_)) {
std::weak_ptr<registry_t> weak_registry = registry_;
std::weak_ptr<registry_t> weak_registry = shared_registry;
// Create a new thread_store in two cases:
// 1. It does not exist in the map yet
// 2. The previous store in the map has not yet been deleted
thread_store.reset(new interruptible(), [weak_registry, thread_id](auto ts) {
auto registry = weak_registry.lock();
if (registry) {
std::lock_guard<std::mutex> guard_erase(mutex_);
auto found = registry->find(thread_id);
if (found != registry->end()) {
std::lock_guard<std::mutex> guard_erase(std::get<0>(*registry));
auto& map = std::get<1>(*registry);
auto found = map.find(thread_id);
if (found != map.end()) {
auto stored = found->second.lock();
// thread_store is not moveable, thus retains its original location.
// Not equal pointers below imply the new store has been already placed
// in the registry by the same std::thread::id
if (!stored || stored.get() == ts) { registry->erase(found); }
if (!stored || stored.get() == ts) { map.erase(found); }
}
}
delete ts;
Expand Down

0 comments on commit 1a416fa

Please sign in to comment.