From 8043d04ce0258c5516897c49ea94e0f8e4af3705 Mon Sep 17 00:00:00 2001
From: K Pamnany
Date: Sat, 7 Oct 2023 18:30:01 -0400
Subject: [PATCH] Add heartbeat capability

Presence is controlled by a build-time option. Start a separate
thread which simply sleeps. When heartbeats are enabled, this
thread wakes up at specified intervals to verify that user code
is heartbeating as requested and if not, prints task backtraces.

Also fixes the call to `maxthreadid` in `generate_precompile.jl`.
---
 contrib/generate_precompile.jl |   2 +-
 src/init.c                     |   4 +
 src/options.h                  |   3 +
 src/threading.c                | 238 +++++++++++++++++++++++++++++++++
 4 files changed, 246 insertions(+), 1 deletion(-)

diff --git a/contrib/generate_precompile.jl b/contrib/generate_precompile.jl
index 82c9ea2aba037f..c476e2d5ef8308 100644
--- a/contrib/generate_precompile.jl
+++ b/contrib/generate_precompile.jl
@@ -4,7 +4,7 @@
 @eval Module() begin
 
 if Threads.maxthreadid() != 1
-    @warn "Running this file with multiple Julia threads may lead to a build error" Threads.maxthreadid()
+    @warn "Running this file with multiple Julia threads may lead to a build error" Base.Threads.maxthreadid()
 end
 
 if Base.isempty(Base.ARGS) || Base.ARGS[1] !== "0"
diff --git a/src/init.c b/src/init.c
index fef3e73bd2d06a..9616c0d6e07d94 100644
--- a/src/init.c
+++ b/src/init.c
@@ -837,6 +837,8 @@ JL_DLLEXPORT void julia_init(JL_IMAGE_SEARCH rel)
     _finish_julia_init(rel, ptls, ct);
 }
 
+void jl_init_heartbeat(void);
+
 static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_task_t *ct)
 {
     JL_TIMING(JULIA_INIT, JULIA_INIT);
@@ -883,6 +885,8 @@ static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_
     }
     jl_start_threads();
 
+    jl_init_heartbeat();
+
     jl_gc_enable(1);
 
     if (jl_options.image_file && (!jl_generating_output() || jl_options.incremental) && jl_module_init_order) {
diff --git a/src/options.h b/src/options.h
index b535d5ad4566f1..de20097a0b986d 100644
--- a/src/options.h
+++ b/src/options.h
@@ -138,6 +138,9 @@
 #define MACHINE_EXCLUSIVE_NAME "JULIA_EXCLUSIVE"
 #define DEFAULT_MACHINE_EXCLUSIVE 0
 
+// heartbeats
+#define JL_HEARTBEAT_THREAD
+
 // partr -- parallel tasks runtime options ------------------------------------
 
 // multiq
diff --git a/src/threading.c b/src/threading.c
index e4735594ffaebb..ef6612eba37223 100644
--- a/src/threading.c
+++ b/src/threading.c
@@ -925,6 +925,244 @@ JL_DLLEXPORT int jl_alignment(size_t sz)
     return jl_gc_alignment(sz);
 }
 
