From 9cd802fb336d44868c954321e148cd60658a02a1 Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Sat, 7 Oct 2023 18:30:01 -0400 Subject: [PATCH] Add heartbeat capability Presence is controlled by a build-time option. Start a separate thread which simply sleeps. When heartbeats are enabled, this thread wakes up at specified intervals to verify that user code is heartbeating as requested and if not, prints task backtraces. Also fixes the call to `maxthreadid` in `generate_precompile.jl`. --- contrib/generate_precompile.jl | 2 +- src/init.c | 4 + src/options.h | 3 + src/threading.c | 221 +++++++++++++++++++++++++++++++++ 4 files changed, 229 insertions(+), 1 deletion(-) diff --git a/contrib/generate_precompile.jl b/contrib/generate_precompile.jl index 11a9c1b552061..a1b44201889e7 100644 --- a/contrib/generate_precompile.jl +++ b/contrib/generate_precompile.jl @@ -1,7 +1,7 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license if Threads.maxthreadid() != 1 - @warn "Running this file with multiple Julia threads may lead to a build error" Base.maxthreadid() + @warn "Running this file with multiple Julia threads may lead to a build error" Base.Threads.maxthreadid() end if Base.isempty(Base.ARGS) || Base.ARGS[1] !== "0" diff --git a/src/init.c b/src/init.c index 0ab6d59e6ec6a..ea344fb9e1e62 100644 --- a/src/init.c +++ b/src/init.c @@ -804,6 +804,8 @@ JL_DLLEXPORT void julia_init(JL_IMAGE_SEARCH rel) _finish_julia_init(rel, ptls, ct); } +void jl_init_heartbeat(void); + static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_task_t *ct) { jl_resolve_sysimg_location(rel); @@ -844,6 +846,8 @@ static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_ } jl_start_threads(); + jl_init_heartbeat(); + jl_gc_enable(1); if (jl_options.image_file && (!jl_generating_output() || jl_options.incremental) && jl_module_init_order) { diff --git a/src/options.h b/src/options.h index 0e6603c59d682..06af3e33fcbdc 100644 --- a/src/options.h +++ b/src/options.h @@ -141,6 +141,9 @@ #define MACHINE_EXCLUSIVE_NAME "JULIA_EXCLUSIVE" #define DEFAULT_MACHINE_EXCLUSIVE 0 +// heartbeats +#define JL_HEARTBEAT_THREAD + // partr -- parallel tasks runtime options ------------------------------------ // multiq diff --git a/src/threading.c b/src/threading.c index 540bcc33bd74a..2dbca51e2e5da 100644 --- a/src/threading.c +++ b/src/threading.c @@ -848,6 +848,227 @@ JL_DLLEXPORT int jl_alignment(size_t sz) return jl_gc_alignment(sz); } +// Heartbeat mechanism for Julia's task scheduler +// --- +// Start a thread that does not participate in running Julia's tasks. This +// thread simply sleeps until the heartbeat mechanism is enabled. When +// enabled, the heartbeat thread enters a loop in which it blocks waiting +// for the specified heartbeat interval. If, within that interval, +// `jl_heartbeat()` is *not* called at least once, then the thread calls +// `jl_print_task_backtraces(0)`. + +#ifdef JL_HEARTBEAT_THREAD + +#include + +volatile int heartbeat_enabled; +uv_sem_t heartbeat_on_sem, // jl_heartbeat_enable -> thread + heartbeat_off_sem; // thread -> jl_heartbeat_enable +int heartbeat_interval_s, + n_loss_reports, + reset_reporting_s; +int last_report_s, report_interval_s, n_reported; +_Atomic(int) heartbeats; + +JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT; +void jl_heartbeat_threadfun(void *arg); + +// start the heartbeat thread with heartbeats disabled +void jl_init_heartbeat(void) +{ + uv_thread_t uvtid; + heartbeat_enabled = 0; + uv_sem_init(&heartbeat_on_sem, 0); + uv_sem_init(&heartbeat_off_sem, 0); + uv_thread_create(&uvtid, jl_heartbeat_threadfun, NULL); + uv_thread_detach(&uvtid); +} + +// enable/disable heartbeats +// heartbeat_s: interval within which jl_heartbeat() must be called +// n_reports: for one heartbeat loss interval, how many times to report +// reset_reporting_after_s: how long to wait after a heartbeat loss +// interval and a return to steady heartbeats, before resetting +// reporting behavior +// +// When disabling heartbeats, the heartbeat thread must wake up, +// find out that heartbeats are now diabled, and reset. For now, we +// handle this by preventing re-enabling of heartbeats until this +// completes. +JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int n_reports, + int reset_reporting_after_s) +{ + if (heartbeat_s <= 0) { + heartbeat_enabled = 0; + heartbeat_interval_s = n_loss_reports = reset_reporting_s = 0; + } + else { + // must disable before enabling + if (heartbeat_enabled) { + return -1; + } + // heartbeat thread must be ready + if (uv_sem_trywait(&heartbeat_off_sem) != 0) { + return -1; + } + + jl_atomic_store_relaxed(&heartbeats, 0); + heartbeat_interval_s = heartbeat_s; + n_loss_reports = n_reports; + reset_reporting_s = reset_reporting_after_s; + last_report_s = 0; + report_interval_s = heartbeat_interval_s; + heartbeat_enabled = 1; + uv_sem_post(&heartbeat_on_sem); // wake the heartbeat thread + } + return 0; +} + +// heartbeat +JL_DLLEXPORT void jl_heartbeat(void) +{ + jl_atomic_fetch_add(&heartbeats, 1); +} + +// sleep the thread for the specified interval +void sleep_for(int secs, int nsecs) +{ + struct timespec rqtp, rmtp; + rqtp.tv_sec = secs; + rqtp.tv_nsec = nsecs; + rmtp.tv_sec = 0; + rmtp.tv_nsec = 0; + for (; ;) { + // this suspends the thread so we aren't using CPU + if (nanosleep(&rqtp, &rmtp) == 0) { + return; + } + // TODO: else if (errno == EINTR) + // this could be SIGTERM and we should shutdown but how to find out? + rqtp = rmtp; + } +} + +// check for heartbeats and maybe report loss +uint8_t check_heartbeats(uint8_t gc_state) +{ + int hb = jl_atomic_exchange(&heartbeats, 0); + uint64_t curr_s = jl_hrtime() / 1e9; + + if (hb <= 0) { + // we didn't get a heartbeat in the last interval; should we report? + if (n_reported < n_loss_reports && + curr_s - last_report_s >= report_interval_s) { + jl_task_t *ct = jl_current_task; + jl_ptls_t ptls = ct->ptls; + + // exit GC-safe region to report then re-enter + jl_gc_safe_leave(ptls, gc_state); + jl_safe_printf("==== heartbeat loss ====\n"); + jl_print_task_backtraces(0); + gc_state = jl_gc_safe_enter(ptls); + + // we've reported + n_reported++; + + // record the reporting time _after_ the report + last_report_s = jl_hrtime() / 1e9; + + // double the reporting interval up to a maximum + if (report_interval_s < 60 * heartbeat_interval_s) { + report_interval_s *= 2; + } + } + // no heartbeats, don't change reporting state + return gc_state; + } + else { + // we got a heartbeat; reset the report count + n_reported = 0; + } + + // reset the reporting interval only once we're steadily getting + // heartbeats for the requested reset interval + if (curr_s - reset_reporting_s > last_report_s) { + report_interval_s = heartbeat_interval_s; + } + + return gc_state; +} + +// heartbeat thread function +void jl_heartbeat_threadfun(void *arg) +{ + int s, ns = 1e9 - 1, rs; + uint64_t t0, tchb; + + // We need a TLS because backtraces are accumulated into ptls->bt_size + // and ptls->bt_data, so we need to call jl_adopt_thread(). + jl_adopt_thread(); + jl_task_t *ct = jl_current_task; + jl_ptls_t ptls = ct->ptls; + + // Don't hold up GC, this thread doesn't participate. + uint8_t gc_state = jl_gc_safe_enter(ptls); + + for (;;) { + if (!heartbeat_enabled) { + // post the off semaphore to indicate we're ready to enable + uv_sem_post(&heartbeat_off_sem); + + // sleep the thread here; this semaphore is posted in + // jl_heartbeat_enable() + uv_sem_wait(&heartbeat_on_sem); + + // Set the sleep duration. + s = heartbeat_interval_s - 1; + ns = 1e9 - 1; + continue; + } + + // heartbeat is enabled; sleep, waiting for the desired interval + sleep_for(s, ns); + + // if heartbeats were turned off while we were sleeping, reset + if (!heartbeat_enabled) { + continue; + } + + // check if any heartbeats have happened, report as appropriate + t0 = jl_hrtime(); + gc_state = check_heartbeats(gc_state); + tchb = jl_hrtime() - t0; + + // adjust the next sleep duration based on how long the heartbeat + // check took + rs = 1; + while (tchb > 1e9) { + rs++; + tchb -= 1e9; + } + s = heartbeat_interval_s - rs; + ns = 1e9 - tchb; + } +} + +#else // !JL_HEARTBEAT_THREAD + +void jl_init_heartbeat(void) +{ +} + +JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int n_reports, + int reset_reporting_after_s) +{ + return -1; +} + +JL_DLLEXPORT void jl_heartbeat(void) +{ +} + +#endif // JL_HEARTBEAT_THREAD + #ifdef __cplusplus } #endif