Skip to content

Commit

Permalink
Change atomic memory access semantics
Browse files Browse the repository at this point in the history
* The explanation at [0] and in that article's comments is sufficiently
  convincing to downgrade from `memory_order_release` to
  `memory_order_relaxed` when acquiring a lock (i.e. when setting the
  event to false).
* We explicitly handle any concurrent calls to `ResetEvent()` as racy by
  nature and don't waste time trying to obtain the mutex.
* All `RefCount` manipulations have been downgraded from the default
  `memory_order_seq_cst` to an explicit `memory_order_acq_rel` given
  their access patterns.
* Back-to-back changes to `event->State` during processing have been
  replaced with explicit values at the distinct call branches to avoid
  any cache line issues.
  • Loading branch information
mqudsi committed Jul 10, 2021
1 parent 78df743 commit d0f1864
Showing 1 changed file with 98 additions and 68 deletions.
166 changes: 98 additions & 68 deletions src/pevents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace neosmart {
int EventsLeft; // WFMO
} Status;
bool WaitAll;
bool StillWaiting;
std::atomic<bool> StillWaiting;

void Destroy() {
pthread_mutex_destroy(&Mutex);
Expand Down Expand Up @@ -64,14 +64,14 @@ namespace neosmart {

#ifdef WFMO
static bool RemoveExpiredWaitHelper(neosmart_wfmo_info_t_ wait) {
if (wait.Waiter->StillWaiting) {
if (wait.Waiter->StillWaiting.load(std::memory_order_relaxed)) {
return false;
}

int ref_count = --wait.Waiter->RefCount;
assert(ref_count >= 0);
int ref_count = wait.Waiter->RefCount.fetch_sub(1, std::memory_order_acq_rel);
assert(ref_count > 0);

if (ref_count == 0) {
if (ref_count == 1) {
wait.Waiter->Destroy();
delete wait.Waiter;
}
Expand All @@ -88,21 +88,18 @@ namespace neosmart {
result = pthread_mutex_init(&event->Mutex, 0);
assert(result == 0);

// memory_order_relaxed: Newly created event is guaranteed to not have any waiters
event->State.store(false, std::memory_order_relaxed);
event->AutoReset = !manualReset;

if (initialState) {
result = SetEvent(event);
assert(result == 0);
}
// memory_order_release: if `initialState == true`, allow a load with acquire semantics to
// see the value.
event->State.store(initialState, std::memory_order_release);

return event;
}

static int UnlockedWaitForEvent(neosmart_event_t event, uint64_t milliseconds) {
int result = 0;
// memory_order_relaxed: unlocking/ordering is guaranteed prior to calling this function
// memory_order_relaxed: `State` is only set to true with the mutex held, and we require
// that this function only be called after the mutex is obtained.
if (!event->State.load(std::memory_order_relaxed)) {
// Zero-timeout event state check optimization
if (milliseconds == 0) {
Expand All @@ -129,23 +126,24 @@ namespace neosmart {
} else {
result = pthread_cond_wait(&event->CVariable, &event->Mutex);
}
// memory_order_relaxed: ordering is guaranteed by the mutex
// memory_order_relaxed: ordering is guaranteed by the mutex, as `State = true` is
// only ever written with the mutex held.
} while (result == 0 && !event->State.load(std::memory_order_relaxed));

if (result == 0 && event->AutoReset) {
// We've only accquired the event if the wait succeeded
// memory_order_release: Prevent overlapping/interleaved Set/Reset contexts
event->State.store(false, std::memory_order_release);
}
} else if (event->AutoReset) {
// It's an auto-reset event that's currently available;
// we need to stop anyone else from using it
result = 0;
// memory_order_release: Prevent overlapping/interleaved Set/Reset contexts
event->State.store(false, std::memory_order_release);
}
// Else we're trying to obtain a manual reset event with a signaled state;
// don't do anything
else {
// We're trying to obtain a manual reset event with a signaled state; don't do anything
}

if (result == 0 && event->AutoReset) {
// We've only accquired the event if the wait succeeded
// memory_order_relaxed: we never act on `State == true` without fully synchronizing
// or grabbing the mutex, so it's OK to use relaxed semantics here.
event->State.store(false, std::memory_order_relaxed);
}

return result;
}
Expand Down Expand Up @@ -191,16 +189,20 @@ namespace neosmart {
waitInfo.Waiter = wfmo;
waitInfo.WaitIndex = -1;

wfmo->WaitAll = waitAll;
wfmo->StillWaiting = true;
wfmo->RefCount = 1;

if (waitAll) {
wfmo->Status.EventsLeft = count;
} else {
wfmo->Status.FiredEvent = -1;
}

wfmo->WaitAll = waitAll;
wfmo->StillWaiting.store(true, std::memory_order_release);
// memory_order_release: this is the initial value other threads should see
wfmo->RefCount.store(1 + count, std::memory_order_release);
// Separately keep track of how many refs to decrement after the initialization loop, to
// avoid repeatedly clearing the cache line.
int skipped_refs = 0;

tempResult = pthread_mutex_lock(&wfmo->Mutex);
assert(tempResult == 0);

Expand All @@ -226,17 +228,18 @@ namespace neosmart {
assert(tempResult == 0);

if (waitAll) {
++skipped_refs;
--wfmo->Status.EventsLeft;
assert(wfmo->Status.EventsLeft >= 0);
} else {
skipped_refs += (count - i);
wfmo->Status.FiredEvent = i;
waitIndex = i;
done = true;
break;
}
} else {
events[i]->RegisteredWaits.push_back(waitInfo);
wfmo->RefCount.fetch_add(1, std::memory_order_relaxed);

tempResult = pthread_mutex_unlock(&events[i]->Mutex);
assert(tempResult == 0);
Expand Down Expand Up @@ -290,14 +293,17 @@ namespace neosmart {
}

waitIndex = wfmo->Status.FiredEvent;
wfmo->StillWaiting = false;
// memory_order_relaxed: this is only checked outside the mutex to determine if waiting has
// terminated meaning it's safe to decrement the ref count. If it's true (which we write
// with release semantics), then the mutex is always entered.
wfmo->StillWaiting.store(false, std::memory_order_relaxed);

tempResult = pthread_mutex_unlock(&wfmo->Mutex);
assert(tempResult == 0);

int ref_count = --wfmo->RefCount;
assert(ref_count >= 0);
if (ref_count == 0) {
int ref_count = wfmo->RefCount.fetch_sub(1 + skipped_refs, std::memory_order_acq_rel);
assert(ref_count > 0);
if (ref_count == 1 + skipped_refs) {
wfmo->Destroy();
delete wfmo;
}
Expand Down Expand Up @@ -335,34 +341,42 @@ namespace neosmart {
int result = pthread_mutex_lock(&event->Mutex);
assert(result == 0);

// memory_order_release: Unblock threads waiting for the event
event->State.store(true, std::memory_order_release);

// Depending on the event type, we either trigger everyone or only one
if (event->AutoReset) {
#ifdef WFMO
while (!event->RegisteredWaits.empty()) {
neosmart_wfmo_info_t i = &event->RegisteredWaits.front();

// memory_order_relaxed: this is just an optimization to see if it is OK to skip
// this waiter, and if it's observed to be false then it's OK to bypass the mutex at
// that point.
if (!i->Waiter->StillWaiting.load(std::memory_order_relaxed)) {
int ref_count = i->Waiter->RefCount.fetch_sub(1, std::memory_order_acq_rel);
if (ref_count == 1) {
i->Waiter->Destroy();
delete i->Waiter;
}
event->RegisteredWaits.pop_front();
continue;
}

result = pthread_mutex_lock(&i->Waiter->Mutex);
assert(result == 0);

int ref_count = --i->Waiter->RefCount;
assert(ref_count >= 0);
if (!i->Waiter->StillWaiting) {
int ref_count = i->Waiter->RefCount.fetch_sub(1, std::memory_order_acq_rel);
assert(ref_count > 0);
// memory_order_relaxed: this is only changed with the lock acquired
if (!i->Waiter->StillWaiting.load(std::memory_order_relaxed)) {
result = pthread_mutex_unlock(&i->Waiter->Mutex);
assert(result == 0);

if (ref_count == 0) {
if (ref_count == 1) {
i->Waiter->Destroy();
delete i->Waiter;
}
event->RegisteredWaits.pop_front();
continue;
}
else {
assert(ref_count > 0);
}

if (i->Waiter->WaitAll) {
--i->Waiter->Status.EventsLeft;
Expand All @@ -373,7 +387,9 @@ namespace neosmart {
// it.
} else {
i->Waiter->Status.FiredEvent = i->WaitIndex;
i->Waiter->StillWaiting = false;
// memory_order_relaxed: The flip to false is only lazily observed as an
// optimization to bypass the mutex for cleanup.
i->Waiter->StillWaiting.store(false, std::memory_order_relaxed);
}

result = pthread_mutex_unlock(&i->Waiter->Mutex);
Expand All @@ -384,42 +400,58 @@ namespace neosmart {

event->RegisteredWaits.pop_front();

// memory_order_release: Prevent overlapping of sequential Set/Reset states.
event->State.store(false, std::memory_order_release);
// The auto-reset event has been consumed, so we must make sure it's no longer set.
// memory_order_relaxed: `State` is only set to true with the mutex held, and we
// require that this function only be called after the mutex is obtained.
event->State.store(false, std::memory_order_relaxed);

result = pthread_mutex_unlock(&event->Mutex);
assert(result == 0);

return 0;
}
#endif // WFMO
// event->State can be false if compiled with WFMO support
// memory_order_relaxed: ordering is ensured by the mutex
if (event->State.load(std::memory_order_relaxed)) {
result = pthread_mutex_unlock(&event->Mutex);
assert(result == 0);
// memory_order_release: this is the synchronization point for any threads spin-waiting
// for the event to become available.
event->State.store(true, std::memory_order_release);

result = pthread_cond_signal(&event->CVariable);
assert(result == 0);
result = pthread_mutex_unlock(&event->Mutex);
assert(result == 0);

return 0;
}
result = pthread_cond_signal(&event->CVariable);
assert(result == 0);

return 0;
} else { // this is a manual reset event
// memory_order_release: this is the synchronization point for any threads spin-waiting
// for the event to become available.
event->State.store(true, std::memory_order_release);
#ifdef WFMO
for (size_t i = 0; i < event->RegisteredWaits.size(); ++i) {
neosmart_wfmo_info_t info = &event->RegisteredWaits[i];

// memory_order_relaxed: this is just an optimization to see if it is OK to skip
// this waiter, and if it's observed to be false then it's OK to bypass the mutex at
// that point.
if (!info->Waiter->StillWaiting.load(std::memory_order_relaxed)) {
int ref_count = info->Waiter->RefCount.fetch_sub(1, std::memory_order_acq_rel);
if (ref_count == 1) {
info->Waiter->Destroy();
delete info->Waiter;
}
continue;
}

result = pthread_mutex_lock(&info->Waiter->Mutex);
assert(result == 0);

--info->Waiter->RefCount;
assert(info->Waiter->RefCount >= 0);
int ref_count = info->Waiter->RefCount.fetch_sub(1, std::memory_order_acq_rel);
assert(ref_count > 0);

if (!info->Waiter->StillWaiting) {
bool destroy = info->Waiter->RefCount == 0;
result = pthread_mutex_unlock(&info->Waiter->Mutex);
assert(result == 0);
if (destroy) {
if (ref_count == 1) {
info->Waiter->Destroy();
delete info->Waiter;
}
Expand All @@ -435,7 +467,9 @@ namespace neosmart {
// it.
} else {
info->Waiter->Status.FiredEvent = info->WaitIndex;
info->Waiter->StillWaiting = false;
// memory_order_relaxed: The flip to false is only lazily observed as an
// optimization to bypass the mutex for cleanup.
info->Waiter->StillWaiting.store(false, std::memory_order_relaxed);
}

result = pthread_mutex_unlock(&info->Waiter->Mutex);
Expand All @@ -457,14 +491,10 @@ namespace neosmart {
}

int ResetEvent(neosmart_event_t event) {
int result = pthread_mutex_lock(&event->Mutex);
assert(result == 0);

event->State.store(false, std::memory_order_release);

result = pthread_mutex_unlock(&event->Mutex);
assert(result == 0);

// memory_order_relaxed and no mutex: there can't be any guarantees about concurrent calls
// to either of WFMO()/SetEvent() and ResetEvent() because they're racy by nature. Only the
// behavior of concurrent WFMO() and SetEvent() calls is strongly defined.
event->State.store(false, std::memory_order_relaxed);
return 0;
}

Expand Down

0 comments on commit d0f1864

Please sign in to comment.