From 9b68b76c7caef6ebe37fc331e056cf3d24ef8750 Mon Sep 17 00:00:00 2001 From: Kiran Pamnany Date: Tue, 24 Sep 2024 12:47:53 -0400 Subject: [PATCH] RAI: Adjust heartbeat behavior * Add heartbeat pause/resume capability * Add check to avoid negative sleep duration * Disable heartbeats in `jl_print_task_backtraces()` `jl_print_task_backtraces()` can take long enough that there can be heartbeat loss, which can trigger printing task backtraces again, unless it is called from the heartbeat thread which takes care of that possible problem. * Pause heartbeats for GC * Address review comment * Address review comment --- src/gc-stock.c | 5 +++++ src/stackwalk.c | 19 +++++++++++++++- src/threading.c | 59 +++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 78 insertions(+), 5 deletions(-) diff --git a/src/gc-stock.c b/src/gc-stock.c index 6a51339f85795..8b9382a9255ec 100644 --- a/src/gc-stock.c +++ b/src/gc-stock.c @@ -3305,6 +3305,9 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection) return recollect; } +extern int jl_heartbeat_pause(void); +extern int jl_heartbeat_resume(void); + JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection) { JL_PROBE_GC_BEGIN(collection); @@ -3347,6 +3350,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection) // existence of the thread in the jl_n_threads count. // // TODO: concurrently queue objects + jl_heartbeat_pause(); jl_fence(); gc_n_threads = jl_atomic_load_acquire(&jl_n_threads); gc_all_tls_states = jl_atomic_load_relaxed(&jl_all_tls_states); @@ -3378,6 +3382,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection) gc_n_threads = 0; gc_all_tls_states = NULL; + jl_heartbeat_resume(); jl_safepoint_end_gc(); jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING); JL_PROBE_GC_END(); diff --git a/src/stackwalk.c b/src/stackwalk.c index 0d5e649c8bf70..647b7ec400b76 100644 --- a/src/stackwalk.c +++ b/src/stackwalk.c @@ -1369,9 +1369,22 @@ JL_DLLEXPORT void jl_print_backtrace(void) JL_NOTSAFEPOINT jlbacktrace(); } -// Print backtraces for all live tasks, for all threads, to jl_safe_printf stderr +extern int jl_inside_heartbeat_thread(void); +extern int jl_heartbeat_pause(void); +extern int jl_heartbeat_resume(void); + +// Print backtraces for all live tasks, for all threads, to jl_safe_printf +// stderr. This can take a _long_ time! JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT { + // disable heartbeats to prevent heartbeat loss while running this, + // unless this is called from the heartbeat thread itself; in that + // situation, the thread is busy running this and it will not be + // updating the missed heartbeats counter + if (!jl_inside_heartbeat_thread()) { + jl_heartbeat_pause(); + } + size_t nthreads = jl_atomic_load_acquire(&jl_n_threads); jl_ptls_t *allstates = jl_atomic_load_relaxed(&jl_all_tls_states); int ctid = jl_threadid() + 1; @@ -1430,6 +1443,10 @@ JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT jl_safe_printf("thread (%d) ==== End thread %d\n", ctid, ptls2->tid + 1); } jl_safe_printf("thread (%d) ++++ Done\n", ctid); + + if (!jl_inside_heartbeat_thread()) { + jl_heartbeat_resume(); + } } #ifdef __cplusplus diff --git a/src/threading.c b/src/threading.c index e5d01124742c7..962b4da91bb62 100644 --- a/src/threading.c +++ b/src/threading.c @@ -1152,6 +1152,45 @@ JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n, return 0; } +// temporarily pause the heartbeat thread +JL_DLLEXPORT int jl_heartbeat_pause(void) +{ + if (!heartbeat_enabled) { + return -1; + } + heartbeat_enabled = 0; + return 0; +} + +// resume the paused heartbeat thread +JL_DLLEXPORT int jl_heartbeat_resume(void) +{ + // cannot resume if the heartbeat thread is already running + if (heartbeat_enabled) { + return -1; + } + + // cannot resume if we weren't paused (disabled != paused) + if (heartbeat_interval_s == 0) { + return -1; + } + + // heartbeat thread must be ready + if (uv_sem_trywait(&heartbeat_off_sem) != 0) { + return -1; + } + + // reset state as we've been paused + n_hbs_missed = 0; + n_hbs_recvd = 0; + tasks_showed = 0; + + // resume + heartbeat_enabled = 1; + uv_sem_post(&heartbeat_on_sem); // wake the heartbeat thread + return 0; +} + // heartbeat JL_DLLEXPORT void jl_heartbeat(void) { @@ -1243,7 +1282,7 @@ void jl_heartbeat_threadfun(void *arg) uv_sem_post(&heartbeat_off_sem); // sleep the thread here; this semaphore is posted in - // jl_heartbeat_enable() + // jl_heartbeat_enable() or jl_heartbeat_resume() uv_sem_wait(&heartbeat_on_sem); // Set the sleep duration. @@ -1255,7 +1294,7 @@ void jl_heartbeat_threadfun(void *arg) // heartbeat is enabled; sleep, waiting for the desired interval sleep_for(s, ns); - // if heartbeats were turned off while we were sleeping, reset + // if heartbeats were turned off/paused while we were sleeping, reset if (!heartbeat_enabled) { continue; } @@ -1266,13 +1305,15 @@ void jl_heartbeat_threadfun(void *arg) tchb = jl_hrtime() - t0; // adjust the next sleep duration based on how long the heartbeat - // check took + // check took, but if it took too long then use the normal duration rs = 1; while (tchb > 1e9) { rs++; tchb -= 1e9; } - s = heartbeat_interval_s - rs; + if (rs < heartbeat_interval_s) { + s = heartbeat_interval_s - rs; + } ns = 1e9 - tchb; } } @@ -1294,6 +1335,16 @@ JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n, return -1; } +JL_DLLEXPORT int jl_heartbeat_pause(void) +{ + return -1; +} + +JL_DLLEXPORT int jl_heartbeat_resume(void) +{ + return -1; +} + JL_DLLEXPORT void jl_heartbeat(void) { }