Added collaborative_call_once and tests
Signed-off-by: Ilya Isaev <[email protected]>
isaevil committed May 19, 2021
1 parent 3e9e8c9 commit a003969
Showing 5 changed files with 524 additions and 6 deletions.
182 changes: 182 additions & 0 deletions include/oneapi/tbb/collaborative_call_once.h
@@ -0,0 +1,182 @@
/*
    Copyright (c) 2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_collaborative_call_once_H
#define __TBB_collaborative_call_once_H

#include "tbb/task_arena.h"
#include "tbb/task_group.h"
#include "tbb/task.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace d1 {

#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress warning: structure was padded due to alignment specifier
#pragma warning (push)
#pragma warning (disable: 4324)
#endif

constexpr std::size_t bit_count = 7;

static_assert(1 << bit_count == max_nfs_size, "bit_count must be log2(max_nfs_size)");

inline constexpr std::uintptr_t maskoff_pointer(std::uintptr_t ptr) {
    return ptr & (std::size_t(-1) << bit_count);
}

class alignas(max_nfs_size) run_once {
    task_arena my_arena;
    wait_context my_wait_context;
    task_group_context my_context;

    std::atomic<std::int64_t> ref_count{0};

    template <typename Func>
    class call_once_delegate : public delegate_base {
    public:
        call_once_delegate(Func& f) : my_func(f) {}

        bool operator()() const override {
            my_func();
            return true;
        }

    private:
        Func& my_func;
    };

    template<typename Fn>
    void isolated_execute(Fn f) {
        call_once_delegate<Fn> delegate(f);

        r1::isolate_within_arena(delegate, reinterpret_cast<std::intptr_t>(this));
    }

public:
    run_once()
        : my_arena(task_arena::attach{})
        , my_wait_context(0)
        , my_context(task_group_context::bound,
                     task_group_context::default_traits | task_group_context::concurrent_wait)
    {}

    ~run_once() {
        spin_wait_while(ref_count, [&](std::int64_t value) { return value > 0; }, std::memory_order_acquire);
    }

    void increase_ref() { ref_count++; }

    void decrease_ref() { ref_count--; }

    template <typename F>
    void run_and_wait(F&& f) {
        my_arena.execute([&] {
            isolated_execute([&] {
                function_stack_task<F> t{ std::forward<F>(f), my_wait_context };
                my_wait_context.reserve();

                execute_and_wait(t, my_context, my_wait_context, my_context);
            });
        });
    }

    void wait() {
        my_arena.execute([&] {
            isolated_execute([&] {
                // We do not want to get an exception from the user functor on moonlighting threads.
                // The exception is handled by the winner thread.
                task_group_context stub_context;
                detail::d1::wait(my_wait_context, stub_context);
            });
        });
    }

};

class collaborative_once_flag : no_copy {
    enum state {uninitialized, done};
    std::atomic<std::uintptr_t> my_state{ state::uninitialized };

    template <typename Fn, typename... Args>
    friend void collaborative_call_once(collaborative_once_flag& flag, Fn&& f, Args&&... args);

    template <typename Fn>
    void do_collaborative_call_once(Fn&& f) {
        std::uintptr_t expected = my_state.load(std::memory_order_acquire);
        run_once local_runner;
        while (expected != state::done) {
            if (expected == state::uninitialized && my_state.compare_exchange_strong(expected, reinterpret_cast<std::uintptr_t>(&local_runner))) {
                // winner
                auto local_expected = reinterpret_cast<std::uintptr_t>(&local_runner);
                try_call([&] {
                    local_runner.run_and_wait(std::forward<Fn>(f));
                }).on_exception([&] {
                    while (!my_state.compare_exchange_strong(local_expected, state::uninitialized, std::memory_order_release, std::memory_order_relaxed)) {
                        local_expected = reinterpret_cast<std::uintptr_t>(&local_runner);
                    }
                });

                while (!my_state.compare_exchange_strong(local_expected, state::done, std::memory_order_release, std::memory_order_relaxed)) {
                    local_expected = reinterpret_cast<std::uintptr_t>(&local_runner);
                }
                return;
            } else {
                // moonlighting thread
                do {
                    auto max_value = expected | (max_nfs_size-1);
                    expected = spin_wait_while_eq(my_state, max_value);
                    // "expected > state::done" prevents storing a new value while the state is uninitialized
                } while (expected > state::done && !my_state.compare_exchange_strong(expected, expected + 1));

                if (auto runner = reinterpret_cast<run_once*>(maskoff_pointer(expected))) {

                    runner->increase_ref();
                    my_state.fetch_sub(1);

                    // The moonlighting threads are not expected to handle exceptions from the user functor.
                    // Therefore, no exception is expected from wait().
                    [runner] () noexcept { runner->wait(); }();

                    runner->decrease_ref();
                }
            }
        }
    }
};


template <typename Fn, typename... Args>
void collaborative_call_once(collaborative_once_flag& flag, Fn&& fn, Args&&... args) {
    auto func = [&] { fn(std::forward<Args>(args)...); };
    flag.do_collaborative_call_once(func);
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning (pop) // 4324 warning
#endif

} // namespace d1
} // namespace detail

using detail::d1::collaborative_call_once;
using detail::d1::collaborative_once_flag;
} // namespace tbb

#endif // __TBB_collaborative_call_once_H
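The core trick in this header is that collaborative_once_flag packs two things into one atomic word: the address of the winner's run_once, which is over-aligned to max_nfs_size, and, in the low bit_count bits, a transient count of moonlighting threads attaching to that runner; maskoff_pointer recovers the address. A standalone sketch of that encoding follows; pointer_part, counter_part, and the 128-byte buffer are illustrative names, not part of the header.

#include <cassert>
#include <cstddef>
#include <cstdint>

// Because run_once is aligned to max_nfs_size (128 bytes, since 1 << 7 == 128),
// the low 7 bits of its address are always zero and can hold a small counter.
constexpr std::size_t bit_count = 7;                                   // log2(128)
constexpr std::uintptr_t ptr_mask = std::uintptr_t(-1) << bit_count;

constexpr std::uintptr_t pointer_part(std::uintptr_t s) { return s & ptr_mask; }
constexpr std::uintptr_t counter_part(std::uintptr_t s) { return s & ~ptr_mask; }

int main() {
    alignas(128) static char runner_storage[128];                      // stands in for run_once
    auto addr = reinterpret_cast<std::uintptr_t>(runner_storage);

    std::uintptr_t state = addr;      // the winner publishes the runner's address
    state += 1;                       // a moonlighter reserves a slot (the CAS of expected + 1 above)
    assert(pointer_part(state) == addr);  // the address is still recoverable, as maskoff_pointer does
    assert(counter_part(state) == 1);     // one pending moonlighter
    return 0;
}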
15 changes: 9 additions & 6 deletions include/oneapi/tbb/detail/_utils.h
@@ -90,25 +90,28 @@ class atomic_backoff {
 //! Spin WHILE the condition is true.
 /** T and U should be comparable types. */
 template <typename T, typename C>
-void spin_wait_while(const std::atomic<T>& location, C comp, std::memory_order order) {
+T spin_wait_while(const std::atomic<T>& location, C comp, std::memory_order order) {
     atomic_backoff backoff;
-    while (comp(location.load(order))) {
+    T snapshot = location.load(order);
+    while (comp(snapshot)) {
         backoff.pause();
+        snapshot = location.load(order);
     }
+    return snapshot;
 }
 
 //! Spin WHILE the value of the variable is equal to a given value
 /** T and U should be comparable types. */
 template <typename T, typename U>
-void spin_wait_while_eq(const std::atomic<T>& location, const U value, std::memory_order order = std::memory_order_acquire) {
-    spin_wait_while(location, [&value](T t) { return t == value; }, order);
+T spin_wait_while_eq(const std::atomic<T>& location, const U value, std::memory_order order = std::memory_order_acquire) {
+    return spin_wait_while(location, [&value](T t) { return t == value; }, order);
 }
 
 //! Spin UNTIL the value of the variable is equal to a given value
 /** T and U should be comparable types. */
 template<typename T, typename U>
-void spin_wait_until_eq(const std::atomic<T>& location, const U value, std::memory_order order = std::memory_order_acquire) {
-    spin_wait_while(location, [&value](T t) { return t != value; }, order);
+T spin_wait_until_eq(const std::atomic<T>& location, const U value, std::memory_order order = std::memory_order_acquire) {
+    return spin_wait_while(location, [&value](T t) { return t != value; }, order);
 }
 
 //! Spin UNTIL the condition returns true or spinning time is up.
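The spin-wait helpers now return the last observed value instead of void, which is exactly what do_collaborative_call_once relies on: the snapshot that ends the wait feeds straight into the next compare-exchange, with no second load that might observe a different value. A simplified standalone illustration of the pattern; it uses yield instead of atomic_backoff and free-standing names, so it is a sketch rather than the library code.

#include <atomic>
#include <cstdint>
#include <thread>

// Spin until the predicate becomes false and hand the terminating value back
// to the caller.
template <typename T, typename Pred>
T spin_wait_while(const std::atomic<T>& location, Pred comp,
                  std::memory_order order = std::memory_order_acquire) {
    T snapshot = location.load(order);
    while (comp(snapshot)) {
        std::this_thread::yield();          // the real helper backs off between loads
        snapshot = location.load(order);
    }
    return snapshot;
}

std::atomic<std::uintptr_t> state{0};

std::uintptr_t wait_for_publisher() {
    // The returned snapshot is the non-zero value another thread stored,
    // e.g. a runner address that the caller can decode or CAS on immediately.
    return spin_wait_while(state, [](std::uintptr_t v) { return v == 0; });
}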
14 changes: 14 additions & 0 deletions include/tbb/collaborative_call_once.h
@@ -0,0 +1,14 @@
/*
    Copyright (c) 2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "../oneapi/tbb/collaborative_call_once.h"
1 change: 1 addition & 0 deletions test/CMakeLists.txt
@@ -360,6 +360,7 @@ tbb_add_test(SUBDIR tbb NAME test_tick_count DEPENDENCIES TBB::tbb)
 tbb_add_test(SUBDIR tbb NAME test_allocators DEPENDENCIES TBB::tbb)
 tbb_add_test(SUBDIR tbb NAME test_arena_priorities DEPENDENCIES TBB::tbb)
 tbb_add_test(SUBDIR tbb NAME test_dynamic_link DEPENDENCIES TBB::tbb)
+tbb_add_test(SUBDIR tbb NAME test_collaborative_call_once DEPENDENCIES TBB::tbb)
 tbb_add_test(SUBDIR tbb NAME test_concurrent_lru_cache DEPENDENCIES TBB::tbb)
 tbb_add_test(SUBDIR tbb NAME test_concurrent_unordered_map DEPENDENCIES TBB::tbb)
 tbb_add_test(SUBDIR tbb NAME test_concurrent_unordered_set DEPENDENCIES TBB::tbb)
318 changes: 318 additions & 0 deletions test/tbb/test_collaborative_call_once.cpp
