Skip to content

Commit

Permalink
eliminate global sleep_check_state optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
vtjnash committed Jan 20, 2020
1 parent b51e3f4 commit d59e81a
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 81 deletions.
2 changes: 1 addition & 1 deletion base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ function wait()
W = Workqueues[Threads.threadid()]
reftask = poptaskref(W)
result = try_yieldto(ensure_rescheduled, reftask)
Sys.isjsvm() || process_events()
process_events()
# return when we come out of the queue
return result
end
Expand Down
2 changes: 1 addition & 1 deletion src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ JL_DLLEXPORT int jl_process_events(void)
uv_loop_t *loop = jl_io_loop;
if (loop && (_threadedregion || ptls->tid == 0)) {
jl_gc_safepoint_(ptls);
if (jl_mutex_trylock(&jl_uv_mutex)) {
if (jl_atomic_load(&jl_uv_n_waiters) == 0 && jl_mutex_trylock(&jl_uv_mutex)) {
loop->stop_flag = 0;
int r = uv_run(loop, UV_RUN_NOWAIT);
JL_UV_UNLOCK();
Expand Down
100 changes: 22 additions & 78 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,17 @@ extern "C" {

// thread sleep state

static int16_t sleep_check_state; // status of the multi-queue. possible values:

// no thread should be sleeping--there might be work in the multi-queue.
// thread should not be sleeping--it might need to do work.
static const int16_t not_sleeping = 0;

// it is acceptable for a thread to be sleeping if its sticky queue is empty.
// sleep_check_state == sleeping + 1 + tid means thread tid is checking the multi-queue
// to see if it is safe to transition to sleeping.
// it is acceptable for the thread to be sleeping.
static const int16_t sleeping = 1;

// invariant: No thread is ever asleep unless sleep_check_state is sleeping (or we have a wakeup signal pending).
// invariant: Any particular thread is not asleep unless that thread's sleep_check_state is sleeping.
// invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it.
// information: Observing thread not-sleeping is sufficient to ensure the target thread will subsequently inspect its local queue.
// information: Observing global not-sleeping is NOT sufficient to ensure the target thread will subsequently inspect its local queue.
// information: Observing thread is-sleeping says it may be necessary to notify it at least once to wakeup. It may already be awake however for a variety of reasons.
// information: When we transition global state from sleeping to not_sleeping, we also transition the local to not_sleeping to reduce repeat wakeup attempts.

JULIA_DEBUG_SLEEPWAKE(
uint64_t wakeup_enter;
Expand Down Expand Up @@ -224,43 +218,6 @@ static int multiq_check_empty(void)
}


static int sleep_check_now(int16_t tid)
{
while (1) {
int16_t state = jl_atomic_load(&sleep_check_state);
if (state > sleeping) {
// if some thread is already checking, the decision of that thread
// is correct for us also
do {
state = jl_atomic_load(&sleep_check_state);
} while (state > sleeping);
if (state == not_sleeping)
return 0;
}
else if (state == not_sleeping) {
int16_t checking_for_sleeping = sleeping + 1 + tid;
// transition from sleeping ==> checking
if (jl_atomic_bool_compare_exchange(&sleep_check_state, not_sleeping,
checking_for_sleeping)) {
if (multiq_check_empty()) {
// transition from checking ==> sleeping
if (jl_atomic_bool_compare_exchange(&sleep_check_state, checking_for_sleeping,
sleeping))
return 1;
}
else {
// transition from checking ==> not_sleeping
jl_atomic_store(&sleep_check_state, not_sleeping);
return 0;
}
}
continue;
}
assert(state == sleeping);
return 1;
}
}


// parallel task runtime
// ---
Expand All @@ -274,7 +231,6 @@ void jl_init_threadinginfra(void)
jl_ptls_t ptls = jl_get_ptls_states();
uv_mutex_init(&ptls->sleep_lock);
uv_cond_init(&ptls->wake_signal);
sleep_check_state = not_sleeping;
}


Expand Down Expand Up @@ -367,7 +323,7 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
JULIA_DEBUG_SLEEPWAKE( wakeup_enter = rdtscp() );
if (tid == self || tid == -1) {
// we're already awake, but make sure we'll exit uv_run
if (jl_atomic_load(&ptls->sleep_check_state) != not_sleeping)
if (ptls->sleep_check_state != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping);
if (uvlock == system_self)
uv_stop(jl_global_event_loop());
Expand All @@ -385,20 +341,13 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
// something added to the multi-queue: notify all threads
// in the future, we might want to instead wake some fraction of threads,
// and let each of those wake additional threads if they find work
if (jl_atomic_load(&sleep_check_state) != not_sleeping) {
int16_t state = jl_atomic_exchange(&sleep_check_state, not_sleeping);
if (state == sleeping) {
for (tid = 0; tid < jl_n_threads; tid++) {
if (sleep_check_state != not_sleeping)
return; // no point in trying to get everyone running since the work is already done
if (tid != self)
wake_thread(tid);
}
// check if we need to notify uv_run too
if (uvlock != system_self && jl_atomic_load(&jl_uv_mutex.owner) != 0)
wake_libuv();
}
for (tid = 0; tid < jl_n_threads; tid++) {
if (tid != self)
wake_thread(tid);
}
// check if we need to notify uv_run too
if (uvlock != system_self && jl_atomic_load(&jl_uv_mutex.owner) != 0)
wake_libuv();
}
JULIA_DEBUG_SLEEPWAKE( wakeup_leave = rdtscp() );
}
Expand All @@ -416,14 +365,12 @@ static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q)
return task;
}
jl_gc_safepoint();
if (jl_atomic_load(&sleep_check_state) != sleeping)
return multiq_deletemin();
return NULL;
return multiq_deletemin();
}