+// Heartbeat mechanism for Julia's task scheduler
+// ---
+// Start a thread that does not participate in running Julia's tasks. This
+// thread simply sleeps until the heartbeat mechanism is enabled. When
+// enabled, the heartbeat thread enters a loop in which it blocks waiting
+// for the specified heartbeat interval. If, within that interval,
+// `jl_heartbeat()` is *not* called at least once, then the thread calls
+// `jl_print_task_backtraces(0)`.
+
+#ifdef JL_HEARTBEAT_THREAD
+
+#include <time.h>
+
+volatile int heartbeat_enabled;
+uv_sem_t heartbeat_on_sem, // jl_heartbeat_enable -> thread
+         heartbeat_off_sem; // thread -> jl_heartbeat_enable
+int heartbeat_interval_s,
+    n_loss_reports,
+    reset_reporting_s;
+int last_report_s, report_interval_s, n_reported;
+_Atomic(int) heartbeats;
+
+JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT;
+void jl_heartbeat_threadfun(void *arg);
+
+// start the heartbeat thread with heartbeats disabled
+void jl_init_heartbeat(void)
+{
+    uv_thread_t uvtid;
+    heartbeat_enabled = 0;
+    uv_sem_init(&heartbeat_on_sem, 0);
+    uv_sem_init(&heartbeat_off_sem, 0);
+    uv_thread_create(&uvtid, jl_heartbeat_threadfun, NULL);
+    uv_thread_detach(&uvtid);
+}
+
+// enable/disable heartbeats
+// heartbeat_s: interval within which jl_heartbeat() must be called
+// n_reports: for one heartbeat loss interval, how many times to report
+// reset_reporting_after_s: how long to wait after a heartbeat loss
+//   interval and a return to steady heartbeats, before resetting
+//   reporting behavior
+//
+// When disabling heartbeats, the heartbeat thread must wake up,
+// find out that heartbeats are now disabled, and reset. For now, we
+// handle this by preventing re-enabling of heartbeats until this
+// completes.
+//
+// Returns 0 on success. Returns -1 if heartbeats are already enabled or if
+// the heartbeat thread has not yet finished resetting after a disable.
+JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int n_reports,
+                                     int reset_reporting_after_s)
+{
+    if (heartbeat_s <= 0) {
+        heartbeat_enabled = 0;
+        heartbeat_interval_s = n_loss_reports = reset_reporting_s = 0;
+    }
+    else {
+        // must disable before enabling
+        if (heartbeat_enabled) {
+            return -1;
+        }
+        // heartbeat thread must be ready
+        if (uv_sem_trywait(&heartbeat_off_sem) != 0) {
+            return -1;
+        }
+
+        jl_atomic_store_relaxed(&heartbeats, 0);
+        heartbeat_interval_s = heartbeat_s;
+        n_loss_reports = n_reports;
+        reset_reporting_s = reset_reporting_after_s;
+        last_report_s = 0;
+        // a count left at its limit by a previous enabled period would
+        // otherwise suppress reports until the first heartbeat arrives
+        n_reported = 0;
+        report_interval_s = heartbeat_interval_s;
+        heartbeat_enabled = 1;
+        uv_sem_post(&heartbeat_on_sem); // wake the heartbeat thread
+    }
+    return 0;
+}
+
+// heartbeat: signal liveness; the count is consumed by check_heartbeats()
+JL_DLLEXPORT void jl_heartbeat(void)
+{
+    jl_atomic_fetch_add(&heartbeats, 1);
+}
+
+// sleep the thread for the specified interval, resuming with the remaining
+// time whenever nanosleep() fails (e.g. because it was interrupted)
+void sleep_for(int secs, int nsecs)
+{
+    struct timespec rqtp, rmtp;
+    rqtp.tv_sec = secs;
+    rqtp.tv_nsec = nsecs;
+    rmtp.tv_sec = 0;
+    rmtp.tv_nsec = 0;
+    for (; ;) {
+        // this suspends the thread so we aren't using CPU
+        if (nanosleep(&rqtp, &rmtp) == 0) {
+            return;
+        }
+        // TODO: else if (errno == EINTR)
+        //       this could be SIGTERM and we should shutdown but how to find out?
+        rqtp = rmtp;
+    }
+}
+
+// check for heartbeats and maybe report loss
+// gc_state: the GC state this thread got when it entered its GC-safe region
+// returns the GC state from the most recent jl_gc_safe_enter(), which the
+// caller must pass back in on the next call
+uint8_t check_heartbeats(uint8_t gc_state)
+{
+    int hb = jl_atomic_exchange(&heartbeats, 0);
+    uint64_t curr_s = jl_hrtime() / 1e9;
+
+    if (hb <= 0) {
+        // we didn't get a heartbeat in the last interval; should we report?
+        if (n_reported < n_loss_reports &&
+                curr_s - last_report_s >= report_interval_s) {
+            jl_task_t *ct = jl_current_task;
+            jl_ptls_t ptls = ct->ptls;
+
+            // exit GC-safe region to report then re-enter
+            jl_gc_safe_leave(ptls, gc_state);
+            jl_safe_printf("==== heartbeat loss ====\n");
+            jl_print_task_backtraces(0);
+            gc_state = jl_gc_safe_enter(ptls);
+
+            // we've reported
+            n_reported++;
+
+            // record the reporting time _after_ the report
+            last_report_s = jl_hrtime() / 1e9;
+
+            // double the reporting interval up to a maximum
+            if (report_interval_s < 60 * heartbeat_interval_s) {
+                report_interval_s *= 2;
+            }
+        }
+        // no heartbeats, don't change reporting state
+        return gc_state;
+    }
+    else {
+        // we got a heartbeat; reset the report count
+        n_reported = 0;
+    }
+
+    // reset the reporting interval only once we're steadily getting
+    // heartbeats for the requested reset interval; compare by addition so
+    // the unsigned arithmetic cannot wrap when curr_s < reset_reporting_s
+    if (curr_s > (uint64_t)last_report_s + (uint64_t)reset_reporting_s) {
+        report_interval_s = heartbeat_interval_s;
+    }
+
+    return gc_state;
+}
+
+// heartbeat thread function
+void jl_heartbeat_threadfun(void *arg)
+{
+    int s = 0, ns = 1e9 - 1, rs;
+    uint64_t t0, tchb;
+
+    // We need a TLS because backtraces are accumulated into ptls->bt_size
+    // and ptls->bt_data, so we need to call jl_adopt_thread().
+    jl_adopt_thread();
+    jl_task_t *ct = jl_current_task;
+    jl_ptls_t ptls = ct->ptls;
+
+    // Don't hold up GC, this thread doesn't participate.
+    uint8_t gc_state = jl_gc_safe_enter(ptls);
+
+    for (;;) {
+        if (!heartbeat_enabled) {
+            // post the off semaphore to indicate we're ready to enable
+            uv_sem_post(&heartbeat_off_sem);
+
+            // sleep the thread here; this semaphore is posted in
+            // jl_heartbeat_enable()
+            uv_sem_wait(&heartbeat_on_sem);
+
+            // Set the sleep duration.
+            s = heartbeat_interval_s - 1;
+            ns = 1e9 - 1;
+            continue;
+        }
+
+        // heartbeat is enabled; sleep, waiting for the desired interval
+        sleep_for(s, ns);
+
+        // if heartbeats were turned off while we were sleeping, reset
+        if (!heartbeat_enabled) {
+            continue;
+        }
+
+        // check if any heartbeats have happened, report as appropriate
+        t0 = jl_hrtime();
+        gc_state = check_heartbeats(gc_state);
+        tchb = jl_hrtime() - t0;
+
+        // adjust the next sleep duration based on how long the heartbeat
+        // check took
+        rs = 1;
+        while (tchb > 1e9) {
+            rs++;
+            tchb -= 1e9;
+        }
+        s = heartbeat_interval_s - rs;
+        ns = 1e9 - tchb;
+        // a check measured as 0ns leaves ns == 1e9, which nanosleep()
+        // rejects as out of range; carry it into the seconds instead
+        if (ns >= 1e9) {
+            s++;
+            ns = 0;
+        }
+    }
+}
+
+#else // !JL_HEARTBEAT_THREAD
+
+void jl_init_heartbeat(void)
+{
+}
+
+JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int n_reports,
+                                     int reset_reporting_after_s)
+{
+    return -1;
+}
+
+JL_DLLEXPORT void jl_heartbeat(void)
+{
+}
+
+#endif // JL_HEARTBEAT_THREAD
+
 #ifdef __cplusplus
 }
 #endif