add try/catch around scheduler to reset sleep state (#54721)
Fixes #54700

Mostly just an indentation change, so it is best reviewed with whitespace
changes hidden (and the same applies when backporting).

(cherry picked from commit b1e5a86)
vtjnash committed Jun 13, 2024
1 parent df1b38d commit 038e666
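
For orientation: the fix wraps the fallible part of the scheduler loop in JL_TRY/JL_CATCH so that an exception raised there (a SIGINT, or a user mistake in trypoptask) can no longer leave the thread's sleep bookkeeping un-reset. Below is a minimal standalone sketch of that restore-on-error pattern, using setjmp/longjmp and illustrative names (is_sleeping, do_scheduler_work) that are not the runtime's real identifiers:

    #include <setjmp.h>
    #include <stdio.h>

    static jmp_buf handler;        /* stand-in for the runtime's exception handler */
    static int is_sleeping = 0;    /* stand-in for ptls->sleep_check_state */

    static void do_scheduler_work(int fail)
    {
        if (fail)
            longjmp(handler, 1);   /* models an exception escaping trypoptask */
    }

    static void scheduler_step(int fail)
    {
        is_sleeping = 1;                /* thread has announced it may go to sleep */
        if (setjmp(handler) == 0) {
            do_scheduler_work(fail);    /* may "throw" */
            is_sleeping = 0;            /* normal path clears the flag */
        }
        else {
            is_sleeping = 0;            /* error path must clear it too, otherwise the
                                           thread looks asleep from then on */
            /* the real code rethrows here (jl_rethrow) */
        }
    }

    int main(void)
    {
        scheduler_step(1);
        printf("sleeping after a failed step: %d\n", is_sleeping);  /* prints 0 */
        return 0;
    }

The actual change applies the same idea to ptls->sleep_check_state and the nrunning counter, as the scheduler.c diff below shows.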
Showing 2 changed files with 133 additions and 122 deletions.
4 changes: 2 additions & 2 deletions src/julia.h
@@ -2339,8 +2339,8 @@ extern int had_exception;
__eh_ct = jl_current_task; \
size_t __excstack_state = jl_excstack_state(__eh_ct); \
jl_enter_handler(__eh_ct, &__eh); \
if (1)
/* TRY BLOCK; */
for (i__try=1; i__try; i__try=0)

#define JL_CATCH \
if (!had_exception) \
jl_eh_restore_state_noexcept(__eh_ct, &__eh); \
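
The revised JL_TRY above runs the try body as the body of a single-iteration for loop (for (i__try=1; i__try; i__try=0)) rather than a plain if (1). That lets code inside the block use continue to end the iteration and fall through to whatever JL_CATCH expands to, which is what the new "continue; // jump to JL_CATCH" exits in scheduler.c rely on. A standalone illustration of the single-iteration-loop trick follows (simplified: the real macros, as the hunk shows, also save and restore the exception handler and exception stack):

    #include <stdio.h>

    /* Simplified stand-in for the revised JL_TRY body; not the real macro expansion. */
    #define TRY_BODY   for (int i__try = 1; i__try; i__try = 0)

    int main(void)
    {
        TRY_BODY {
            printf("inside the try body\n");
            if (1)
                continue;               /* "jump to JL_CATCH": leaves the body early */
            printf("not reached\n");
        }
        printf("code after the try body (catch/cleanup) runs next\n");
        return 0;
    }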
251 changes: 131 additions & 120 deletions src/scheduler.c
@@ -459,140 +459,151 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
}
continue;
}
task = get_next_task(trypoptask, q); // note: this should not yield
if (ptls != ct->ptls) {
// sigh, a yield was detected, so let's go ahead and handle it anyway by starting over
ptls = ct->ptls;
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
volatile int isrunning = 1;
JL_TRY {
task = get_next_task(trypoptask, q); // note: this should not yield
if (ptls != ct->ptls) {
// sigh, a yield was detected, so let's go ahead and handle it anyway by starting over
ptls = ct->ptls;
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
}
continue; // jump to JL_CATCH
}
if (task)
return task;
continue;
}
if (task) {
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
if (task) {
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
}
continue; // jump to JL_CATCH
}
return task;
}

// IO is always permitted, but outside a threaded region, only
// thread 0 will process messages.
// Inside a threaded region, any thread can listen for IO messages,
// and one thread should win this race and watch the event loop,
// but we bias away from idle threads getting parked here.
//
// The reason this works is somewhat convoluted, and closely tied to [^store_buffering_1]:
// - After decrementing _threadedregion, the thread is required to
// call jl_wakeup_thread(0), that will kick out any thread who is
// already there, and then eventually thread 0 will get here.
// - Inside a _threadedregion, there must exist at least one
// thread that has a happens-before relationship on the libuv lock
// before reaching this decision point in the code who will see
// the lock as unlocked and thus must win this race here.
int uvlock = 0;
if (jl_atomic_load_relaxed(&_threadedregion)) {
uvlock = jl_mutex_trylock(&jl_uv_mutex);
}
else if (ptls->tid == 0) {
uvlock = 1;
JL_UV_LOCK();
}
else {
// Since we might have started some IO work, we might need
// to ensure tid = 0 will go watch that new event source.
// If trylock would have succeeded, that may have been our
// responsibility, so need to make sure thread 0 will take care
// of us.
if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == NULL) // aka trylock
jl_wakeup_thread(0);
}
if (uvlock) {
int enter_eventloop = may_sleep(ptls);
int active = 0;
if (jl_atomic_load_relaxed(&jl_uv_n_waiters) != 0)
// if we won the race against someone who actually needs
// the lock to do real work, we need to let them have it instead
enter_eventloop = 0;
if (enter_eventloop) {
uv_loop_t *loop = jl_global_event_loop();
loop->stop_flag = 0;
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() );
active = uv_run(loop, UV_RUN_ONCE);
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() );
jl_gc_safepoint();
// IO is always permitted, but outside a threaded region, only
// thread 0 will process messages.
// Inside a threaded region, any thread can listen for IO messages,
// and one thread should win this race and watch the event loop,
// but we bias away from idle threads getting parked here.
//
// The reason this works is somewhat convoluted, and closely tied to [^store_buffering_1]:
// - After decrementing _threadedregion, the thread is required to
// call jl_wakeup_thread(0), that will kick out any thread who is
// already there, and then eventually thread 0 will get here.
// - Inside a _threadedregion, there must exist at least one
// thread that has a happens-before relationship on the libuv lock
// before reaching this decision point in the code who will see
// the lock as unlocked and thus must win this race here.
int uvlock = 0;
if (jl_atomic_load_relaxed(&_threadedregion)) {
uvlock = jl_mutex_trylock(&jl_uv_mutex);
}
JL_UV_UNLOCK();
// optimization: check again first if we may have work to do.
// Otherwise we got a spurious wakeup since some other thread
// that just wanted to steal libuv from us. We will just go
// right back to sleep on the individual wake signal to let
// them take it from us without conflict.
if (active || !may_sleep(ptls)) {
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
else if (ptls->tid == 0) {
uvlock = 1;
JL_UV_LOCK();
}
else {
// Since we might have started some IO work, we might need
// to ensure tid = 0 will go watch that new event source.
// If trylock would have succeeded, that may have been our
// responsibility, so need to make sure thread 0 will take care
// of us.
if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == NULL) // aka trylock
jl_wakeup_thread(0);
}
if (uvlock) {
int enter_eventloop = may_sleep(ptls);
int active = 0;
if (jl_atomic_load_relaxed(&jl_uv_n_waiters) != 0)
// if we won the race against someone who actually needs
// the lock to do real work, we need to let them have it instead
enter_eventloop = 0;
if (enter_eventloop) {
uv_loop_t *loop = jl_global_event_loop();
loop->stop_flag = 0;
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() );
active = uv_run(loop, UV_RUN_ONCE);
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() );
jl_gc_safepoint();
}
JL_UV_UNLOCK();
// optimization: check again first if we may have work to do.
// Otherwise we got a spurious wakeup since some other thread
// that just wanted to steal libuv from us. We will just go
// right back to sleep on the individual wake signal to let
// them take it from us without conflict.
if (active || !may_sleep(ptls)) {
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
}
start_cycles = 0;
continue; // jump to JL_CATCH
}
if (!enter_eventloop && !jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0) {
// thread 0 is the only thread permitted to run the event loop
// so it needs to stay alive, just spin-looping if necessary
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
}
start_cycles = 0;
continue; // jump to JL_CATCH
}
start_cycles = 0;
continue;
}
if (!enter_eventloop && !jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0) {
// thread 0 is the only thread permitted to run the event loop
// so it needs to stay alive, just spin-looping if necessary
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);

// any thread which wants us running again will have to observe
// sleep_check_state==sleeping and increment nrunning for us
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, -1);
assert(wasrunning);
isrunning = 0;
if (wasrunning == 1) {
// This was the last running thread, and there is no thread with !may_sleep
// so make sure tid 0 is notified to check wait_empty
// TODO: this also might be a good time to check again that
// libuv's queue is truly empty, instead of during delete_thread
if (ptls->tid != 0) {
uv_mutex_lock(&ptls->sleep_lock);
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
uv_mutex_unlock(&ptls->sleep_lock);
}
start_cycles = 0;
continue;
}
}

// any thread which wants us running again will have to observe
// sleep_check_state==sleeping and increment nrunning for us
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, -1);
assert(wasrunning);
if (wasrunning == 1) {
// This was the last running thread, and there is no thread with !may_sleep
// so make sure tid 0 is notified to check wait_empty
// TODO: this also might be a good time to check again that
// libuv's queue is truly empty, instead of during delete_thread
if (ptls->tid != 0) {
uv_mutex_lock(&ptls->sleep_lock);
// the other threads will just wait for an individual wake signal to resume
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = cycleclock() );
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_mutex_lock(&ptls->sleep_lock);
while (may_sleep(ptls)) {
task = wait_empty;
if (ptls->tid == 0 && task && jl_atomic_load_relaxed(&nrunning) == 0) {
wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1);
assert(!wasrunning);
wasrunning = !set_not_sleeping(ptls);
assert(!wasrunning);
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
if (!ptls->finalizers_inhibited)
ptls->finalizers_inhibited++; // this annoyingly is rather sticky (we should like to reset it at the end of jl_task_wait_empty)
break;
}
// else should we warn the user of certain deadlock here if tid == 0 && nrunning == 0?
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
uv_mutex_unlock(&ptls->sleep_lock);
}
}

// the other threads will just wait for an individual wake signal to resume
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = cycleclock() );
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_mutex_lock(&ptls->sleep_lock);
while (may_sleep(ptls)) {
task = wait_empty;
if (ptls->tid == 0 && task && jl_atomic_load_relaxed(&nrunning) == 0) {
wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1);
assert(!wasrunning);
wasrunning = !set_not_sleeping(ptls);
assert(!wasrunning);
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
if (!ptls->finalizers_inhibited)
ptls->finalizers_inhibited++; // this annoyingly is rather sticky (we should like to reset it at the end of jl_task_wait_empty)
break;
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
assert(jl_atomic_load_relaxed(&nrunning));
start_cycles = 0;
uv_mutex_unlock(&ptls->sleep_lock);
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_leave = cycleclock() );
jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
if (task) {
assert(task == wait_empty);
wait_empty = NULL;
continue;
}
// else should we warn the user of certain deadlock here if tid == 0 && nrunning == 0?
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
}
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
assert(jl_atomic_load_relaxed(&nrunning));
start_cycles = 0;
uv_mutex_unlock(&ptls->sleep_lock);
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_leave = cycleclock() );
jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
if (task) {
assert(task == wait_empty);
wait_empty = NULL;
return task;
JL_CATCH {
// probably SIGINT, but possibly a user mistake in trypoptask
if (!isrunning)
jl_atomic_fetch_add_relaxed(&nrunning, 1);
set_not_sleeping(ptls);
jl_rethrow();
}
if (task)
return task;
}
else {
// maybe check the kernel for new messages too
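
In summary, most of the scheduler.c hunk is the existing sleep/wake logic re-indented into a JL_TRY body; the genuinely new pieces are the volatile int isrunning flag, the "continue; // jump to JL_CATCH" early exits, and the JL_CATCH block that undoes the bookkeeping (re-incrementing nrunning if this thread had already deregistered itself, clearing the sleep state, and rethrowing). A compact single-threaded sketch of that restore logic, using plain ints where the runtime uses atomics, and reducing to a comment the detail that a waking thread normally re-increments nrunning on the sleeper's behalf:

    #include <assert.h>
    #include <setjmp.h>

    static jmp_buf eh;
    static int nrunning = 1;     /* stand-in for the global running-thread count */
    static int sleeping = 0;     /* stand-in for ptls->sleep_check_state */

    static void wait_for_wakeup(void)
    {
        longjmp(eh, 1);          /* models a SIGINT / exception arriving while parked */
    }

    int main(void)
    {
        volatile int isrunning = 1;      /* mirrors the commit's isrunning flag */
        if (setjmp(eh) == 0) {
            sleeping = 1;                /* announce that we may sleep */
            nrunning -= 1;               /* deregister as a running thread */
            isrunning = 0;               /* remember that we did */
            wait_for_wakeup();           /* normally the waker re-increments nrunning for us */
        }
        else {
            /* catch path: undo whatever the try path had already done */
            if (!isrunning)
                nrunning += 1;           /* re-register this thread as running */
            sleeping = 0;                /* reset the sleep state */
            /* the real code then calls jl_rethrow() */
        }
        assert(nrunning == 1 && sleeping == 0);
        return 0;
    }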
