Skip to content

Commit

Permalink
Use notify_one() in CallbackNotifier (#140)
Browse files Browse the repository at this point in the history
It is unclear why but for some reason `notify_all()` is causing futexes never to return in some situations. This occurs very frequently in CI and is also less frequently reproducible locally.

The typical stack trace for the blocked thread is shown below:

```cpp
Thread 6 (Thread 0x7f13ec84f700 (LWP 2823667) "pytest"):
#0  futex_wait (private=<optimized out>, expected=32765, futex_word=0x7ffd5186a874) at ../sysdeps/nptl/futex-internal.h:141
#1  futex_wait_simple (private=<optimized out>, expected=32765, futex_word=0x7ffd5186a874) at ../sysdeps/nptl/futex-internal.h:172
#2  __condvar_quiesce_and_switch_g1 (private=<optimized out>, g1index=<synthetic pointer>, wseq=<optimized out>, cond=0x7ffd5186a860) at pthread_cond_common.c:416
#3  __pthread_cond_broadcast (cond=0x7ffd5186a860) at pthread_cond_broadcast.c:73
#4  0x00007f140fe5f23c in ucxx::BaseDelayedSubmissionCollection<std::function<void ()> >::process() (this=0x560d0effafd0) at /repo/cpp/include/ucxx/delayed_submission.h:154
#5  0x00007f140fe5f399 in ucxx::DelayedSubmissionCollection::processPost (this=<optimized out>) at /repo/cpp/src/delayed_submission.cpp:84
#6  0x00007f140fe7ed71 in ucxx::WorkerProgressThread::progressUntilSync(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>) (progressFunction=..., stop=@0x560d0f6527f8: false, startCallback=..., startCallbackArg=<optimized out>, delayedSubmissionCollection=...) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/shared_ptr_base.h:1295
#7  0x00007f140fe7f3ee in std::__invoke_impl<void, void (*)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>, std::reference_wrapper<bool>, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection> >(std::__invoke_other, void (*&&)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>&&, std::reference_wrapper<bool>&&, std::function<void (void*)>&&, void*&&, std::shared_ptr<ucxx::DelayedSubmissionCollection>&&) (__f=<optimized out>, __f=<optimized out>) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/invoke.h:61
#8  std::__invoke<void (*)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>, std::reference_wrapper<bool>, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection> >(void (*&&)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>&&, std::reference_wrapper<bool>&&, std::function<void (void*)>&&, void*&&, std::shared_ptr<ucxx::DelayedSubmissionCollection>&&) (__fn=<optimized out>) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/invoke.h:96
#9  std::thread::_Invoker<std::tuple<void (*)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>, std::reference_wrapper<bool>, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection> > >::_M_invoke<0ul, 1ul, 2ul, 3ul, 4ul, 5ul>(std::_Index_tuple<0ul, 1ul, 2ul, 3ul, 4ul, 5ul>) (this=<optimized out>) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/std_thread.h:259
#10 std::thread::_Invoker<std::tuple<void (*)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>, std::reference_wrapper<bool>, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection> > >::operator()() (this=<optimized out>) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/std_thread.h:266
#11 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>, std::reference_wrapper<bool>, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection> > > >::_M_run() (this=<optimized out>) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/std_thread.h:211
#12 0x00007f140f92fe95 in std::execute_native_thread_routine (__p=<optimized out>) at ../../../../../libstdc++-v3/src/c++11/thread.cc:104
#13 0x00007f1412647609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#14 0x00007f1412412133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
```

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #140
  • Loading branch information
pentschev authored Dec 5, 2023
1 parent 5a83470 commit 197a0a9
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
10 changes: 5 additions & 5 deletions cpp/include/ucxx/utils/callback_notifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ class CallbackNotifier {
* @brief Construct a thread-safe notification object
*
* Construct a thread-safe notification object which can signal
* release of some shared state with `set()` while other threads
* block on `wait()` until the shared state is released.
* release of some shared state with `set()` while a single thread
* blocks on `wait()` until the shared state is released.
*
* If libc is glibc and the version is older than 2.25, the
* implementation uses a spinlock otherwise it uses a condition
* variable.
*
* When C++-20 is the minimum supported version, it should use
* atomic.wait + notify_all.
* atomic.wait + notify_one.
*/
CallbackNotifier() : _flag{false} {};

Expand All @@ -42,8 +42,8 @@ class CallbackNotifier {
/**
* @brief Notify waiting threads that we are done and they can proceed
*
* Set the flag to true and notify others threads blocked by a call to `wait()`.
* See also `std::condition_variable::notify_all`.
* Set the flag to true and notify a single thread blocked by a call to `wait()`.
* See also `std::condition_variable::notify_one`.
*/
void set();

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/utils/callback_notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void CallbackNotifier::set()
// ordering.
_flag.store(true, std::memory_order_relaxed);
}
_conditionVariable.notify_all();
_conditionVariable.notify_one();
}
}

Expand Down

0 comments on commit 197a0a9

Please sign in to comment.