Skip to content

Commit

Permalink
Fix a deadlock bug in EigenNonBlockingThreadPool.h (#23098)
Browse files Browse the repository at this point in the history
### Description
This PR fixes a deadlock bug in EigenNonBlockingThreadPool.h. It only happens on platforms with a weakly ordered memory model, such as ARM64.
  • Loading branch information
snnn authored and guschmue committed Dec 16, 2024
1 parent 84e1a76 commit 2d25d29
Showing 1 changed file with 66 additions and 58 deletions.
124 changes: 66 additions & 58 deletions include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -1467,11 +1467,14 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
status = ThreadStatus::Spinning;
}

void SetBlocked(std::function<bool()> should_block,
bool SetBlocked(std::function<bool()> should_block,
std::function<void()> post_block) {
std::unique_lock<std::mutex> lk(mutex);
assert(GetStatus() == ThreadStatus::Spinning);
status.store(ThreadStatus::Blocking, std::memory_order_relaxed);
auto old_status = status.exchange(ThreadStatus::Blocking, std::memory_order_seq_cst);
if (old_status != ThreadStatus::Spinning) {
// Encountered a logical error
return false;
}
if (should_block()) {
status.store(ThreadStatus::Blocked, std::memory_order_relaxed);
do {
Expand All @@ -1480,6 +1483,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
post_block();
}
status.store(ThreadStatus::Spinning, std::memory_order_relaxed);
return true;
}

private:
Expand Down Expand Up @@ -1558,62 +1562,66 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter

// Attempt to block
if (!t) {
td.SetBlocked( // Pre-block test
[&]() -> bool {
bool should_block = true;
// Check whether work was pushed to us while attempting to block. We make
// this test while holding the per-thread status lock, and after setting
// our status to ThreadStatus::Blocking.
//
// This synchronizes with ThreadPool::Schedule which pushes work to the queue
// and then tests for ThreadStatus::Blocking/Blocked (via EnsureAwake):
//
// Main thread: Worker:
// #1 Push work #A Set status blocking
// #2 Read worker status #B Check queue
// #3 Wake if blocking/blocked
//
// If #A is before #2 then main sees worker blocked and wakes
//
// If #A is after #2 then #B will see #1, and we abandon blocking
assert(!t);
t = q.PopFront();
if (t) {
should_block = false;
}

// No work pushed to us, continue attempting to block. The remaining
// test is to synchronize with termination requests. If we are
// shutting down and all worker threads blocked without work, then
// we are done.
if (should_block) {
blocked_++;
if (done_ && blocked_ == num_threads_) {
should_block = false;
// Almost done, but need to re-check queues.
// Consider that all queues are empty and all worker threads are preempted
// right after incrementing blocked_ above. Now a free-standing thread
// submits work and calls destructor (which sets done_). If we don't
// re-check queues, we will exit leaving the work unexecuted.
if (NonEmptyQueueIndex() != -1) {
// Note: we must not pop from queues before we decrement blocked_,
// otherwise the following scenario is possible. Consider that instead
// of checking for emptiness we popped the only element from queues.
// Now other worker threads can start exiting, which is bad if the
// work item submits other work. So we just check emptiness here,
// which ensures that all worker threads exit at the same time.
blocked_--;
} else {
should_exit = true;
if (!td.SetBlocked( // Pre-block test
[&]() -> bool {
bool should_block = true;
// Check whether work was pushed to us while attempting to block. We make
// this test while holding the per-thread status lock, and after setting
// our status to ThreadStatus::Blocking.
//
// This synchronizes with ThreadPool::Schedule which pushes work to the queue
// and then tests for ThreadStatus::Blocking/Blocked (via EnsureAwake):
//
// Main thread: Worker:
// #1 Push work #A Set status blocking
// #2 Read worker status #B Check queue
// #3 Wake if blocking/blocked
//
// If #A is before #2 then main sees worker blocked and wakes
//
// If #A is after #2 then #B will see #1, and we abandon blocking
assert(!t);
t = q.PopFront();
if (t) {
should_block = false;
}

// No work pushed to us, continue attempting to block. The remaining
// test is to synchronize with termination requests. If we are
// shutting down and all worker threads blocked without work, then
// we are done.
if (should_block) {
blocked_++;
if (done_ && blocked_ == num_threads_) {
should_block = false;
// Almost done, but need to re-check queues.
// Consider that all queues are empty and all worker threads are preempted
// right after incrementing blocked_ above. Now a free-standing thread
// submits work and calls destructor (which sets done_). If we don't
// re-check queues, we will exit leaving the work unexecuted.
if (NonEmptyQueueIndex() != -1) {
// Note: we must not pop from queues before we decrement blocked_,
// otherwise the following scenario is possible. Consider that instead
// of checking for emptiness we popped the only element from queues.
// Now other worker threads can start exiting, which is bad if the
// work item submits other work. So we just check emptiness here,
// which ensures that all worker threads exit at the same time.
blocked_--;
} else {
should_exit = true;
}
}
}
}
}
return should_block;
},
// Post-block update (executed only if we blocked)
[&]() {
blocked_--;
});
return should_block;
},
// Post-block update (executed only if we blocked)
[&]() {
blocked_--;
})) {
// Encountered a fatal logic error in SetBlocked
should_exit = true;
break;
}
// Thread just unblocked. Unless we picked up work while
// blocking, or are exiting, then either work was pushed to
// us, or it was pushed to an overloaded queue
Expand Down

0 comments on commit 2d25d29

Please sign in to comment.