Skip to content

Commit

Permalink
Remove last use of tbb::task::suspend
Browse files Browse the repository at this point in the history
The defered task implementation avoids the problems where suspend
could resume a function on a different thread.
  • Loading branch information
Dr15Jones committed Dec 6, 2022
1 parent 777fa29 commit af96ac6
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 23 deletions.
21 changes: 4 additions & 17 deletions FWCore/Concurrency/interface/include_first_syncWait.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,19 @@
// Created by Chris Jones on 2/24/21.
//
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Concurrency/interface/FinalWaitingTask.h"
#include "oneapi/tbb/task_group.h"
#include "oneapi/tbb/task.h"
#include <exception>

namespace edm {
template <typename F>
[[nodiscard]] std::exception_ptr syncWait(F&& iFunc) {
std::exception_ptr exceptPtr{};
//oneapi::tbb::task::suspend can only be run from within a task running in this arena. For 1 thread,
// it is often (always?) the case where not such task is being run here. Therefore we need
// to use a temp task_group to start up such a task.
oneapi::tbb::task_group group;
group.run([&]() {
oneapi::tbb::task::suspend([&](oneapi::tbb::task::suspend_point tag) {
auto waitTask = make_waiting_task([tag, &exceptPtr](std::exception_ptr const* iExcept) {
if (iExcept) {
exceptPtr = *iExcept;
}
oneapi::tbb::task::resume(tag);
});
iFunc(WaitingTaskHolder(group, waitTask));
}); //suspend
}); //group.run
FinalWaitingTask last{group};
group.run([&]() { iFunc(WaitingTaskHolder(group, &last)); }); //group.run

group.wait();
return exceptPtr;
return last.waitNoThrow();
}
} // namespace edm
#endif /* FWCore_Concurrency_syncWait_h */
9 changes: 3 additions & 6 deletions FWCore/Concurrency/src/SerialTaskQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//

// system include files
#include "oneapi/tbb/task.h"
#include "oneapi/tbb/task_group.h"

// user include files
#include "FWCore/Concurrency/interface/SerialTaskQueue.h"
Expand All @@ -30,11 +30,8 @@ SerialTaskQueue::~SerialTaskQueue() {
bool isTaskChosen = m_taskChosen;
if ((not isEmpty and not isPaused()) or isTaskChosen) {
oneapi::tbb::task_group g;
g.run([&g, this]() {
oneapi::tbb::task::suspend([&g, this](oneapi::tbb::task::suspend_point tag) {
push(g, [tag]() { oneapi::tbb::task::resume(tag); });
}); //suspend
}); //group run
tbb::task_handle last{g.defer([]() {})};
push(g, [&g, &last]() { g.run(std::move(last)); });
g.wait();
}
}
Expand Down

0 comments on commit af96ac6

Please sign in to comment.