From 197a0a9b73dfeebec53f2ec519ceab42e169a946 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev <peter@entschev.com>
Date: Tue, 5 Dec 2023 13:17:23 +0100
Subject: [PATCH] Use `notify_one()` in `CallbackNotifier` (#140)

It is unclear why, but `notify_all()` sometimes causes a futex wait to never return. This occurs very frequently in CI and can also be reproduced locally, though less often.

The typical stack trace for the blocked thread is shown below:

```text
Thread 6 (Thread 0x7f13ec84f700 (LWP 2823667) "pytest"):
#0  futex_wait (private=<optimized out>, expected=32765, futex_word=0x7ffd5186a874) at ../sysdeps/nptl/futex-internal.h:141
#1  futex_wait_simple (private=<optimized out>, expected=32765, futex_word=0x7ffd5186a874) at ../sysdeps/nptl/futex-internal.h:172
#2  __condvar_quiesce_and_switch_g1 (private=<optimized out>, g1index=<synthetic pointer>, wseq=<optimized out>, cond=0x7ffd5186a860) at pthread_cond_common.c:416
#3  __pthread_cond_broadcast (cond=0x7ffd5186a860) at pthread_cond_broadcast.c:73
#4  0x00007f140fe5f23c in ucxx::BaseDelayedSubmissionCollection<std::function<void ()> >::process() (this=0x560d0effafd0) at /repo/cpp/include/ucxx/delayed_submission.h:154
#5  0x00007f140fe5f399 in ucxx::DelayedSubmissionCollection::processPost (this=<optimized out>) at /repo/cpp/src/delayed_submission.cpp:84
#6  0x00007f140fe7ed71 in ucxx::WorkerProgressThread::progressUntilSync(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>) (progressFunction=..., stop=@0x560d0f6527f8: false, startCallback=..., startCallbackArg=<optimized out>, delayedSubmissionCollection=...) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/shared_ptr_base.h:1295
#7  0x00007f140fe7f3ee in std::__invoke_impl<void, void (*)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>, std::reference_wrapper<bool>, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection> >(std::__invoke_other, void (*&&)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>&&, std::reference_wrapper<bool>&&, std::function<void (void*)>&&, void*&&, std::shared_ptr<ucxx::DelayedSubmissionCollection>&&) (__f=<optimized out>, __f=<optimized out>) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/invoke.h:61
#8  std::__invoke<void (*)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>, std::reference_wrapper<bool>, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection> >(void (*&&)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>&&, std::reference_wrapper<bool>&&, std::function<void (void*)>&&, void*&&, std::shared_ptr<ucxx::DelayedSubmissionCollection>&&) (__fn=<optimized out>) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/invoke.h:96
#9  std::thread::_Invoker<std::tuple<void (*)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>, std::reference_wrapper<bool>, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection> > >::_M_invoke<0ul, 1ul, 2ul, 3ul, 4ul, 5ul>(std::_Index_tuple<0ul, 1ul, 2ul, 3ul, 4ul, 5ul>) (this=<optimized out>) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/std_thread.h:259
#10 std::thread::_Invoker<std::tuple<void (*)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>, std::reference_wrapper<bool>, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection> > >::operator()() (this=<optimized out>) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/std_thread.h:266
#11 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(std::function<bool ()>, bool const&, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection>), std::function<bool ()>, std::reference_wrapper<bool>, std::function<void (void*)>, void*, std::shared_ptr<ucxx::DelayedSubmissionCollection> > > >::_M_run() (this=<optimized out>) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/bits/std_thread.h:211
#12 0x00007f140f92fe95 in std::execute_native_thread_routine (__p=<optimized out>) at ../../../../../libstdc++-v3/src/c++11/thread.cc:104
#13 0x00007f1412647609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#14 0x00007f1412412133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
```

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: https://github.com/rapidsai/ucxx/pull/140
---
 cpp/include/ucxx/utils/callback_notifier.h | 10 +++++-----
 cpp/src/utils/callback_notifier.cpp        |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/cpp/include/ucxx/utils/callback_notifier.h b/cpp/include/ucxx/utils/callback_notifier.h
index 4c4e1cd4..ea5bea61 100644
--- a/cpp/include/ucxx/utils/callback_notifier.h
+++ b/cpp/include/ucxx/utils/callback_notifier.h
@@ -20,15 +20,15 @@ class CallbackNotifier {
    * @brief Construct a thread-safe notification object
    *
    * Construct a thread-safe notification object which can signal
-   * release of some shared state with `set()` while other threads
-   * block on `wait()` until the shared state is released.
+   * release of some shared state with `set()` while a single thread
+   * blocks on `wait()` until the shared state is released.
    *
    * If libc is glibc and the version is older than 2.25, the
    * implementation uses a spinlock otherwise it uses a condition
    * variable.
    *
    * When C++-20 is the minimum supported version, it should use
-   * atomic.wait + notify_all.
+   * atomic.wait + notify_one.
    */
   CallbackNotifier() : _flag{false} {};
 
@@ -42,8 +42,8 @@ class CallbackNotifier {
   /**
    * @brief Notify waiting threads that we are done and they can proceed
    *
-   * Set the flag to true and notify others threads blocked by a call to `wait()`.
-   * See also `std::condition_variable::notify_all`.
+   * Set the flag to true and notify a single thread blocked by a call to `wait()`.
+   * See also `std::condition_variable::notify_one`.
    */
   void set();
 
diff --git a/cpp/src/utils/callback_notifier.cpp b/cpp/src/utils/callback_notifier.cpp
index 6a786816..2d5812f9 100644
--- a/cpp/src/utils/callback_notifier.cpp
+++ b/cpp/src/utils/callback_notifier.cpp
@@ -46,7 +46,7 @@ void CallbackNotifier::set()
       // ordering.
       _flag.store(true, std::memory_order_relaxed);
     }
-    _conditionVariable.notify_all();
+    _conditionVariable.notify_one();
   }
 }