static int may_sleep(jl_ptls_t ptls)
{
return jl_atomic_load(&sleep_check_state) == sleeping && jl_atomic_load(&ptls->sleep_check_state) == sleeping;
return ptls->sleep_check_state == sleeping;
}

extern volatile unsigned _threadedregion;
Expand All @@ -448,12 +395,15 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)

jl_cpu_pause();
if (sleep_check_after_threshold(&start_cycles) || (!_threadedregion && ptls->tid == 0)) {
if (!sleep_check_now(ptls->tid))
continue;
jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock
if (!multiq_check_empty()) {
if (ptls->sleep_check_state != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
continue;
}
task = get_next_task(trypoptask, q);
if (task) {
if (jl_atomic_load(&ptls->sleep_check_state) != not_sleeping)
if (ptls->sleep_check_state != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
return task;
}
Expand All @@ -469,7 +419,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
}
else if (ptls->tid == 0) {
uvlock = 1;
JL_UV_LOCK();
JL_UV_LOCK(); // jl_mutex_lock(&jl_uv_mutex);
}
if (uvlock) {
int active = 1;
Expand All @@ -491,7 +441,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
JL_UV_UNLOCK();
// optimization: check again first if we may have work to do
if (!may_sleep(ptls)) {
if (jl_atomic_load(&ptls->sleep_check_state) != not_sleeping)
if (ptls->sleep_check_state != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
start_cycles = 0;
continue;
Expand All @@ -507,7 +457,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
if (!_threadedregion && active && ptls->tid == 0) {
// thread 0 is the only thread permitted to run the event loop
// so it needs to stay alive
if (jl_atomic_load(&ptls->sleep_check_state) != not_sleeping)
if (ptls->sleep_check_state != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
start_cycles = 0;
continue;
Expand All @@ -522,22 +472,16 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
// TODO: help with gc work here, if applicable
}
if (jl_atomic_load(&ptls->sleep_check_state) != not_sleeping)
if (ptls->sleep_check_state != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
uv_mutex_unlock(&ptls->sleep_lock);
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_leave = rdtscp() );
jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
start_cycles = 0;
}
else {
#ifndef JL_HAVE_ASYNCIFY
// maybe check the kernel for new messages too
if (jl_atomic_load(&jl_uv_n_waiters) == 0)
jl_process_events();
#else
// Yield back to browser event loop
return ptls->root_task;
#endif
jl_process_events();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion test/threads.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

let cmd = `$(Base.julia_cmd()) --depwarn=error --startup-file=no threads_exec.jl`
for test_nthreads in (1, 2, 8, 8) # run once to try single-threaded mode, then try a couple times to trigger bad races
for test_nthreads in (1, 2, 4, 4) # run once to try single-threaded mode, then try a couple times to trigger bad races
run(pipeline(setenv(cmd, "JULIA_NUM_THREADS" => test_nthreads), stdout = stdout, stderr = stderr))
end
end
Expand Down

0 comments on commit d59e81a

Please sign in to comment.