Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to tbb::task_group #32804

Merged
merged 32 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d229056
Switch to tbb::task_group
Dr15Jones Jan 5, 2021
47ac1ec
Fixed unit tests to properly wait for threads.
Dr15Jones Jan 29, 2021
380dce5
Wait in Worker needs its own task_group
Dr15Jones Jan 29, 2021
a7be834
Reuse a task_group in EventProcessor
Dr15Jones Jan 29, 2021
5d504e7
Removed WaitingThreadIntProducer
Dr15Jones Feb 1, 2021
2fadf61
Fixed copy and assignment operations
Dr15Jones Feb 2, 2021
418cf89
Added utility methods to WaitingTask*Holder
Dr15Jones Feb 2, 2021
6b57e82
Fix FWCore/Concurrency unit tests
Dr15Jones Feb 2, 2021
16e3bbc
Properly handle EventSetup IOVs with tbb::task_group
Dr15Jones Feb 2, 2021
a2d1176
Applied code-format
Dr15Jones Feb 3, 2021
9997dbd
Use tbb::task_group in ExternalLHEProducer
Dr15Jones Feb 3, 2021
97d8acb
Removed unnecessary include
Dr15Jones Feb 3, 2021
b7f6706
Converted SecondaryEventProvider to use tbb::task_group
Dr15Jones Feb 4, 2021
52203d3
Updated ScopedContext to new WaitingTaskWithArenaHolder interface
Dr15Jones Feb 4, 2021
afb7901
RunManagerMTWorker now uses tbb::task_arena
Dr15Jones Feb 5, 2021
98fe3e7
Corrected change to ScopedContext
Dr15Jones Feb 5, 2021
9d8bfea
Fix clang warnings
Dr15Jones Feb 7, 2021
f827a10
Applied code-format
Dr15Jones Feb 25, 2021
10e14e7
Removed unnecessary tbb::empty_task
Dr15Jones Feb 25, 2021
5cd5acd
Use tbb preview tbb::task::suspend/resume API
Dr15Jones Feb 25, 2021
b40b071
Use edm::syncWait function
Dr15Jones Feb 25, 2021
da317d6
Remove the need for a FunctorTask
Dr15Jones Feb 25, 2021
275782c
Set lambda capture instead of a local variable
Dr15Jones Feb 26, 2021
0eb6c2c
Remove TBB thread observer when InitRootHandler's is destroyed
Dr15Jones Feb 27, 2021
82ef6b0
Avoid use of global tbb::task_arena
Dr15Jones Feb 27, 2021
fc8f1be
Properly pass down EventSetupInpl in calls
Dr15Jones Mar 1, 2021
fbbc3c7
Make ThreadTracker local to the main arena, and delete in ~InitRootHa…
Mar 1, 2021
3e2b8f2
Merge pull request #5 from dan131riley/thread-observer-segfault-fix
Dr15Jones Mar 1, 2021
6781e7f
Properly shutdown PythonEventProcessor
Dr15Jones Mar 2, 2021
85a7fc6
Properly wait for all tasks to finish in SerialTaskQueue dtr
Dr15Jones Mar 2, 2021
9cda9cc
Added comment on use and finished the 'rule of 5'
Dr15Jones Mar 3, 2021
73ec200
Use TaskSentry and remove unnneeded return statements
Dr15Jones Mar 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions FWCore/Concurrency/interface/FunctorTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,27 @@
#include <atomic>
#include <exception>
#include <memory>
#include "tbb/task.h"

// user include files
#include "FWCore/Concurrency/interface/TaskBase.h"

// forward declarations

