diff --git a/base/task.jl b/base/task.jl index c40c118d03bdc..54ae19e48a833 100644 --- a/base/task.jl +++ b/base/task.jl @@ -708,7 +708,7 @@ function wait() W = Workqueues[Threads.threadid()] reftask = poptaskref(W) result = try_yieldto(ensure_rescheduled, reftask) - Sys.isjsvm() || process_events() + process_events() # return when we come out of the queue return result end diff --git a/src/jl_uv.c b/src/jl_uv.c index fb9a708371b70..80a91efc38047 100644 --- a/src/jl_uv.c +++ b/src/jl_uv.c @@ -209,7 +209,7 @@ JL_DLLEXPORT int jl_process_events(void) uv_loop_t *loop = jl_io_loop; if (loop && (_threadedregion || ptls->tid == 0)) { jl_gc_safepoint_(ptls); - if (jl_mutex_trylock(&jl_uv_mutex)) { + if (jl_atomic_load(&jl_uv_n_waiters) == 0 && jl_mutex_trylock(&jl_uv_mutex)) { loop->stop_flag = 0; int r = uv_run(loop, UV_RUN_NOWAIT); JL_UV_UNLOCK(); diff --git a/src/julia_internal.h b/src/julia_internal.h index 6ef048935b462..a58e1402157bc 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -91,6 +91,37 @@ void JL_UV_LOCK(void); extern "C" { #endif +//-------------------------------------------------- +// timers +// Returns time in nanosec +JL_DLLEXPORT uint64_t jl_hrtime(void); + +// number of cycles since power-on +static inline uint64_t cycleclock(void) +{ +#if defined(_CPU_X86_64_) + uint64_t low, high; + __asm__ volatile("rdtsc" : "=a"(low), "=d"(high)); + return (high << 32) | low; +#elif defined(_CPU_X86_) + int64_t ret; + __asm__ volatile("rdtsc" : "=A"(ret)); + return ret; +#elif defined(_CPU_AARCH64_) + // System timer of ARMv8 runs at a different frequency than the CPU's. + // The frequency is fixed, typically in the range 1-50MHz. It can be + // read at CNTFRQ special register. We assume the OS has set up + // the virtual timer properly. + int64_t virtual_timer_value; + __asm__ volatile("mrs %0, cntvct_el0" : "=r"(virtual_timer_value)); + return virtual_timer_value; +#else + #warning No cycleclock() definition for your platform + // copy from https://github.com/google/benchmark/blob/v1.5.0/src/cycleclock.h + return 0; +#endif +} + #include "timing.h" #ifdef _COMPILER_MICROSOFT_ @@ -829,11 +860,6 @@ void jl_push_excstack(jl_excstack_t **stack JL_REQUIRE_ROOTED_SLOT JL_ROOTING_AR jl_bt_element_t *bt_data, size_t bt_size); void jl_copy_excstack(jl_excstack_t *dest, jl_excstack_t *src) JL_NOTSAFEPOINT; -//-------------------------------------------------- -// timers -// Returns time in nanosec -JL_DLLEXPORT uint64_t jl_hrtime(void); - // congruential random number generator // for a small amount of thread-local randomness // we could just use libc:`rand()`, but we want to ensure this is fast diff --git a/src/julia_threads.h b/src/julia_threads.h index c44799a4ec8d7..9e3216c793894 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -7,9 +7,12 @@ #include // threading ------------------------------------------------------------------ -// WARNING: Threading support is incomplete and experimental -// Nonetheless, we define JL_THREAD and use it to give advanced notice to -// maintainers of what eventual threading support will change. +// JULIA_ENABLE_THREADING may be controlled by altering JULIA_THREADS in Make.user + +// When running into scheduler issues, this may help provide information on the +// sequence of events that led to the issue. Normally, it is empty. +//#define JULIA_DEBUG_SLEEPWAKE(x) x +#define JULIA_DEBUG_SLEEPWAKE(x) // Options for task switching algorithm (in order of preference): // JL_HAVE_ASM -- mostly setjmp @@ -221,6 +224,13 @@ struct _jl_tls_states_t { // Saved exception for previous external API call or NULL if cleared. // Access via jl_exception_occurred(). struct _jl_value_t *previous_exception; + + JULIA_DEBUG_SLEEPWAKE( + uint64_t uv_run_enter; + uint64_t uv_run_leave; + uint64_t sleep_enter; + uint64_t sleep_leave; + ) }; // Update codegen version in `ccall.cpp` after changing either `pause` or `wake` diff --git a/src/partr.c b/src/partr.c index 8c5b95a680417..6d521a177b4e4 100644 --- a/src/partr.c +++ b/src/partr.c @@ -17,16 +17,36 @@ extern "C" { // thread sleep state -static int16_t sleep_check_state; // status of the multi-queue. possible values: - -// no thread should be sleeping--there might be work in the multi-queue. +// thread should not be sleeping--it might need to do work. static const int16_t not_sleeping = 0; -// it is acceptable for a thread to be sleeping if its sticky queue is empty. -// sleep_check_state == sleeping + 1 + tid means thread tid is checking the multi-queue -// to see if it is safe to transition to sleeping. +// it is acceptable for the thread to be sleeping. static const int16_t sleeping = 1; +// invariant: No thread is ever asleep unless sleep_check_state is sleeping (or we have a wakeup signal pending). +// invariant: Any particular thread is not asleep unless that thread's sleep_check_state is sleeping. +// invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it. +// information: Observing thread not-sleeping is sufficient to ensure the target thread will subsequently inspect its local queue. +// information: Observing thread is-sleeping says it may be necessary to notify it at least once to wakeup. It may already be awake however for a variety of reasons. + +JULIA_DEBUG_SLEEPWAKE( +uint64_t wakeup_enter; +uint64_t wakeup_leave; +uint64_t io_wakeup_enter; +uint64_t io_wakeup_leave; +); + + +JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int tid) JL_NOTSAFEPOINT +{ + // Try to acquire the lock on this task. + int16_t was = task->tid; + if (was == tid) + return 1; + if (was == -1) + return jl_atomic_bool_compare_exchange(&task->tid, -1, tid); + return 0; +} // GC functions used extern int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache, @@ -160,11 +180,9 @@ static inline jl_task_t *multiq_deletemin(void) return NULL; task = heaps[rn1].tasks[0]; - if (jl_atomic_load_acquire(&task->tid) != ptls->tid) { - if (jl_atomic_compare_exchange(&task->tid, -1, ptls->tid) != -1) { - jl_mutex_unlock_nogc(&heaps[rn1].lock); - goto retry; - } + if (!jl_set_task_tid(task, ptls->tid)) { + jl_mutex_unlock_nogc(&heaps[rn1].lock); + goto retry; } heaps[rn1].tasks[0] = heaps[rn1].tasks[--heaps[rn1].ntasks]; heaps[rn1].tasks[heaps[rn1].ntasks] = NULL; @@ -200,43 +218,6 @@ static int multiq_check_empty(void) } -static int sleep_check_now(int16_t tid) -{ - while (1) { - int16_t state = jl_atomic_load(&sleep_check_state); - if (state > sleeping) { - // if some thread is already checking, the decision of that thread - // is correct for us also - do { - state = jl_atomic_load(&sleep_check_state); - } while (state > sleeping); - if (state == not_sleeping) - return 0; - } - else if (state == not_sleeping) { - int16_t checking_for_sleeping = sleeping + 1 + tid; - // transition from sleeping ==> checking - if (jl_atomic_bool_compare_exchange(&sleep_check_state, not_sleeping, - checking_for_sleeping)) { - if (multiq_check_empty()) { - // transition from checking ==> sleeping - if (jl_atomic_bool_compare_exchange(&sleep_check_state, checking_for_sleeping, - sleeping)) - return 1; - } - else { - // transition from checking ==> not_sleeping - jl_atomic_store(&sleep_check_state, not_sleeping); - return 0; - } - } - continue; - } - assert(state == sleeping); - return 1; - } -} - // parallel task runtime // --- @@ -250,7 +231,6 @@ void jl_init_threadinginfra(void) jl_ptls_t ptls = jl_get_ptls_states(); uv_mutex_init(&ptls->sleep_lock); uv_cond_init(&ptls->wake_signal); - sleep_check_state = not_sleeping; } @@ -298,6 +278,7 @@ JL_DLLEXPORT int jl_enqueue_task(jl_task_t *task) // sleep_check_after_threshold() -- if sleep_threshold ns have passed, return 1 static int sleep_check_after_threshold(uint64_t *start_cycles) { + JULIA_DEBUG_SLEEPWAKE( return 1 ); // hammer on the sleep/wake logic much harder if (!(*start_cycles)) { *start_cycles = jl_hrtime(); return 0; @@ -314,25 +295,36 @@ static int sleep_check_after_threshold(uint64_t *start_cycles) static void wake_thread(int16_t tid) { jl_ptls_t other = jl_all_tls_states[tid]; - int16_t state = jl_atomic_exchange(&other->sleep_check_state, not_sleeping); - if (state == sleeping) { - uv_mutex_lock(&other->sleep_lock); - uv_cond_signal(&other->wake_signal); - uv_mutex_unlock(&other->sleep_lock); + if (jl_atomic_load(&other->sleep_check_state) != not_sleeping) { + int16_t state = jl_atomic_exchange(&other->sleep_check_state, not_sleeping); // prohibit it from sleeping + if (state == sleeping) { // see if it was possibly sleeping before now + uv_mutex_lock(&other->sleep_lock); + uv_cond_signal(&other->wake_signal); + uv_mutex_unlock(&other->sleep_lock); + } } } +static void wake_libuv(void) +{ + JULIA_DEBUG_SLEEPWAKE( io_wakeup_enter = cycleclock() ); + jl_wake_libuv(); + JULIA_DEBUG_SLEEPWAKE( io_wakeup_leave = cycleclock() ); +} + /* ensure thread tid is awake if necessary */ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) { jl_ptls_t ptls = jl_get_ptls_states(); + int16_t uvlock = jl_atomic_load(&jl_uv_mutex.owner); int16_t self = ptls->tid; unsigned long system_self = jl_all_tls_states[self]->system_id; - int16_t uvlock = jl_atomic_load_acquire(&jl_uv_mutex.owner); + JULIA_DEBUG_SLEEPWAKE( wakeup_enter = cycleclock() ); if (tid == self || tid == -1) { // we're already awake, but make sure we'll exit uv_run - jl_atomic_store(&ptls->sleep_check_state, not_sleeping); + if (ptls->sleep_check_state != not_sleeping) + jl_atomic_store(&ptls->sleep_check_state, not_sleeping); if (uvlock == system_self) uv_stop(jl_global_event_loop()); } @@ -340,38 +332,27 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) // something added to the sticky-queue: notify that thread wake_thread(tid); // check if we need to notify uv_run too - if (uvlock != system_self) - jl_wake_libuv(); + unsigned long system_tid = jl_all_tls_states[tid]->system_id; + if (uvlock != system_self && jl_atomic_load(&jl_uv_mutex.owner) == system_tid) + wake_libuv(); } + // check if the other threads might be sleeping if (tid == -1) { - // check if the other threads might be sleeping - if (jl_atomic_load_acquire(&sleep_check_state) != not_sleeping) { - // something added to the multi-queue: notify all threads - // in the future, we might want to instead wake some fraction of threads, - // and let each of those wake additional threads if they find work - int16_t state = jl_atomic_exchange(&sleep_check_state, not_sleeping); - if (state == sleeping) { - for (tid = 0; tid < jl_n_threads; tid++) - if (tid != self) - wake_thread(tid); - // check if we need to notify uv_run too - if (uvlock != system_self) - jl_wake_libuv(); - } + // something added to the multi-queue: notify all threads + // in the future, we might want to instead wake some fraction of threads, + // and let each of those wake additional threads if they find work + for (tid = 0; tid < jl_n_threads; tid++) { + if (tid != self) + wake_thread(tid); } + // check if we need to notify uv_run too + if (uvlock != system_self && jl_atomic_load(&jl_uv_mutex.owner) != 0) + wake_libuv(); } + JULIA_DEBUG_SLEEPWAKE( wakeup_leave = cycleclock() ); } -JL_DLLEXPORT void jl_set_task_tid(jl_task_t *task, int tid) JL_NOTSAFEPOINT -{ - // Try to acquire the lock on this task. - // If this fails, we'll check for that error later (in jl_switchto). - if (jl_atomic_load_acquire(&task->tid) != tid) { - jl_atomic_compare_exchange(&task->tid, -1, tid); - } -} - // get the next runnable task from the multiq static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q) { @@ -389,7 +370,7 @@ static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q) static int may_sleep(jl_ptls_t ptls) { - return jl_atomic_load(&sleep_check_state) == sleeping && jl_atomic_load(&ptls->sleep_check_state) == sleeping; + return ptls->sleep_check_state == sleeping; } extern volatile unsigned _threadedregion; @@ -414,12 +395,19 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q) jl_cpu_pause(); if (sleep_check_after_threshold(&start_cycles) || (!_threadedregion && ptls->tid == 0)) { - if (!sleep_check_now(ptls->tid)) - continue; jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock + if (!multiq_check_empty()) { + if (ptls->sleep_check_state != not_sleeping) + jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us + continue; + } task = get_next_task(trypoptask, q); - if (task) + if (task) { + if (ptls->sleep_check_state != not_sleeping) + jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us return task; + } + // one thread should win this race and watch the event loop // inside a threaded region, any thread can listen for IO messages, // although none are allowed to create new ones @@ -431,7 +419,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q) } else if (ptls->tid == 0) { uvlock = 1; - JL_UV_LOCK(); + JL_UV_LOCK(); // jl_mutex_lock(&jl_uv_mutex); } if (uvlock) { int active = 1; @@ -446,11 +434,15 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q) jl_gc_safepoint(); if (may_sleep(ptls)) { loop->stop_flag = 0; + JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() ); active = uv_run(loop, UV_RUN_ONCE); + JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() ); } JL_UV_UNLOCK(); // optimization: check again first if we may have work to do if (!may_sleep(ptls)) { + if (ptls->sleep_check_state != not_sleeping) + jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us start_cycles = 0; continue; } @@ -468,30 +460,31 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q) if (!_threadedregion && active && ptls->tid == 0) { // thread 0 is the only thread permitted to run the event loop // so it needs to stay alive + if (ptls->sleep_check_state != not_sleeping) + jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us start_cycles = 0; continue; } } // the other threads will just wait for on signal to resume + JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = cycleclock() ); int8_t gc_state = jl_gc_safe_enter(ptls); uv_mutex_lock(&ptls->sleep_lock); while (may_sleep(ptls)) { uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock); + // TODO: help with gc work here, if applicable } + if (ptls->sleep_check_state != not_sleeping) + jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us uv_mutex_unlock(&ptls->sleep_lock); + JULIA_DEBUG_SLEEPWAKE( ptls->sleep_leave = cycleclock() ); jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint start_cycles = 0; } else { -#ifndef JL_HAVE_ASYNCIFY // maybe check the kernel for new messages too - if (jl_atomic_load(&jl_uv_n_waiters) == 0) - jl_process_events(); -#else - // Yield back to browser event loop - return ptls->root_task; -#endif + jl_process_events(); } } } diff --git a/src/threading.c b/src/threading.c index 2ec6d9ddf2444..f94da39fde864 100644 --- a/src/threading.c +++ b/src/threading.c @@ -507,15 +507,11 @@ JL_DLLEXPORT void jl_threading_run(jl_value_t *func) args2[0] = schd_func; args2[1] = (jl_value_t*)t; jl_apply(args2, 2); - if (i == 1) { - // let threads know work is coming (optimistic) + if (i == 1 && nthreads > 2) { + // hint to threads that work is coming soon jl_wakeup_thread(-1); } } - if (nthreads > 2) { - // let threads know work is ready (guaranteed) - jl_wakeup_thread(-1); - } // join with all tasks JL_TRY { for (int i = 0; i < nthreads; i++) { diff --git a/src/timing.h b/src/timing.h index 6bba66bce13fb..d04aea1b69fbc 100644 --- a/src/timing.h +++ b/src/timing.h @@ -35,31 +35,6 @@ void jl_timing_block_stop(jl_timing_block_t *cur_block); #define JL_TIMING(owner) #else -// number of cycles since power-on -static inline uint64_t cycleclock(void) -{ -#if defined(_CPU_X86_64_) - uint64_t low, high; - __asm__ volatile("rdtsc" : "=a"(low), "=d"(high)); - return (high << 32) | low; -#elif defined(_CPU_X86_) - int64_t ret; - __asm__ volatile("rdtsc" : "=A"(ret)); - return ret; -#elif defined(_CPU_AARCH64_) - // System timer of ARMv8 runs at a different frequency than the CPU's. - // The frequency is fixed, typically in the range 1-50MHz. It can be - // read at CNTFRQ special register. We assume the OS has set up - // the virtual timer properly. - int64_t virtual_timer_value; - asm volatile("mrs %0, cntvct_el0" : "=r"(virtual_timer_value)); - return virtual_timer_value; -#else - #error No cycleclock() definition for your platform - // copy from https://github.com/google/benchmark/blob/v1.5.0/src/cycleclock.h -#endif -} - #define JL_TIMING_OWNERS \ X(ROOT), \ X(GC), \ diff --git a/test/threads.jl b/test/threads.jl index c555363cf9d62..3a3751d21a232 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -1,16 +1,8 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license -using Test, Base.Threads - -let p, cmd = `$(Base.julia_cmd()) --depwarn=error --startup-file=no threads_exec.jl` - # test both nthreads==1 and nthreads>1. spawn a process to test whichever - # case we are not running currently. - other_nthreads = nthreads() == 1 ? 4 : 1 - p = run(pipeline(setenv(cmd, "JULIA_NUM_THREADS" => other_nthreads), stdout = stdout, stderr = stderr), - wait = false) - include("threads_exec.jl") - if !success(p) - error("threads test failed with nthreads == $other_nthreads") +let cmd = `$(Base.julia_cmd()) --depwarn=error --startup-file=no threads_exec.jl` + for test_nthreads in (1, 2, 4, 4) # run once to try single-threaded mode, then try a couple times to trigger bad races + run(pipeline(setenv(cmd, "JULIA_NUM_THREADS" => test_nthreads), stdout = stdout, stderr = stderr)) end end diff --git a/test/threads_exec.jl b/test/threads_exec.jl index 848150a8e8b84..a34ba82b7a935 100644 --- a/test/threads_exec.jl +++ b/test/threads_exec.jl @@ -7,6 +7,26 @@ using Base.Threads: SpinLock # for cfunction_closure include("testenv.jl") +function killjob(d) + Core.print(Core.stderr, d) + if Sys.islinux() + SIGINFO = 10 + elseif Sys.isbsd() + SIGINFO = 29 + end + if @isdefined(SIGINFO) + ccall(:uv_kill, Cint, (Cint, Cint), getpid(), SIGINFO) + sleep(1) + end + ccall(:uv_kill, Cint, (Cint, Cint), getpid(), Base.SIGTERM) + nothing +end + +# set up a watchdog alarm for 20 minutes +# so that we can attempt to get a "friendly" backtrace if something gets stuck +# (expected test duration is about 18-180 seconds) +Timer(t -> killjob("KILLING BY THREAD TEST WATCHDOG\n"), 1200) + # threading constructs let a = zeros(Int, 2 * nthreads()) @@ -667,14 +687,11 @@ end # scheduling wake/sleep test (#32511) -let timeout = 300 # this test should take about 1-10 seconds - t = Timer(timeout) do t - ccall(:uv_kill, Cint, (Cint, Cint), getpid(), Base.SIGTERM) - end # set up a watchdog alarm +let t = Timer(t -> killjob("KILLING BY QUICK KILL WATCHDOG\n"), 600) # this test should take about 1-10 seconds for _ = 1:10^5 @threads for idx in 1:1024; #=nothing=# end end - close(t) # stop the watchdog + close(t) # stop the fast watchdog end # issue #32575