From 22696a0904434c875df79b630b61606d5e7bbc2d Mon Sep 17 00:00:00 2001 From: Mahmoud Al-Qudsi Date: Sun, 27 Jun 2021 18:42:19 -0500 Subject: [PATCH] Make `WaitForMultipleEvents` atomic when `waitAll` is true This patch significantly changes the behavior (and to an extent, the performance) of pevents to more closely mimic the documented behavior of `WaitForMultipleObjects` and `WaitForMultipleObjectsEx`. As reported in #9, the previous behavior did not make any atomicity guarantees when a call to `WaitForMultipleEvents()` was made with `waitAll = true`, and WFME would attempt to serially obtain the events in question, which could lead to a deadlock in case of circular locking and auto-reset events. The WFMO behavior documented on MSDN makes it clear that the Windows implementation does not modify the signalled state of any of the manual or auto reset events being awaited until the WFMO call is ready to return, at which point either the one event in question or all the events being awaited (dependent on `waitAll`) are atomically awaited. --- src/pevents.cpp | 312 +++++++++++++++++++++++++++--------------------- 1 file changed, 179 insertions(+), 133 deletions(-) diff --git a/src/pevents.cpp b/src/pevents.cpp index 857a626..70c1457 100644 --- a/src/pevents.cpp +++ b/src/pevents.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #ifdef WFMO #include #include @@ -30,7 +31,7 @@ namespace neosmart { int RefCount; union { int FiredEvent; // WFSO - int EventsLeft; // WFMO + std::atomic EventsLeft; // WFMO } Status; bool WaitAll; bool StillWaiting; @@ -47,6 +48,7 @@ namespace neosmart { struct neosmart_wfmo_info_t_ { neosmart_wfmo_t Waiter; int WaitIndex; + bool Signalled; }; typedef neosmart_wfmo_info_t_ *neosmart_wfmo_info_t; #endif // WFMO @@ -103,14 +105,9 @@ namespace neosmart { assert(result == 0); // memory_order_relaxed: Newly created event is guaranteed to not have any waiters - event->State.store(false, std::memory_order_relaxed); + event->State.store(initialState, std::memory_order_relaxed); event->AutoReset = !manualReset; - if (initialState) { - result = SetEvent(event); - assert(result == 0); - } - return event; } @@ -157,9 +154,26 @@ namespace neosmart { result = 0; // memory_order_release: Prevent overlapping/interleaved Set/Reset contexts event->State.store(false, std::memory_order_release); + } else { + // We're trying to obtain a manual reset event with a signaled state; + // don't do anything. } - // Else we're trying to obtain a manual reset event with a signaled state; - // don't do anything + +#ifdef WFMO + // Un-signal any registered waiters in case of an auto-reset event + if (!event->State.load(std::memory_order_relaxed)) { + for (auto &wfmo : event->RegisteredWaits) { + if (wfmo.Signalled) { + // We don't need to lock the WFMO mutex because the event mutex is required to + // change the Signalled flag, and the EventsLeft atomic is only changed as a + // direct result of that. + + wfmo.Signalled = false; + wfmo.Waiter->Status.EventsLeft++; + } + } + } +#endif return result; } @@ -221,7 +235,7 @@ namespace neosmart { bool done = false; waitIndex = -1; - for (int i = 0; i < count; ++i) { + for (int i = 0; !done && i < count; ++i) { waitInfo.WaitIndex = i; // Must not release lock until RegisteredWait is potentially added @@ -235,63 +249,102 @@ namespace neosmart { RemoveExpiredWaitHelper), events[i]->RegisteredWaits.end()); - if (UnlockedWaitForEvent(events[i], 0) == 0) { - tempResult = pthread_mutex_unlock(&events[i]->Mutex); - assert(tempResult == 0); - + // Set the signalled flag without modifying (i.e. consuming) the event + waitInfo.Signalled = events[i]->State; + if (waitInfo.Signalled) { if (waitAll) { --wfmo->Status.EventsLeft; assert(wfmo->Status.EventsLeft >= 0); } else { + // Consume the event because we don't need to atomically wait for all + tempResult = UnlockedWaitForEvent(events[i], 0); + assert(tempResult == 0); + wfmo->Status.FiredEvent = i; waitIndex = i; done = true; - break; } - } else { + } + + if (!done) { + // Register this wait with the event in question events[i]->RegisteredWaits.push_back(waitInfo); ++wfmo->RefCount; - - tempResult = pthread_mutex_unlock(&events[i]->Mutex); - assert(tempResult == 0); } - } - // We set the `done` flag above in case of WaitAny and at least one event was set. - // But we need to check again here if we were doing a WaitAll or else we'll incorrectly - // return WAIT_TIMEOUT. - if (waitAll && wfmo->Status.EventsLeft == 0) { - done = true; + tempResult = pthread_mutex_unlock(&events[i]->Mutex); + assert(tempResult == 0); } timespec ts; - if (!done) { - if (milliseconds == 0) { - result = WAIT_TIMEOUT; - done = true; - } else if (milliseconds != WAIT_INFINITE) { - timeval tv; - gettimeofday(&tv, NULL); + if (!done && milliseconds != WAIT_INFINITE && milliseconds != 0) { + timeval tv; + gettimeofday(&tv, NULL); - uint64_t nanoseconds = ((uint64_t)tv.tv_sec) * 1000 * 1000 * 1000 + - milliseconds * 1000 * 1000 + ((uint64_t)tv.tv_usec) * 1000; + uint64_t nanoseconds = ((uint64_t)tv.tv_sec) * 1000 * 1000 * 1000 + + milliseconds * 1000 * 1000 + ((uint64_t)tv.tv_usec) * 1000; - ts.tv_sec = (time_t) (nanoseconds / 1000 / 1000 / 1000); - ts.tv_nsec = (long) (nanoseconds - ((uint64_t)ts.tv_sec) * 1000 * 1000 * 1000); - } + ts.tv_sec = (time_t) (nanoseconds / 1000 / 1000 / 1000); + ts.tv_nsec = (long) (nanoseconds - ((uint64_t)ts.tv_sec) * 1000 * 1000 * 1000); } while (!done) { // One (or more) of the events we're monitoring has been triggered? - // If we're waiting for all events, assume we're done and check if there's an event that - // hasn't fired But if we're waiting for just one event, assume we're not done until we - // find a fired event - done = (waitAll && wfmo->Status.EventsLeft == 0) || - (!waitAll && wfmo->Status.FiredEvent != -1); + if (!waitAll) { + done = wfmo->Status.FiredEvent != -1; + } else if (wfmo->Status.EventsLeft == 0) { + // All events are currently signalled, but we must atomically obtain them before + // returning. + +retry: + bool lockedAtomically = true; + for (int i = 0; i < count; ++i) { + tempResult = pthread_mutex_trylock(&events[i]->Mutex); + if (tempResult == EBUSY) { + // The event state is locked; we can't continue without knowing for sure if + // all the events can be atomically claimed because we risk missing a wake + // otherwise. To avoid a deadlock here, we return all the locks and try + // again. + for (int j = i - 1; j >= 0; --j) { + tempResult = pthread_mutex_unlock(&events[j]->Mutex); + assert(tempResult == 0); + } + goto retry; + } + + assert(tempResult == 0); + if (!events[i]->State) { + // The event has been stolen from under us; since we hold the WFMO lock, it + // should be safe to sleep until a relevant SetEvent() call is made. But + // first, release all the locks we've accumulated. + for (int j = i; j >= 0; --j) { + tempResult = pthread_mutex_unlock(&events[j]->Mutex); + assert(tempResult == 0); + } + lockedAtomically = false; + break; + } + } + + if (lockedAtomically) { + // We have all the locks, so we can atomically consume all the events + for (int i = 0; i < count; ++i) { + tempResult = UnlockedWaitForEvent(events[i], 0); + assert(tempResult == 0); + + tempResult = pthread_mutex_unlock(&events[i]->Mutex); + assert(tempResult == 0); + } + done = true; + } + } if (!done) { - if (milliseconds != WAIT_INFINITE) { + if (milliseconds == 0) { + result = WAIT_TIMEOUT; + done = true; + } else if (milliseconds != WAIT_INFINITE) { result = pthread_cond_timedwait(&wfmo->CVariable, &wfmo->Mutex, &ts); } else { result = pthread_cond_wait(&wfmo->CVariable, &wfmo->Mutex); @@ -349,117 +402,79 @@ namespace neosmart { int result = pthread_mutex_lock(&event->Mutex); assert(result == 0); - // memory_order_release: Unblock threads waiting for the event - event->State.store(true, std::memory_order_release); - - // Depending on the event type, we either trigger everyone or only one - if (event->AutoReset) { #ifdef WFMO - while (!event->RegisteredWaits.empty()) { - neosmart_wfmo_info_t i = &event->RegisteredWaits.front(); - - result = pthread_mutex_lock(&i->Waiter->Mutex); - assert(result == 0); + bool consumed = false; + for (std::deque::iterator i = event->RegisteredWaits.begin(); + !consumed && i != event->RegisteredWaits.end();) { + result = pthread_mutex_lock(&i->Waiter->Mutex); + assert(result == 0); + // Remove expired waits + if (!i->Waiter->StillWaiting) { --i->Waiter->RefCount; assert(i->Waiter->RefCount >= 0); - if (!i->Waiter->StillWaiting) { - bool destroy = i->Waiter->RefCount == 0; - result = pthread_mutex_unlock(&i->Waiter->Mutex); - assert(result == 0); - if (destroy) { - i->Waiter->Destroy(); - delete i->Waiter; - } - event->RegisteredWaits.pop_front(); - continue; + + bool destroy = i->Waiter->RefCount == 0; + result = pthread_mutex_unlock(&i->Waiter->Mutex); + assert(result == 0); + if (destroy) { + i->Waiter->Destroy(); + delete i->Waiter; } + i = event->RegisteredWaits.erase(i); + continue; + } + if (!i->Signalled) { + i->Signalled = true; if (i->Waiter->WaitAll) { --i->Waiter->Status.EventsLeft; assert(i->Waiter->Status.EventsLeft >= 0); - // We technically should do i->Waiter->StillWaiting = Waiter->Status.EventsLeft - // != 0 but the only time it'll be equal to zero is if we're the last event, so - // no one else will be checking the StillWaiting flag. We're good to go without - // it. + + if (i->Waiter->Status.EventsLeft == 0) { + // Wake the waiter but don't consume our event + result = pthread_cond_signal(&i->Waiter->CVariable); + assert(result == 0); + } } else { i->Waiter->Status.FiredEvent = i->WaitIndex; - i->Waiter->StillWaiting = false; - } - - result = pthread_mutex_unlock(&i->Waiter->Mutex); - assert(result == 0); - result = pthread_cond_signal(&i->Waiter->CVariable); - assert(result == 0); - - event->RegisteredWaits.pop_front(); - - // memory_order_release: Prevent overlapping of sequential Set/Reset states. - event->State.store(false, std::memory_order_release); - - result = pthread_mutex_unlock(&event->Mutex); - assert(result == 0); - - return 0; - } -#endif // WFMO - // event->State can be false if compiled with WFMO support - // memory_order_relaxed: ordering is ensured by the mutex - if (event->State.load(std::memory_order_relaxed)) { - result = pthread_mutex_unlock(&event->Mutex); - assert(result == 0); - - result = pthread_cond_signal(&event->CVariable); - assert(result == 0); - - return 0; - } - } else { // this is a manual reset event -#ifdef WFMO - for (size_t i = 0; i < event->RegisteredWaits.size(); ++i) { - neosmart_wfmo_info_t info = &event->RegisteredWaits[i]; - - result = pthread_mutex_lock(&info->Waiter->Mutex); - assert(result == 0); - - --info->Waiter->RefCount; - assert(info->Waiter->RefCount >= 0); + // If the waiter is waiting on any single event, just consume the call to + // SetEvent() that brought us here (in case of an auto-reset event) and + // stop. + if (event->AutoReset) { + consumed = true; + } - if (!info->Waiter->StillWaiting) { - bool destroy = info->Waiter->RefCount == 0; - result = pthread_mutex_unlock(&info->Waiter->Mutex); + result = pthread_cond_signal(&i->Waiter->CVariable); assert(result == 0); - if (destroy) { - info->Waiter->Destroy(); - delete info->Waiter; - } - continue; } + } - if (info->Waiter->WaitAll) { - --info->Waiter->Status.EventsLeft; - assert(info->Waiter->Status.EventsLeft >= 0); - // We technically should do i->Waiter->StillWaiting = Waiter->Status.EventsLeft - // != 0 but the only time it'll be equal to zero is if we're the last event, so - // no one else will be checking the StillWaiting flag. We're good to go without - // it. - } else { - info->Waiter->Status.FiredEvent = info->WaitIndex; - info->Waiter->StillWaiting = false; - } + result = pthread_mutex_unlock(&i->Waiter->Mutex); + assert(result == 0); - result = pthread_mutex_unlock(&info->Waiter->Mutex); - assert(result == 0); + i = ++i; + } - result = pthread_cond_signal(&info->Waiter->CVariable); - assert(result == 0); - } - event->RegisteredWaits.clear(); -#endif // WFMO + if (consumed) { result = pthread_mutex_unlock(&event->Mutex); assert(result == 0); + return 0; + } +#endif // WFMO + + // memory_order_release: Unblock threads waiting for the event + event->State.store(true, std::memory_order_release); + result = pthread_mutex_unlock(&event->Mutex); + assert(result == 0); + + // Depending on the event type, we either trigger everyone or only one + if (event->AutoReset) { + result = pthread_cond_signal(&event->CVariable); + assert(result == 0); + } else { // this is a manual reset event result = pthread_cond_broadcast(&event->CVariable); assert(result == 0); } @@ -475,6 +490,37 @@ namespace neosmart { // be required per https://old.reddit.com/r/cpp/comments/g84bzv/c/fpua2yq/ event->State.store(false, std::memory_order_release); +#ifdef WFMO + for (std::deque::iterator i = event->RegisteredWaits.begin(); + i != event->RegisteredWaits.end();) { + result = pthread_mutex_lock(&i->Waiter->Mutex); + assert(result == 0); + + if (!i->Waiter->StillWaiting) { + --i->Waiter->RefCount; + assert(i->Waiter->RefCount >= 0); + + bool destroy = i->Waiter->RefCount == 0; + result = pthread_mutex_unlock(&i->Waiter->Mutex); + assert(result == 0); + if (destroy) { + i->Waiter->Destroy(); + delete i->Waiter; + } + i = event->RegisteredWaits.erase(i); + continue; + } + + if (i->Signalled) { + if (i->Waiter->WaitAll) { + i->Signalled = false; + ++i->Waiter->Status.EventsLeft; + i->Waiter->StillWaiting = true; + } + } + } +#endif // WFMO + result = pthread_mutex_unlock(&event->Mutex); assert(result == 0);