Skip to content

Commit

Permalink
Attempt fix
Browse files Browse the repository at this point in the history
Signed-off-by: Joaquin Anton Guirao <[email protected]>
  • Loading branch information
jantonguirao committed Jul 23, 2024
1 parent 0a35363 commit c44c269
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
10 changes: 5 additions & 5 deletions dali/pipeline/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ ThreadPool::ThreadPool(int num_thread, int device_id, bool set_affinity, const c
ThreadPool::~ThreadPool() {
WaitForWork(false);

std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<spinlock> lock(lock_);
running_ = false;
condition_.notify_all();
lock.unlock();
Expand All @@ -59,7 +59,7 @@ ThreadPool::~ThreadPool() {
void ThreadPool::AddWork(Work work, int64_t priority, bool start_immediately) {
bool started_before = false;
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<spinlock> lock(lock_);
work_queue_.push({priority, std::move(work)});
work_complete_ = false;
started_before = started_;
Expand All @@ -75,7 +75,7 @@ void ThreadPool::AddWork(Work work, int64_t priority, bool start_immediately) {

// Blocks until all work issued to the thread pool is complete
void ThreadPool::WaitForWork(bool checkForErrors) {
std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<spinlock> lock(lock_);
completed_.wait(lock, [this] { return this->work_complete_; });
started_ = false;
if (checkForErrors) {
Expand All @@ -93,7 +93,7 @@ void ThreadPool::WaitForWork(bool checkForErrors) {

void ThreadPool::RunAll(bool wait) {
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<spinlock> lock(lock_);
started_ = true;
}
condition_.notify_all(); // other threads will be waken up if needed
Expand Down Expand Up @@ -145,7 +145,7 @@ void ThreadPool::ThreadMain(int thread_id, int device_id, bool set_affinity,

while (running_) {
// Block on the condition to wait for work
std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<spinlock> lock(lock_);
condition_.wait(lock, [this] { return !running_ || (!work_queue_.empty() && started_); });
// If we're no longer running, exit the run loop
if (!running_) break;
Expand Down
8 changes: 4 additions & 4 deletions dali/pipeline/util/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
#include <utility>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include <string>
#include "dali/core/common.h"
#include "dali/core/spinlock.h"
#if NVML_ENABLED
#include "dali/util/nvml.h"
#endif
Expand Down Expand Up @@ -90,9 +90,9 @@ class DLL_PUBLIC ThreadPool {
bool work_complete_;
bool started_;
int active_threads_;
std::mutex mutex_;
std::condition_variable condition_;
std::condition_variable completed_;
spinlock lock_;
std::condition_variable_any condition_;
std::condition_variable_any completed_;

// Stored error strings for each thread
vector<std::queue<string>> tl_errors_;
Expand Down

0 comments on commit c44c269

Please sign in to comment.