namespace edm {
template <typename F>
class FunctorTask : public tbb::task {
class FunctorTask : public TaskBase {
public:
explicit FunctorTask(F f) : func_(std::move(f)) {}

task* execute() override {
func_();
return nullptr;
};
void execute() final { func_(); };

private:
F func_;
};

template <typename ALLOC, typename F>
FunctorTask<F>* make_functor_task(ALLOC&& iAlloc, F f) {
return new (iAlloc) FunctorTask<F>(std::move(f));
template <typename F>
FunctorTask<F>* make_functor_task(F f) {
return new FunctorTask<F>(std::move(f));
}
} // namespace edm

Expand Down
43 changes: 6 additions & 37 deletions FWCore/Concurrency/interface/LimitedTaskQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,7 @@ namespace edm {
* \param[in] iAction Must be a functor that takes no arguments and return no values.
*/
template <typename T>
void push(T&& iAction);

/// synchronously pushes functor iAction into queue
/**
* The function will wait until iAction has completed before returning.
* If another task is already running on the queue, the system is allowed
* to find another TBB task to execute while waiting for the iAction to finish.
* In that way the core is not idled while waiting.
* \param[in] iAction Must be a functor that takes no arguments and return no values.
*/
template <typename T>
void pushAndWait(T&& iAction);
void push(tbb::task_group& iGroup, T&& iAction);

class Resumer {
public:
Expand Down Expand Up @@ -113,7 +102,7 @@ namespace edm {
Using this function will decrease the allowed concurrency limit by 1.
*/
template <typename T>
void pushAndPause(T&& iAction);
void pushAndPause(tbb::task_group& iGroup, T&& iAction);

unsigned int concurrencyLimit() const { return m_queues.size(); }

Expand All @@ -123,10 +112,10 @@ namespace edm {
};

template <typename T>
void LimitedTaskQueue::push(T&& iAction) {
void LimitedTaskQueue::push(tbb::task_group& iGroup, T&& iAction) {
auto set_to_run = std::make_shared<std::atomic<bool>>(false);
for (auto& q : m_queues) {
q.push([set_to_run, iAction]() mutable {
q.push(iGroup, [set_to_run, iAction]() mutable {
bool expected = false;
if (set_to_run->compare_exchange_strong(expected, true)) {
iAction();
Expand All @@ -136,30 +125,10 @@ namespace edm {
}

template <typename T>
void LimitedTaskQueue::pushAndWait(T&& iAction) {
tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
waitTask->set_ref_count(2);
auto set_to_run = std::make_shared<std::atomic<bool>>(false);
for (auto& q : m_queues) {
q.push([set_to_run, waitTask, iAction]() mutable {
bool expected = false;
if (set_to_run->compare_exchange_strong(expected, true)) {
// Exception needs to be caught in order to decrease the waitTask reference count at the end. The user of SerialTaskQueue should handle exceptions within iAction.
CMS_SA_ALLOW try { iAction(); } catch (...) {
}
waitTask->decrement_ref_count();
}
});
}
waitTask->wait_for_all();
tbb::task::destroy(*waitTask);
}

template <typename T>
void LimitedTaskQueue::pushAndPause(T&& iAction) {
void LimitedTaskQueue::pushAndPause(tbb::task_group& iGroup, T&& iAction) {
auto set_to_run = std::make_shared<std::atomic<bool>>(false);
for (auto& q : m_queues) {
q.push([&q, set_to_run, iAction]() mutable {
q.push(iGroup, [&q, set_to_run, iAction]() mutable {
bool expected = false;
if (set_to_run->compare_exchange_strong(expected, true)) {
q.pause();
Expand Down
55 changes: 17 additions & 38 deletions FWCore/Concurrency/interface/SerialTaskQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
#include <atomic>
#include <cassert>

#include "tbb/task.h"
#include "tbb/task_group.h"
#include "tbb/concurrent_queue.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"

Expand Down Expand Up @@ -116,41 +116,33 @@ namespace edm {
* \param[in] iAction Must be a functor that takes no arguments and return no values.
*/
template <typename T>
void push(const T& iAction);

/// synchronously pushes functor iAction into queue
/**
* The function will wait until iAction has completed before returning.
* If another task is already running on the queue, the system is allowed
* to find another TBB task to execute while waiting for the iAction to finish.
* In that way the core is not idled while waiting.
* \param[in] iAction Must be a functor that takes no arguments and return no values.
*/
template <typename T>
void pushAndWait(const T& iAction);
void push(tbb::task_group&, const T& iAction);

private:
/** Base class for all tasks held by the SerialTaskQueue */
class TaskBase : public tbb::task {
class TaskBase {
friend class SerialTaskQueue;
TaskBase() : m_queue(nullptr) {}

tbb::task_group* group() { return m_group; }
virtual void execute() = 0;

public:
virtual ~TaskBase() = default;

protected:
TaskBase* finishedTask();
explicit TaskBase(tbb::task_group* iGroup) : m_group(iGroup) {}

private:
void setQueue(SerialTaskQueue* iQueue) { m_queue = iQueue; }

SerialTaskQueue* m_queue;
tbb::task_group* m_group;
};

template <typename T>
class QueuedTask : public TaskBase {
public:
QueuedTask(const T& iAction) : m_action(iAction) {}
QueuedTask(tbb::task_group& iGroup, const T& iAction) : TaskBase(&iGroup), m_action(iAction) {}

private:
TaskBase* execute() override;
void execute() final;

T m_action;
};
Expand All @@ -163,7 +155,7 @@ namespace edm {
//returns nullptr if a task is already being processed
TaskBase* pickNextTask();

void pushAndWait(tbb::empty_task* iWait, TaskBase*);
void spawn(TaskBase&);

// ---------- member data --------------------------------
tbb::concurrent_queue<TaskBase*> m_tasks;
Expand All @@ -172,29 +164,16 @@ namespace edm {
};

template <typename T>
void SerialTaskQueue::push(const T& iAction) {
QueuedTask<T>* pTask{new (tbb::task::allocate_root()) QueuedTask<T>{iAction}};
pTask->setQueue(this);
void SerialTaskQueue::push(tbb::task_group& iGroup, const T& iAction) {
QueuedTask<T>* pTask{new QueuedTask<T>{iGroup, iAction}};
Dr15Jones marked this conversation as resolved.
Show resolved Hide resolved
pushTask(pTask);
}

template <typename T>
void SerialTaskQueue::pushAndWait(const T& iAction) {
tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
waitTask->set_ref_count(2);
QueuedTask<T>* pTask{new (waitTask->allocate_child()) QueuedTask<T>{iAction}};
pTask->setQueue(this);
pushAndWait(waitTask, pTask);
}

inline SerialTaskQueue::TaskBase* SerialTaskQueue::TaskBase::finishedTask() { return m_queue->finishedTask(); }

template <typename T>
SerialTaskQueue::TaskBase* SerialTaskQueue::QueuedTask<T>::execute() {
void SerialTaskQueue::QueuedTask<T>::execute() {
// Exception has to swallowed in order to avoid throwing from execute(). The user of SerialTaskQueue should handle exceptions within m_action().
CMS_SA_ALLOW try { this->m_action(); } catch (...) {
}
return this->finishedTask();
}

} // namespace edm
Expand Down
55 changes: 9 additions & 46 deletions FWCore/Concurrency/interface/SerialTaskQueueChain.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,7 @@ namespace edm {
* \param[in] iAction Must be a functor that takes no arguments and return no values.
*/
template <typename T>
void push(T&& iAction);

/// synchronously pushes functor iAction into queue
/**
* The function will wait until iAction has completed before returning.
* If another task is already running on the queue, the system is allowed
* to find another TBB task to execute while waiting for the iAction to finish.
* In that way the core is not idled while waiting.
* \param[in] iAction Must be a functor that takes no arguments and return no values.
*/
template <typename T>
void pushAndWait(T&& iAction);
void push(tbb::task_group& iGroup, T&& iAction);

unsigned long outstandingTasks() const { return m_outstandingTasks; }
std::size_t numberOfQueues() const { return m_queues.size(); }
Expand All @@ -76,61 +65,35 @@ namespace edm {
std::atomic<unsigned long> m_outstandingTasks{0};

template <typename T>
void passDownChain(unsigned int iIndex, T&& iAction);
void passDownChain(unsigned int iIndex, tbb::task_group& iGroup, T&& iAction);

template <typename T>
void actionToRun(T&& iAction);
};

template <typename T>
void SerialTaskQueueChain::push(T&& iAction) {
void SerialTaskQueueChain::push(tbb::task_group& iGroup, T&& iAction) {
++m_outstandingTasks;
if (m_queues.size() == 1) {
m_queues[0]->push([this, iAction]() mutable { this->actionToRun(iAction); });
m_queues[0]->push(iGroup, [this, iAction]() mutable { this->actionToRun(iAction); });
} else {
assert(!m_queues.empty());
m_queues[0]->push([this, iAction]() mutable { this->passDownChain(1, iAction); });
}
}

template <typename T>
void SerialTaskQueueChain::pushAndWait(T&& iAction) {
auto destry = [](tbb::task* iTask) { tbb::task::destroy(*iTask); };

std::unique_ptr<tbb::task, decltype(destry)> waitTask(new (tbb::task::allocate_root()) tbb::empty_task, destry);
waitTask->set_ref_count(3);

std::exception_ptr ptr;
auto waitTaskPtr = waitTask.get();
push([waitTaskPtr, iAction, &ptr]() {
//must wait until exception ptr would be set
auto dec = [](tbb::task* iTask) { iTask->decrement_ref_count(); };
std::unique_ptr<tbb::task, decltype(dec)> sentry(waitTaskPtr, dec);
// Caught exception is rethrown further below.
CMS_SA_ALLOW try { iAction(); } catch (...) {
ptr = std::current_exception();
}
});

waitTask->decrement_ref_count();
waitTask->wait_for_all();

if (ptr) {
std::rethrow_exception(ptr);
m_queues[0]->push(iGroup, [this, &iGroup, iAction]() mutable { this->passDownChain(1, iGroup, iAction); });
}
}

template <typename T>
void SerialTaskQueueChain::passDownChain(unsigned int iQueueIndex, T&& iAction) {
void SerialTaskQueueChain::passDownChain(unsigned int iQueueIndex, tbb::task_group& iGroup, T&& iAction) {
//Have to be sure the queue associated to this running task
// does not attempt to start another task
m_queues[iQueueIndex - 1]->pause();
//is this the last queue?
if (iQueueIndex + 1 == m_queues.size()) {
m_queues[iQueueIndex]->push([this, iAction]() mutable { this->actionToRun(iAction); });
m_queues[iQueueIndex]->push(iGroup, [this, iAction]() mutable { this->actionToRun(iAction); });
} else {
auto nextQueue = iQueueIndex + 1;
m_queues[iQueueIndex]->push([this, nextQueue, iAction]() mutable { this->passDownChain(nextQueue, iAction); });
m_queues[iQueueIndex]->push(
iGroup, [this, nextQueue, &iGroup, iAction]() mutable { this->passDownChain(nextQueue, iGroup, iAction); });
}
}

Expand Down
65 changes: 65 additions & 0 deletions FWCore/Concurrency/interface/TaskBase.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#ifndef FWCore_Concurrency_TaskBase_h
#define FWCore_Concurrency_TaskBase_h
// -*- C++ -*-
//
// Package: Concurrency
// Class : TaskBase
//
/**\class TaskBase TaskBase.h FWCore/Concurrency/interface/TaskBase.h

Description: Base class for tasks.

Usage:
Used as a callback to happen after a task has been completed.
*/
//
// Original Author: Chris Jones
// Created: Tue Jan 5 13:46:31 CST 2020
// $Id$
//

// system include files
#include <atomic>
#include <exception>
#include <memory>

// user include files

// forward declarations

namespace edm {
class TaskBase {
public:
friend class TaskSentry;

///Constructor
TaskBase() : m_refCount{0} {}
virtual ~TaskBase() = default;

virtual void execute() = 0;

void increment_ref_count() { ++m_refCount; }
unsigned int decrement_ref_count() { return --m_refCount; }

private:
virtual void recycle() { delete this; }
Dr15Jones marked this conversation as resolved.
Show resolved Hide resolved

std::atomic<unsigned int> m_refCount{0};
};

class TaskSentry {
public:
TaskSentry(TaskBase* iTask) : m_task{iTask} {}
~TaskSentry() { m_task->recycle(); }
TaskSentry() = delete;
TaskSentry(TaskSentry const&) = delete;
TaskSentry(TaskSentry&&) = delete;
TaskSentry operator=(TaskSentry const&) = delete;
TaskSentry operator=(TaskSentry&&) = delete;

private:
TaskBase* m_task;
};
} // namespace edm

#endif
Loading