scheduler: use explicit memory fences
These fences were previously implied (on TSO platforms, such as x86) by the
atomic_store to sleeping and by the sleep_locks acquire before the
wake-check loop; this change makes the ordering explicit. We might want to
consider in the future whether it would be better (faster) to acquire each
possible lock on the sleeping path instead, so that we do each operation
with seq_cst semantics, instead of using a fence to order only the
operations we care about directly.
vtjnash committed Dec 13, 2021
1 parent 52300c2 commit 6bba175
Showing 1 changed file with 50 additions and 25 deletions.
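
The protocol these fences implement is the classic store-buffering
(Dekker-style) handshake: the sleeper publishes its sleeping state and then
checks the run queues, while the waker publishes the enqueued work and then
checks the sleeper's state. With a sequentially consistent fence between each
side's store and load (jl_fence() corresponds to such a fence), at least one
side must observe the other's write, so a wakeup cannot be lost. A minimal
C11 sketch of the idiom (the names are illustrative, not Julia's actual code):

    #include <stdatomic.h>

    static _Atomic int sleep_check_state; /* 0 = not_sleeping, 1 = sleeping */
    static _Atomic int queue_nonempty;

    /* Sleeper: publish the intent to sleep, then re-check for work. */
    int try_sleep(void)
    {
        atomic_store_explicit(&sleep_check_state, 1, memory_order_relaxed);
        atomic_thread_fence(memory_order_seq_cst);           /* jl_fence() */
        if (atomic_load_explicit(&queue_nonempty, memory_order_relaxed)) {
            /* work arrived in the meantime: undo and stay awake */
            atomic_store_explicit(&sleep_check_state, 0, memory_order_relaxed);
            return 0;
        }
        return 1; /* safe to block on the condition variable */
    }

    /* Waker: publish the work, then re-check the sleeper's state.
       Returns nonzero if the peer may be sleeping and must be signaled. */
    int enqueue_and_check(void)
    {
        atomic_store_explicit(&queue_nonempty, 1, memory_order_relaxed);
        atomic_thread_fence(memory_order_seq_cst);           /* jl_fence() */
        return atomic_load_explicit(&sleep_check_state, memory_order_relaxed);
    }

Because both fences participate in one total order, the execution in which the
sleeper misses the enqueue while the waker simultaneously misses the sleeping
flag is forbidden; that is exactly the lost-wakeup case the scheduler must
rule out.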
src/partr.c
@@ -348,16 +348,20 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
 }
 
 
-static void wake_thread(int16_t tid)
+static int wake_thread(int16_t tid)
 {
     jl_ptls_t other = jl_all_tls_states[tid];
     int8_t state = sleeping;
-    jl_atomic_cmpswap(&other->sleep_check_state, &state, not_sleeping);
-    if (state == sleeping) {
-        uv_mutex_lock(&sleep_locks[tid]);
-        uv_cond_signal(&wake_signals[tid]);
-        uv_mutex_unlock(&sleep_locks[tid]);
+
+    if (jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping) {
+        if (jl_atomic_cmpswap_relaxed(&other->sleep_check_state, &state, not_sleeping)) {
+            uv_mutex_lock(&sleep_locks[tid]);
+            uv_cond_signal(&wake_signals[tid]);
+            uv_mutex_unlock(&sleep_locks[tid]);
+            return 1;
+        }
     }
+    return 0;
 }
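
In the new wake_thread, a relaxed load screens the flag before the CAS, so
the common case (the target is already awake) costs one shared read and no
write traffic; only the thread whose CAS actually performs the sleeping to
not_sleeping transition takes the lock and signals, and the return value
tells the caller whether any post-wake checks are needed. A self-contained
sketch of this claim-the-wakeup pattern, assuming pthreads in place of the
libuv lock and condition variable:

    #include <pthread.h>
    #include <stdatomic.h>

    enum { NOT_SLEEPING = 0, SLEEPING = 1 };

    static _Atomic int sleep_check_state;
    static pthread_mutex_t sleep_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t wake_signal = PTHREAD_COND_INITIALIZER;

    /* Returns 1 iff this caller won the race to wake the sleeper. */
    int wake_one(void)
    {
        int expected = SLEEPING;
        /* cheap relaxed read first: skip the CAS and the lock entirely
           when the target is already awake (the common case) */
        if (atomic_load_explicit(&sleep_check_state, memory_order_relaxed) == SLEEPING &&
            atomic_compare_exchange_strong_explicit(&sleep_check_state, &expected,
                    NOT_SLEEPING, memory_order_relaxed, memory_order_relaxed)) {
            pthread_mutex_lock(&sleep_lock);
            pthread_cond_signal(&wake_signal);
            pthread_mutex_unlock(&sleep_lock);
            return 1;
        }
        return 0; /* already awake, or another waker claimed the transition */
    }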


@@ -372,37 +376,48 @@ static void wake_libuv(void)
 JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
 {
     jl_task_t *ct = jl_current_task;
-    jl_ptls_t ptls = ct->ptls;
-    jl_task_t *uvlock = jl_atomic_load(&jl_uv_mutex.owner);
     int16_t self = jl_atomic_load_relaxed(&ct->tid);
+    if (tid != self)
+        jl_fence(); // ensures sleep_check_state is sequentially-consistent with enqueuing the task
+    jl_task_t *uvlock = jl_atomic_load_relaxed(&jl_uv_mutex.owner);
     JULIA_DEBUG_SLEEPWAKE( wakeup_enter = cycleclock() );
     if (tid == self || tid == -1) {
         // we're already awake, but make sure we'll exit uv_run
+        jl_ptls_t ptls = ct->ptls;
         if (jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping)
-            jl_atomic_store(&ptls->sleep_check_state, not_sleeping);
+            jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping);
         if (uvlock == ct)
             uv_stop(jl_global_event_loop());
     }
     else {
         // something added to the sticky-queue: notify that thread
-        wake_thread(tid);
-        // check if we need to notify uv_run too
-        jl_task_t *system_tid = jl_atomic_load_relaxed(&jl_all_tls_states[tid]->current_task);
-        if (uvlock != ct && jl_atomic_load(&jl_uv_mutex.owner) == system_tid)
-            wake_libuv();
+        if (wake_thread(tid)) {
+            // check if we need to notify uv_run too
+            jl_fence();
+            jl_task_t *tid_task = jl_atomic_load_relaxed(&jl_all_tls_states[tid]->current_task);
+            // now that we have changed the thread to not-sleeping, ensure that
+            // either it has not yet acquired the libuv lock, or that it will
+            // observe the change of state to not_sleeping
+            if (uvlock != ct && jl_atomic_load_relaxed(&jl_uv_mutex.owner) == tid_task)
+                wake_libuv();
+        }
     }
     // check if the other threads might be sleeping
     if (tid == -1) {
         // something added to the multi-queue: notify all threads
         // in the future, we might want to instead wake some fraction of threads,
         // and let each of those wake additional threads if they find work
+        int anysleep = 0;
         for (tid = 0; tid < jl_n_threads; tid++) {
             if (tid != self)
-                wake_thread(tid);
+                anysleep |= wake_thread(tid);
         }
         // check if we need to notify uv_run too
-        if (uvlock != ct && jl_atomic_load(&jl_uv_mutex.owner) != NULL)
-            wake_libuv();
+        if (uvlock != ct && anysleep) {
+            jl_fence();
+            if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) != NULL)
+                wake_libuv();
+        }
     }
     JULIA_DEBUG_SLEEPWAKE( wakeup_leave = cycleclock() );
 }
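
The multi-queue path above exploits the new return value: instead of fencing
once per target, it ORs the results together and issues a single fence after
the loop, and only when some thread actually transitioned. A sketch of that
batching, with wake_one_thread() as a hypothetical per-thread variant of the
wake_one() sketch above and uv_owner standing in for jl_uv_mutex.owner:

    #include <stdatomic.h>
    #include <stddef.h>
    #include <stdint.h>

    extern int wake_one_thread(int16_t tid); /* per-thread wake, as above */
    extern void wake_libuv(void);            /* kick the event loop */
    static _Atomic(void *) uv_owner;         /* stands in for jl_uv_mutex.owner */

    void wake_all(int16_t nthreads, int16_t self)
    {
        int anysleep = 0;
        for (int16_t tid = 0; tid < nthreads; tid++)
            if (tid != self)
                anysleep |= wake_one_thread(tid);
        if (anysleep) {
            /* one fence orders all the successful transitions above
               before the single owner check below */
            atomic_thread_fence(memory_order_seq_cst);
            if (atomic_load_explicit(&uv_owner, memory_order_relaxed) != NULL)
                wake_libuv();
        }
    }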
@@ -426,7 +441,9 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
 {
     // sleep_check_state is only transitioned from not_sleeping to sleeping
     // by the thread itself. As a result, if this returns false, it will
-    // continue returning false. If it returns true, there are no guarantees.
+    // continue returning false. If it returns true, we know the total
+    // modification order of the fences.
+    jl_fence();
     return jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping;
 }
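
The fence added to may_sleep pairs with the fence in jl_wakeup_thread. Note
that making just this load seq_cst would not be enough while the stores stay
relaxed: the seq_cst guarantee across two different variables comes from the
single total order of the fences, not from any individual access. In partr.c
this check guards a condition-variable wait loop; the generic shape, again
assuming pthreads in place of uv_mutex_t/uv_cond_t:

    #include <pthread.h>
    #include <stdatomic.h>

    enum { NOT_SLEEPING = 0, SLEEPING = 1 };

    static _Atomic int sleep_check_state;
    static pthread_mutex_t sleep_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t wake_signal = PTHREAD_COND_INITIALIZER;

    static int may_sleep(void)
    {
        /* the fence joins the global order of seq_cst fences: a load that
           still sees SLEEPING afterwards cannot belong to an execution in
           which a waker fenced and then missed our sleeping store */
        atomic_thread_fence(memory_order_seq_cst);
        return atomic_load_explicit(&sleep_check_state, memory_order_relaxed) == SLEEPING;
    }

    void sleep_until_woken(void)
    {
        pthread_mutex_lock(&sleep_lock);
        while (may_sleep()) /* re-check after every (possibly spurious) wakeup */
            pthread_cond_wait(&wake_signal, &sleep_lock);
        pthread_mutex_unlock(&sleep_lock);
    }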

@@ -452,18 +469,26 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
         jl_cpu_pause();
         jl_ptls_t ptls = ct->ptls;
         if (sleep_check_after_threshold(&start_cycles) || (!jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0)) {
-            jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock
-            if (!multiq_check_empty()) {
+            // acquire sleep-check lock
+            jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping);
+            jl_fence(); // add a sequenced-before edge between the sleep_check_state modify and get_next_task checks
+            if (!multiq_check_empty()) { // uses relaxed loads
                 if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
-                    jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
+                    jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
                 continue;
             }
             task = get_next_task(trypoptask, q); // WARNING: this should not yield
-            if (ptls != ct->ptls)
-                continue; // oops, get_next_task did yield--start over
+            if (ptls != ct->ptls) {
+                ptls = ct->ptls;
+                if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
+                    jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
+                if (task)
+                    return task;
+                continue;
+            }
             if (task) {
                 if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
-                    jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
+                    jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
                 return task;
             }

@@ -519,7 +544,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
             // thread 0 is the only thread permitted to run the event loop
             // so it needs to stay alive
             if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
-                jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
+                jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
             start_cycles = 0;
             continue;
         }
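
One recurring micro-pattern in this diff is worth calling out: every reset of
sleep_check_state is guarded by a relaxed load rather than stored
unconditionally. The likely rationale is that the test costs only a shared
read, whereas an unconditional store would take the cache line exclusive on
every pass through this hot path. A minimal illustration of the
test-before-store idiom:

    #include <stdatomic.h>

    static _Atomic int sleep_check_state; /* 0 = not_sleeping */

    /* skip the write, and the cache-line invalidation it causes,
       when the flag is already clear */
    static void clear_sleep_state(void)
    {
        if (atomic_load_explicit(&sleep_check_state, memory_order_relaxed) != 0)
            atomic_store_explicit(&sleep_check_state, 0, memory_order_relaxed);
    }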
