Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a deadlock bug in EigenNonBlockingThreadPool.h #23098

Merged
merged 3 commits into from
Dec 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 66 additions & 58 deletions include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -1467,11 +1467,14 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
status = ThreadStatus::Spinning;
}

void SetBlocked(std::function<bool()> should_block,
bool SetBlocked(std::function<bool()> should_block,
std::function<void()> post_block) {
std::unique_lock<std::mutex> lk(mutex);
assert(GetStatus() == ThreadStatus::Spinning);
snnn marked this conversation as resolved.
Show resolved Hide resolved
status.store(ThreadStatus::Blocking, std::memory_order_relaxed);
auto old_status = status.exchange(ThreadStatus::Blocking, std::memory_order_seq_cst);
if (old_status != ThreadStatus::Spinning) {
// Encountered a logical error
return false;
}
snnn marked this conversation as resolved.
Show resolved Hide resolved
if (should_block()) {
status.store(ThreadStatus::Blocked, std::memory_order_relaxed);
do {
Expand All @@ -1480,6 +1483,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
post_block();
}
status.store(ThreadStatus::Spinning, std::memory_order_relaxed);
return true;
}

private:
Expand Down Expand Up @@ -1558,62 +1562,66 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter

// Attempt to block
if (!t) {
td.SetBlocked( // Pre-block test
[&]() -> bool {
bool should_block = true;
// Check whether work was pushed to us while attempting to block. We make
// this test while holding the per-thread status lock, and after setting
// our status to ThreadStatus::Blocking.
//
// This synchronizes with ThreadPool::Schedule which pushes work to the queue
// and then tests for ThreadStatus::Blocking/Blocked (via EnsureAwake):
//
// Main thread: Worker:
// #1 Push work #A Set status blocking
// #2 Read worker status #B Check queue
// #3 Wake if blocking/blocked
//
// If #A is before #2 then main sees worker blocked and wakes
//
// If #A is after #2 then #B will see #1, and we abandon blocking
assert(!t);
t = q.PopFront();
if (t) {
should_block = false;
}

// No work pushed to us, continue attempting to block. The remaining
// test is to synchronize with termination requests. If we are
// shutting down and all worker threads blocked without work, then
// we are done.
if (should_block) {
blocked_++;
if (done_ && blocked_ == num_threads_) {
should_block = false;
// Almost done, but need to re-check queues.
// Consider that all queues are empty and all worker threads are preempted
// right after incrementing blocked_ above. Now a free-standing thread
// submits work and calls destructor (which sets done_). If we don't
// re-check queues, we will exit leaving the work unexecuted.
if (NonEmptyQueueIndex() != -1) {
// Note: we must not pop from queues before we decrement blocked_,
// otherwise the following scenario is possible. Consider that instead
// of checking for emptiness we popped the only element from queues.
// Now other worker threads can start exiting, which is bad if the
// work item submits other work. So we just check emptiness here,
// which ensures that all worker threads exit at the same time.
blocked_--;
} else {
should_exit = true;
if (!td.SetBlocked( // Pre-block test
[&]() -> bool {
bool should_block = true;
// Check whether work was pushed to us while attempting to block. We make
// this test while holding the per-thread status lock, and after setting
// our status to ThreadStatus::Blocking.
//
// This synchronizes with ThreadPool::Schedule which pushes work to the queue
// and then tests for ThreadStatus::Blocking/Blocked (via EnsureAwake):
//
// Main thread: Worker:
// #1 Push work #A Set status blocking
// #2 Read worker status #B Check queue
// #3 Wake if blocking/blocked
//
// If #A is before #2 then main sees worker blocked and wakes
//
// If #A is after #2 then #B will see #1, and we abandon blocking
assert(!t);
t = q.PopFront();
if (t) {
should_block = false;
}

// No work pushed to us, continue attempting to block. The remaining
// test is to synchronize with termination requests. If we are
// shutting down and all worker threads blocked without work, then
// we are done.
if (should_block) {
blocked_++;
if (done_ && blocked_ == num_threads_) {
should_block = false;
// Almost done, but need to re-check queues.
// Consider that all queues are empty and all worker threads are preempted
// right after incrementing blocked_ above. Now a free-standing thread
// submits work and calls destructor (which sets done_). If we don't
// re-check queues, we will exit leaving the work unexecuted.
if (NonEmptyQueueIndex() != -1) {
// Note: we must not pop from queues before we decrement blocked_,
// otherwise the following scenario is possible. Consider that instead
// of checking for emptiness we popped the only element from queues.
// Now other worker threads can start exiting, which is bad if the
// work item submits other work. So we just check emptiness here,
// which ensures that all worker threads exit at the same time.
blocked_--;
} else {
should_exit = true;
}
}
}
}
}
return should_block;
},
// Post-block update (executed only if we blocked)
[&]() {
blocked_--;
});
return should_block;
},
// Post-block update (executed only if we blocked)
[&]() {
blocked_--;
})) {
// Encountered a fatal logic error in SetBlocked
should_exit = true;
break;
}
// Thread just unblocked. Unless we picked up work while
// blocking, or are exiting, then either work was pushed to
// us, or it was pushed to an overloaded queue
Expand Down
Loading