From ce667bce9441b77c780f9b74e394784dc4cb3631 Mon Sep 17 00:00:00 2001 From: Nick Robinson Date: Mon, 9 Dec 2024 17:01:00 +0000 Subject: [PATCH] Add per-Task cpu time metric (#56320) (#194) * Add per-task metrics (#56320) Close https://github.com/JuliaLang/julia/issues/47351 (builds on top of https://github.com/JuliaLang/julia/pull/48416) Adds two per-task metrics: - running time = amount of time the task was actually running (according to our scheduler). Note: currently inclusive of GC time, but would be good to be able to separate that out (in a future PR) - wall time = amount of time between the scheduler becoming aware of this task and the task entering a terminal state (i.e. done or failed). We record running time in `wait()`, where the scheduler stops running the task as well as in `yield(t)`, `yieldto(t)` and `throwto(t)`, which bypass the scheduler. Other places where a task stops running (for `Channel`, `ReentrantLock`, `Event`, `Timer` and `Semaphore` are all implemented in terms of `wait(Condition)`, which in turn calls `wait()`. `LibuvStream` similarly calls `wait()`. This should capture everything (albeit, slightly over-counting task CPU time by including any enqueuing work done before we hit `wait()`). The various metrics counters could be a separate inlined struct if we think that's a useful abstraction, but for now i've just put them directly in `jl_task_t`. They are all atomic, except the `metrics_enabled` flag itself (which we now have to check on task start/switch/done even if metrics are not enabled) which is set on task construction and marked `const` on the julia side. In future PRs we could add more per-task metrics, e.g. compilation time, GC time, allocations, potentially a wait-time breakdown (time waiting on locks, channels, in the scheduler run queue, etc.), potentially the number of yields. Perhaps in future there could be ways to enable this on a per-thread and per-task basis. And potentially in future these same timings could be used by `@time` (e.g. writing this same timing data to a ScopedValue like in https://github.com/JuliaLang/julia/pull/55103 but only for tasks lexically scoped to inside the `@time` block). Timings are off by default but can be turned on globally via starting Julia with `--task-metrics=yes` or calling `Base.Experimental.task_metrics(true)`. Metrics are collected for all tasks created when metrics are enabled. In other words, enabling/disabling timings via `Base.Experimental.task_metrics` does not affect existing `Task`s, only new `Task`s. The other new APIs are `Base.Experimental.task_running_time_ns(::Task)` and `Base.Experimental.task_wall_time_ns(::Task)` for retrieving the new metrics. These are safe to call on any task (including the current task, or a task running on another thread). All these are in `Base.Experimental` to give us room to change up the APIs as we add more metrics in future PRs (without worrying about release timelines). cc @NHDaly @kpamnany @d-netto --------- Co-authored-by: Pete Vilter Co-authored-by: K Pamnany Co-authored-by: Nathan Daly Co-authored-by: Valentin Churavy * Address review comments --------- Co-authored-by: Pete Vilter Co-authored-by: K Pamnany Co-authored-by: Nathan Daly Co-authored-by: Valentin Churavy --- base/boot.jl | 28 ++- base/experimental.jl | 74 ++++++++ base/options.jl | 1 + base/task.jl | 53 +++++- doc/man/julia.1 | 4 + doc/src/base/multi-threading.md | 8 + doc/src/manual/command-line-interface.md | 1 + src/init.c | 4 + src/jlapi.c | 30 +++ src/jloptions.c | 38 ++-- src/jloptions.h | 1 + src/jltypes.c | 46 ++++- src/julia.h | 26 ++- src/julia_internal.h | 3 + src/task.c | 35 +++- src/threading.c | 2 + test/cmdlineargs.jl | 9 + test/core.jl | 2 + test/threads_exec.jl | 223 +++++++++++++++++++++++ 19 files changed, 551 insertions(+), 37 deletions(-) diff --git a/base/boot.jl b/base/boot.jl index 561be564d58f4..aff2a30504549 100644 --- a/base/boot.jl +++ b/base/boot.jl @@ -156,15 +156,33 @@ #end #mutable struct Task -# parent::Task +# next::Any +# queue::Any # storage::Any -# state::Symbol # donenotify::Any # result::Any -# exception::Any -# backtrace::Any -# logstate::Any +# scope::Any # code::Any +# @atomic _state::UInt8 +# sticky::UInt8 +# priority::UInt16 +# @atomic _isexception::UInt8 +# pad00::UInt8 +# pad01::UInt8 +# pad02::UInt8 +# rngState0::UInt64 +# rngState1::UInt64 +# rngState2::UInt64 +# rngState3::UInt64 +# rngState4::UInt64 +# const metrics_enabled::Bool +# pad10::UInt8 +# pad11::UInt8 +# pad12::UInt8 +# @atomic first_enqueued_at::UInt64 +# @atomic last_started_running_at::UInt64 +# @atomic running_time_ns::UInt64 +# @atomic finished_at::UInt64 #end export diff --git a/base/experimental.jl b/base/experimental.jl index cc8d368023b49..d876662ab048c 100644 --- a/base/experimental.jl +++ b/base/experimental.jl @@ -368,4 +368,78 @@ adding them to the global method table. """ :@MethodTable +### Task metrics + +""" + Base.Experimental.task_metrics(::Bool) + +Enable or disable the collection of per-task metrics. +A `Task` created when `Base.Experimental.task_metrics(true)` is in effect will have +[`Base.Experimental.task_running_time_ns`](@ref) and [`Base.Experimental.task_wall_time_ns`](@ref) +timing information available. + +!!! note + Task metrics can be enabled at start-up via the `--task-metrics=yes` command line option. +""" +function task_metrics(b::Bool) + if b + ccall(:jl_task_metrics_enable, Cvoid, ()) + else + ccall(:jl_task_metrics_disable, Cvoid, ()) + end + return nothing +end + +""" + Base.Experimental.task_running_time_ns(t::Task) -> Union{UInt64, Nothing} + +Return the total nanoseconds that the task `t` has spent running. +This metric is only updated when `t` yields or completes unless `t` is the current task, in +which it will be updated continuously. +See also [`Base.Experimental.task_wall_time_ns`](@ref). + +Returns `nothing` if task timings are not enabled. +See [`Base.Experimental.task_metrics`](@ref). + +!!! note "This metric is from the Julia scheduler" + A task may be running on an OS thread that is descheduled by the OS + scheduler, this time still counts towards the metric. + +!!! compat "Julia 1.12" + This method was added in Julia 1.12. +""" +function task_running_time_ns(t::Task=current_task()) + t.metrics_enabled || return nothing + if t == current_task() + # These metrics fields can't update while we're running. + # But since we're running we need to include the time since we last started running! + return t.running_time_ns + (time_ns() - t.last_started_running_at) + else + return t.running_time_ns + end end + +""" + Base.Experimental.task_wall_time_ns(t::Task) -> Union{UInt64, Nothing} + +Return the total nanoseconds that the task `t` was runnable. +This is the time since the task first entered the run queue until the time at which it +completed, or until the current time if the task has not yet completed. +See also [`Base.Experimental.task_running_time_ns`](@ref). + +Returns `nothing` if task timings are not enabled. +See [`Base.Experimental.task_metrics`](@ref). + +!!! compat "Julia 1.12" + This method was added in Julia 1.12. +""" +function task_wall_time_ns(t::Task=current_task()) + t.metrics_enabled || return nothing + start_at = t.first_enqueued_at + start_at == 0 && return UInt64(0) + end_at = t.finished_at + end_at == 0 && return time_ns() - start_at + return end_at - start_at +end + +end # module diff --git a/base/options.jl b/base/options.jl index a01a3f553b157..2e779c964e7fd 100644 --- a/base/options.jl +++ b/base/options.jl @@ -60,6 +60,7 @@ struct JLOptions heap_size_hint::UInt64 trace_compile_timing::Int8 safe_crash_log_file::Ptr{UInt8} + task_metrics::Int8 end # This runs early in the sysimage != is not defined yet diff --git a/base/task.jl b/base/task.jl index 137b0f7c4a3f6..2d0babbb9fd4c 100644 --- a/base/task.jl +++ b/base/task.jl @@ -810,7 +810,11 @@ function enq_work(t::Task) return t end -schedule(t::Task) = enq_work(t) +function schedule(t::Task) + # [task] created -scheduled-> wait_time + maybe_record_enqueued!(t) + enq_work(t) +end """ schedule(t::Task, [val]; error=false) @@ -857,6 +861,8 @@ function schedule(t::Task, @nospecialize(arg); error=false) t.queue === nothing || Base.error("schedule: Task not runnable") setfield!(t, :result, arg) end + # [task] created -scheduled-> wait_time + maybe_record_enqueued!(t) enq_work(t) return t end @@ -888,9 +894,15 @@ A fast, unfair-scheduling version of `schedule(t, arg); yield()` which immediately yields to `t` before calling the scheduler. """ function yield(t::Task, @nospecialize(x=nothing)) - (t._state === task_state_runnable && t.queue === nothing) || error("yield: Task not runnable") + ct = current_task() + t === ct && throw(ConcurrencyViolationError("Cannot yield to currently running task!")) + (t._state === task_state_runnable && t.queue === nothing) || throw(ConcurrencyViolationError("yield: Task not runnable")) + # [task] user_time -yield-> wait_time + record_running_time!(ct) + # [task] created -scheduled-> wait_time + maybe_record_enqueued!(t) t.result = x - enq_work(current_task()) + enq_work(ct) set_next_task(t) return try_yieldto(ensure_rescheduled) end @@ -904,6 +916,7 @@ call to `yieldto`. This is a low-level call that only switches tasks, not consid or scheduling in any way. Its use is discouraged. """ function yieldto(t::Task, @nospecialize(x=nothing)) + ct = current_task() # TODO: these are legacy behaviors; these should perhaps be a scheduler # state error instead. if t._state === task_state_done @@ -911,6 +924,10 @@ function yieldto(t::Task, @nospecialize(x=nothing)) elseif t._state === task_state_failed throw(t.result) end + # [task] user_time -yield-> wait_time + record_running_time!(ct) + # [task] created -scheduled-unfairly-> wait_time + maybe_record_enqueued!(t) t.result = x set_next_task(t) return try_yieldto(identity) @@ -924,6 +941,10 @@ function try_yieldto(undo) rethrow() end ct = current_task() + # [task] wait_time -(re)started-> user_time + if ct.metrics_enabled + @atomic :monotonic ct.last_started_running_at = time_ns() + end if ct._isexception exc = ct.result ct.result = nothing @@ -937,6 +958,11 @@ end # yield to a task, throwing an exception in it function throwto(t::Task, @nospecialize exc) + ct = current_task() + # [task] user_time -yield-> wait_time + record_running_time!(ct) + # [task] created -scheduled-unfairly-> wait_time + maybe_record_enqueued!(t) t.result = exc t._isexception = true set_next_task(t) @@ -989,6 +1015,9 @@ checktaskempty = Partr.multiq_check_empty end function wait() + ct = current_task() + # [task] user_time -yield-or-done-> wait_time + record_running_time!(ct) GC.safepoint() W = workqueue_for(Threads.threadid()) poptask(W) @@ -1003,3 +1032,21 @@ if Sys.iswindows() else pause() = ccall(:pause, Cvoid, ()) end + +# update the `running_time_ns` field of `t` to include the time since it last started running. +function record_running_time!(t::Task) + if t.metrics_enabled && !istaskdone(t) + @atomic :monotonic t.running_time_ns += time_ns() - t.last_started_running_at + end + return t +end + +# if this is the first time `t` has been added to the run queue +# (or the first time it has been unfairly yielded to without being added to the run queue) +# then set the `first_enqueued_at` field to the current time. +function maybe_record_enqueued!(t::Task) + if t.metrics_enabled && t.first_enqueued_at == 0 + @atomic :monotonic t.first_enqueued_at = time_ns() + end + return t +end diff --git a/doc/man/julia.1 b/doc/man/julia.1 index e7da6e352a96a..483b7d76ac04f 100644 --- a/doc/man/julia.1 +++ b/doc/man/julia.1 @@ -270,6 +270,10 @@ Print precompile statements for methods compiled during execution or save to a p --trace-compile-timing= If --trace-compile is enabled show how long each took to compile in ms +.TP +--task-metrics={yes|no*} +Enable the collection of per-task metrics. + .TP -image-codegen Force generate code in imaging mode diff --git a/doc/src/base/multi-threading.md b/doc/src/base/multi-threading.md index 45a60b14d541a..6758cafd9b059 100644 --- a/doc/src/base/multi-threading.md +++ b/doc/src/base/multi-threading.md @@ -71,3 +71,11 @@ These building blocks are used to create the regular synchronization objects. ```@docs Base.Threads.SpinLock ``` + +## Task metrics (Experimental) + +```@docs +Base.Experimental.task_metrics +Base.Experimental.task_running_time_ns +Base.Experimental.task_wall_time_ns +``` diff --git a/doc/src/manual/command-line-interface.md b/doc/src/manual/command-line-interface.md index d08e3fbfd2181..0c380fa2c0688 100644 --- a/doc/src/manual/command-line-interface.md +++ b/doc/src/manual/command-line-interface.md @@ -130,6 +130,7 @@ The following is a complete list of command-line switches available when launchi |`--code-coverage=tracefile.info` |Append coverage information to the LCOV tracefile (filename supports format tokens).| |`--track-allocation[={none*\|user\|all}]` |Count bytes allocated by each source line (omitting setting is equivalent to "user")| |`--track-allocation=@` |Count bytes but only in files that fall under the given file path/directory. The `@` prefix is required to select this option. A `@` with no path will track the current directory.| +|`--task-metrics={yes\|no*}` |Enable the collection of per-task metrics| |`--bug-report=KIND` |Launch a bug report session. It can be used to start a REPL, run a script, or evaluate expressions. It first tries to use BugReporting.jl installed in current environment and falls back to the latest compatible BugReporting.jl if not. For more information, see `--bug-report=help`.| |`--compile={yes*\|no\|all\|min}` |Enable or disable JIT compiler, or request exhaustive or minimal compilation| |`--output-o ` |Generate an object file (including system image data)| diff --git a/src/init.c b/src/init.c index 9a49ad60c02f4..dae5afb375916 100644 --- a/src/init.c +++ b/src/init.c @@ -851,6 +851,10 @@ JL_DLLEXPORT void julia_init(JL_IMAGE_SEARCH rel) #if defined(_COMPILER_GCC_) && __GNUC__ >= 12 #pragma GCC diagnostic ignored "-Wdangling-pointer" #endif + if (jl_options.task_metrics == JL_OPTIONS_TASK_METRICS_ON) { + // enable before creating the root task so it gets timings too. + jl_atomic_fetch_add(&jl_task_metrics_enabled, 1); + } // warning: this changes `jl_current_task`, so be careful not to call that from this function jl_task_t *ct = jl_init_root_task(ptls, stack_lo, stack_hi); #pragma GCC diagnostic pop diff --git a/src/jlapi.c b/src/jlapi.c index 0dffaac627288..670408d2260fb 100644 --- a/src/jlapi.c +++ b/src/jlapi.c @@ -509,6 +509,36 @@ JL_DLLEXPORT uint64_t jl_cumulative_recompile_time_ns(void) return jl_atomic_load_relaxed(&jl_cumulative_recompile_time); } +/** + * @brief Enable per-task timing. + */ +JL_DLLEXPORT void jl_task_metrics_enable(void) +{ + // Increment the flag to allow reentrant callers. + jl_atomic_fetch_add(&jl_task_metrics_enabled, 1); +} + +/** + * @brief Disable per-task timing. + */ +JL_DLLEXPORT void jl_task_metrics_disable(void) +{ + // Prevent decrementing the counter below zero + uint8_t enabled = jl_atomic_load_relaxed(&jl_task_metrics_enabled); + while (enabled > 0) { + if (jl_atomic_cmpswap(&jl_task_metrics_enabled, &enabled, enabled-1)) + break; + } +} + +/** + * @brief Retrieve floating-point environment constants. + * + * Populates an array with constants related to the floating-point environment, + * such as rounding modes and exception flags. + * + * @param ret An array of integers to be populated with floating-point environment constants. + */ JL_DLLEXPORT void jl_get_fenv_consts(int *ret) { ret[0] = FE_INEXACT; diff --git a/src/jloptions.c b/src/jloptions.c index 29fd171db1437..364c37cebcb6d 100644 --- a/src/jloptions.c +++ b/src/jloptions.c @@ -93,6 +93,7 @@ JL_DLLEXPORT void jl_init_options(void) 0, // heap-size-hint 0, // trace_compile_timing NULL, // safe_crash_log_file + 0, // task_metrics }; jl_options_initialized = 1; } @@ -206,17 +207,22 @@ static const char opts_hidden[] = " --strip-metadata Remove docstrings and source location info from system image\n" " --strip-ir Remove IR (intermediate representation) of compiled functions\n\n" - // compiler debugging (see the devdocs for tips on using these options) - " --output-unopt-bc Generate unoptimized LLVM bitcode (.bc)\n" - " --output-bc Generate LLVM bitcode (.bc)\n" - " --output-asm Generate an assembly file (.s)\n" - " --output-incremental={yes|no*}\n" - " Generate an incremental output file (rather than complete)\n" - " --trace-compile={stderr,name}\n" - " Print precompile statements for methods compiled during execution or save to a path\n" - " --trace-compile-timing If --trace-compile is enabled show how long each took to compile in ms\n" - " --image-codegen Force generate code in imaging mode\n" - " --permalloc-pkgimg={yes|no*} Copy the data section of package images into memory\n" + // compiler debugging and experimental (see the devdocs for tips on using these options) + " --experimental Enable the use of experimental (alpha) features\n" + " --output-unopt-bc Generate unoptimized LLVM bitcode (.bc)\n" + " --output-bc Generate LLVM bitcode (.bc)\n" + " --output-asm Generate an assembly file (.s)\n" + " --output-incremental={yes|no*} Generate an incremental output file (rather than\n" + " complete)\n" + " --trace-compile={stderr|name} Print precompile statements for methods compiled\n" + " during execution or save to stderr or a path. Methods that\n" + " were recompiled are printed in yellow or with a trailing\n" + " comment if color is not supported\n" + " --trace-compile-timing If --trace-compile is enabled show how long each took to\n" + " compile in ms\n" + " --task-metrics={yes|no*} Enable collection of per-task timing data.\n" + " --image-codegen Force generate code in imaging mode\n" + " --permalloc-pkgimg={yes|no*} Copy the data section of package images into memory\n" ; JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) @@ -239,6 +245,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) opt_trace_compile, opt_trace_compile_timing, opt_trace_dispatch, + opt_task_metrics, opt_math_mode, opt_worker, opt_bind_to, @@ -317,6 +324,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) { "trace-compile", required_argument, 0, opt_trace_compile }, { "trace-compile-timing", no_argument, 0, opt_trace_compile_timing }, { "trace-dispatch", required_argument, 0, opt_trace_dispatch }, + { "task-metrics", required_argument, 0, opt_task_metrics }, { "math-mode", required_argument, 0, opt_math_mode }, { "handle-signals", required_argument, 0, opt_handle_signals }, // hidden command line options @@ -873,6 +881,14 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) if (jl_options.safe_crash_log_file == NULL) jl_error("julia: failed to allocate memory for --safe-crash-log-file"); break; + case opt_task_metrics: + if (!strcmp(optarg, "no")) + jl_options.task_metrics = JL_OPTIONS_TASK_METRICS_OFF; + else if (!strcmp(optarg, "yes")) + jl_options.task_metrics = JL_OPTIONS_TASK_METRICS_ON; + else + jl_errorf("julia: invalid argument to --task-metrics={yes|no} (%s)", optarg); + break; default: jl_errorf("julia: unhandled option -- %c\n" "This is a bug, please report it.", c); diff --git a/src/jloptions.h b/src/jloptions.h index 63f639805b300..d9cf96e82f2a4 100644 --- a/src/jloptions.h +++ b/src/jloptions.h @@ -64,6 +64,7 @@ typedef struct { uint64_t heap_size_hint; int8_t trace_compile_timing; const char *safe_crash_log_file; + int8_t task_metrics; } jl_options_t; #endif diff --git a/src/jltypes.c b/src/jltypes.c index e9d843c383a9c..f67c90fba9be2 100644 --- a/src/jltypes.c +++ b/src/jltypes.c @@ -3222,7 +3222,7 @@ void jl_init_types(void) JL_GC_DISABLED NULL, jl_any_type, jl_emptysvec, - jl_perm_symsvec(16, + jl_perm_symsvec(27, "next", "queue", "storage", @@ -3230,16 +3230,27 @@ void jl_init_types(void) JL_GC_DISABLED "result", "logstate", "code", + "_state", + "sticky", + "priority", + "_isexception", + "pad00", + "pad01", + "pad02", "rngState0", "rngState1", "rngState2", "rngState3", "rngState4", - "_state", - "sticky", - "_isexception", - "priority"), - jl_svec(16, + "metrics_enabled", + "pad10", + "pad11", + "pad12", + "first_enqueued_at", + "last_started_running_at", + "running_time_ns", + "finished_at"), + jl_svec(27, jl_any_type, jl_any_type, jl_any_type, @@ -3247,20 +3258,37 @@ void jl_init_types(void) JL_GC_DISABLED jl_any_type, jl_any_type, jl_any_type, + jl_uint8_type, + jl_bool_type, + jl_uint16_type, + jl_bool_type, + jl_uint8_type, + jl_uint8_type, + jl_uint8_type, jl_uint64_type, jl_uint64_type, jl_uint64_type, jl_uint64_type, jl_uint64_type, - jl_uint8_type, jl_bool_type, - jl_bool_type, - jl_uint16_type), + jl_uint8_type, + jl_uint8_type, + jl_uint8_type, + jl_uint64_type, + jl_uint64_type, + jl_uint64_type, + jl_uint64_type), jl_emptysvec, 0, 1, 6); XX(task); jl_value_t *listt = jl_new_struct(jl_uniontype_type, jl_task_type, jl_nothing_type); jl_svecset(jl_task_type->types, 0, listt); + // Set field 20 (metrics_enabled) as const + // Set fields 8 (_state) and 24-27 (metric counters) as atomic + const static uint32_t task_constfields[1] = { 0b00000000000010000000000000000000 }; + const static uint32_t task_atomicfields[1] = { 0b00000111100000000000000010000000 }; + jl_task_type->name->constfields = task_constfields; + jl_task_type->name->atomicfields = task_atomicfields; jl_binding_type = jl_new_datatype(jl_symbol("Binding"), core, jl_any_type, jl_emptysvec, diff --git a/src/julia.h b/src/julia.h index f34bb1623eed5..3dc8552240f9e 100644 --- a/src/julia.h +++ b/src/julia.h @@ -2033,16 +2033,25 @@ typedef struct _jl_task_t { jl_value_t *result; jl_value_t *logstate; jl_function_t *start; - // 4 byte padding on 32-bit systems - // uint32_t padding0; - uint64_t rngState[JL_RNG_SIZE]; _Atomic(uint8_t) _state; uint8_t sticky; // record whether this Task can be migrated to a new thread - _Atomic(uint8_t) _isexception; // set if `result` is an exception to throw or that we exited with - // 1 byte padding - // uint8_t padding1; - // multiqueue priority uint16_t priority; + _Atomic(uint8_t) _isexception; // set if `result` is an exception to throw or that we exited with + uint8_t pad0[3]; + // === 64 bytes (cache line) + uint64_t rngState[JL_RNG_SIZE]; + // flag indicating whether or not to record timing metrics for this task + uint8_t metrics_enabled; + uint8_t pad1[3]; + // timestamp this task first entered the run queue + _Atomic(uint64_t) first_enqueued_at; + // timestamp this task was most recently scheduled to run + _Atomic(uint64_t) last_started_running_at; + // time this task has spent running; updated when it yields or finishes. + _Atomic(uint64_t) running_time_ns; + // === 64 bytes (cache line) + // timestamp this task finished (i.e. entered state DONE or FAILED). + _Atomic(uint64_t) finished_at; // hidden state: @@ -2324,6 +2333,9 @@ JL_DLLEXPORT int jl_generating_output(void) JL_NOTSAFEPOINT; #define JL_OPTIONS_USE_PKGIMAGES_YES 1 #define JL_OPTIONS_USE_PKGIMAGES_NO 0 +#define JL_OPTIONS_TASK_METRICS_OFF 0 +#define JL_OPTIONS_TASK_METRICS_ON 1 + // Version information #include // Generated file diff --git a/src/julia_internal.h b/src/julia_internal.h index 3a66da555ea52..36d25f502cd2a 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -305,6 +305,9 @@ extern JL_DLLEXPORT _Atomic(uint8_t) jl_measure_compile_time_enabled; extern JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_compile_time; extern JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_recompile_time; +// Global *atomic* integer controlling *process-wide* task timing. +extern JL_DLLEXPORT _Atomic(uint8_t) jl_task_metrics_enabled; + #define jl_return_address() ((uintptr_t)__builtin_return_address(0)) STATIC_INLINE uint32_t jl_int32hash_fast(uint32_t a) diff --git a/src/task.c b/src/task.c index 86033a81ddf41..e614d4205e937 100644 --- a/src/task.c +++ b/src/task.c @@ -295,8 +295,15 @@ void JL_NORETURN jl_finish_task(jl_task_t *t) jl_task_t *ct = jl_current_task; JL_PROBE_RT_FINISH_TASK(ct); JL_SIGATOMIC_BEGIN(); - if (jl_atomic_load_relaxed(&t->_isexception)) - jl_atomic_store_release(&t->_state, JL_TASK_STATE_FAILED); + if (ct->metrics_enabled) { + // [task] user_time -finished-> wait_time + assert(jl_atomic_load_relaxed(&ct->first_enqueued_at) != 0); + uint64_t now = jl_hrtime(); + jl_atomic_store_relaxed(&ct->finished_at, now); + jl_atomic_fetch_add_relaxed(&ct->running_time_ns, now - jl_atomic_load_relaxed(&ct->last_started_running_at)); + } + if (jl_atomic_load_relaxed(&ct->_isexception)) + jl_atomic_store_release(&ct->_state, JL_TASK_STATE_FAILED); else jl_atomic_store_release(&t->_state, JL_TASK_STATE_DONE); if (t->copy_stack) { // early free of stkbuf @@ -1084,6 +1091,11 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, jl_value_t *completion t->ptls = NULL; t->world_age = ct->world_age; t->reentrant_timing = 0; + t->metrics_enabled = jl_atomic_load_relaxed(&jl_task_metrics_enabled) != 0; + jl_atomic_store_relaxed(&t->first_enqueued_at, 0); + jl_atomic_store_relaxed(&t->last_started_running_at, 0); + jl_atomic_store_relaxed(&t->running_time_ns, 0); + jl_atomic_store_relaxed(&t->finished_at, 0); jl_timing_task_init(t); #ifdef COPY_STACKS @@ -1220,6 +1232,12 @@ CFI_NORETURN #endif ct->started = 1; + if (ct->metrics_enabled) { + // [task] wait_time -started-> user_time + assert(jl_atomic_load_relaxed(&ct->first_enqueued_at) != 0); + assert(jl_atomic_load_relaxed(&ct->last_started_running_at) == 0); + jl_atomic_store_relaxed(&ct->last_started_running_at, jl_hrtime()); + } JL_PROBE_RT_START_TASK(ct); jl_timing_block_task_enter(ct, ptls, NULL); if (jl_atomic_load_relaxed(&ct->_isexception)) { @@ -1680,6 +1698,19 @@ jl_task_t *jl_init_root_task(jl_ptls_t ptls, void *stack_lo, void *stack_hi) ct->ptls = ptls; ct->world_age = 1; // OK to run Julia code on this task ct->reentrant_timing = 0; + jl_atomic_store_relaxed(&ct->running_time_ns, 0); + jl_atomic_store_relaxed(&ct->finished_at, 0); + ct->metrics_enabled = jl_atomic_load_relaxed(&jl_task_metrics_enabled) != 0; + if (ct->metrics_enabled) { + // [task] created -started-> user_time + uint64_t now = jl_hrtime(); + jl_atomic_store_relaxed(&ct->first_enqueued_at, now); + jl_atomic_store_relaxed(&ct->last_started_running_at, now); + } + else { + jl_atomic_store_relaxed(&ct->first_enqueued_at, 0); + jl_atomic_store_relaxed(&ct->last_started_running_at, 0); + } ptls->root_task = ct; jl_atomic_store_relaxed(&ptls->current_task, ct); JL_GC_PROMISE_ROOTED(ct); diff --git a/src/threading.c b/src/threading.c index d0f25de374462..5a8ae7825354f 100644 --- a/src/threading.c +++ b/src/threading.c @@ -49,6 +49,8 @@ JL_DLLEXPORT _Atomic(uint8_t) jl_measure_compile_time_enabled = 0; JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_compile_time = 0; JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_recompile_time = 0; +JL_DLLEXPORT _Atomic(uint8_t) jl_task_metrics_enabled = 0; + JL_DLLEXPORT void *jl_get_ptls_states(void) { // mostly deprecated: use current_task instead diff --git a/test/cmdlineargs.jl b/test/cmdlineargs.jl index 094fa8741087f..3597d99eb6d96 100644 --- a/test/cmdlineargs.jl +++ b/test/cmdlineargs.jl @@ -746,6 +746,15 @@ let exename = `$(Base.julia_cmd()) --startup-file=no --color=no` "Int(Base.JLOptions().fast_math)"`)) == JL_OPTIONS_FAST_MATH_DEFAULT end + let JL_OPTIONS_TASK_METRICS_OFF = 0, JL_OPTIONS_TASK_METRICS_ON = 1 + @test parse(Int,readchomp(`$exename -E + "Int(Base.JLOptions().task_metrics)"`)) == JL_OPTIONS_TASK_METRICS_OFF + @test parse(Int, readchomp(`$exename --task-metrics=yes -E + "Int(Base.JLOptions().task_metrics)"`)) == JL_OPTIONS_TASK_METRICS_ON + @test !parse(Bool, readchomp(`$exename -E "current_task().metrics_enabled"`)) + @test parse(Bool, readchomp(`$exename --task-metrics=yes -E "current_task().metrics_enabled"`)) + end + # --worker takes default / custom as argument (default/custom arguments # tested in test/parallel.jl) @test errors_not_signals(`$exename --worker=true`) diff --git a/test/core.jl b/test/core.jl index 0212193a0206b..0549c1eec32df 100644 --- a/test/core.jl +++ b/test/core.jl @@ -23,6 +23,7 @@ for (T, c) in ( (Core.TypeName, [:name, :module, :names, :atomicfields, :constfields, :wrapper, :mt, :hash, :n_uninitialized, :flags]), (DataType, [:name, :super, :parameters, :instance, :hash]), (TypeVar, [:name, :ub, :lb]), + (Task, [:metrics_enabled]), ) @test Set((fieldname(T, i) for i in 1:fieldcount(T) if isconst(T, i))) == Set(c) end @@ -38,6 +39,7 @@ for (T, c) in ( (Core.TypeMapLevel, [:arg1, :targ, :name1, :tname, :list, :any]), (Core.TypeName, [:cache, :linearcache]), (DataType, [:types, :layout]), + (Task, [:_state, :running_time_ns, :finished_at, :first_enqueued_at, :last_started_running_at]), ) @test Set((fieldname(T, i) for i in 1:fieldcount(T) if Base.isfieldatomic(T, i))) == Set(c) end diff --git a/test/threads_exec.jl b/test/threads_exec.jl index 9c7c524febeff..6fbad062901cf 100644 --- a/test/threads_exec.jl +++ b/test/threads_exec.jl @@ -3,6 +3,7 @@ using Test using Base.Threads using Base.Threads: SpinLock, threadpoolsize +using LinearAlgebra: peakflops # for cfunction_closure include("testenv.jl") @@ -1090,4 +1091,226 @@ end end end +@testset "Base.Experimental.task_metrics" begin + t = Task(() -> nothing) + @test_throws "const field" t.metrics_enabled = true + is_task_metrics_enabled() = fetch(Threads.@spawn current_task().metrics_enabled) + @test !is_task_metrics_enabled() + try + @testset "once" begin + Base.Experimental.task_metrics(true) + @test is_task_metrics_enabled() + Base.Experimental.task_metrics(false) + @test !is_task_metrics_enabled() + end + @testset "multiple" begin + Base.Experimental.task_metrics(true) # 1 + Base.Experimental.task_metrics(true) # 2 + Base.Experimental.task_metrics(true) # 3 + @test is_task_metrics_enabled() + Base.Experimental.task_metrics(false) # 2 + @test is_task_metrics_enabled() + Base.Experimental.task_metrics(false) # 1 + @test is_task_metrics_enabled() + @sync for i in 1:5 # 0 (not negative) + Threads.@spawn Base.Experimental.task_metrics(false) + end + @test !is_task_metrics_enabled() + Base.Experimental.task_metrics(true) # 1 + @test is_task_metrics_enabled() + end + finally + while is_task_metrics_enabled() + Base.Experimental.task_metrics(false) + end + end +end + +@testset "task time counters" begin + @testset "enabled" begin + try + Base.Experimental.task_metrics(true) + start_time = time_ns() + t = Threads.@spawn peakflops() + wait(t) + end_time = time_ns() + wall_time_delta = end_time - start_time + @test t.metrics_enabled + @test Base.Experimental.task_running_time_ns(t) > 0 + @test Base.Experimental.task_wall_time_ns(t) > 0 + @test Base.Experimental.task_wall_time_ns(t) >= Base.Experimental.task_running_time_ns(t) + @test wall_time_delta > Base.Experimental.task_wall_time_ns(t) + finally + Base.Experimental.task_metrics(false) + end + end + @testset "disabled" begin + t = Threads.@spawn peakflops() + wait(t) + @test !t.metrics_enabled + @test isnothing(Base.Experimental.task_running_time_ns(t)) + @test isnothing(Base.Experimental.task_wall_time_ns(t)) + end + @testset "task not run" begin + t1 = Task(() -> nothing) + @test !t1.metrics_enabled + @test isnothing(Base.Experimental.task_running_time_ns(t1)) + @test isnothing(Base.Experimental.task_wall_time_ns(t1)) + try + Base.Experimental.task_metrics(true) + t2 = Task(() -> nothing) + @test t2.metrics_enabled + @test Base.Experimental.task_running_time_ns(t2) == 0 + @test Base.Experimental.task_wall_time_ns(t2) == 0 + finally + Base.Experimental.task_metrics(false) + end + end + @testset "task failure" begin + try + Base.Experimental.task_metrics(true) + t = Threads.@spawn error("this task failed") + @test_throws "this task failed" wait(t) + @test Base.Experimental.task_running_time_ns(t) > 0 + @test Base.Experimental.task_wall_time_ns(t) > 0 + @test Base.Experimental.task_wall_time_ns(t) >= Base.Experimental.task_running_time_ns(t) + finally + Base.Experimental.task_metrics(false) + end + end + @testset "direct yield(t)" begin + try + Base.Experimental.task_metrics(true) + start = time_ns() + t_outer = Threads.@spawn begin + t_inner = Task(() -> peakflops()) + t_inner.sticky = false + # directly yield to `t_inner` rather calling `schedule(t_inner)` + yield(t_inner) + wait(t_inner) + @test Base.Experimental.task_running_time_ns(t_inner) > 0 + @test Base.Experimental.task_wall_time_ns(t_inner) > 0 + @test Base.Experimental.task_wall_time_ns(t_inner) >= Base.Experimental.task_running_time_ns(t_inner) + end + wait(t_outer) + delta = time_ns() - start + @test Base.Experimental.task_running_time_ns(t_outer) > 0 + @test Base.Experimental.task_wall_time_ns(t_outer) > 0 + @test Base.Experimental.task_wall_time_ns(t_outer) >= Base.Experimental.task_running_time_ns(t_outer) + @test Base.Experimental.task_wall_time_ns(t_outer) < delta + finally + Base.Experimental.task_metrics(false) + end + end + @testset "bad schedule" begin + try + Base.Experimental.task_metrics(true) + t1 = Task((x) -> 1) + schedule(t1) # MethodError + yield() + @assert istaskfailed(t1) + @test Base.Experimental.task_running_time_ns(t1) > 0 + @test Base.Experimental.task_wall_time_ns(t1) > 0 + foo(a, b) = a + b + t2 = Task(() -> (peakflops(); foo(wait()))) + schedule(t2) + yield() + @assert istaskstarted(t1) && !istaskdone(t2) + schedule(t2, 1) + yield() + @assert istaskfailed(t2) + @test Base.Experimental.task_running_time_ns(t2) > 0 + @test Base.Experimental.task_wall_time_ns(t2) > 0 + finally + Base.Experimental.task_metrics(false) + end + end + @testset "continuously update until task done" begin + try + Base.Experimental.task_metrics(true) + last_running_time = Ref(typemax(Int)) + last_wall_time = Ref(typemax(Int)) + t = Threads.@spawn begin + running_time = Base.Experimental.task_running_time_ns() + wall_time = Base.Experimental.task_wall_time_ns() + for _ in 1:5 + x = time_ns() + while time_ns() < x + 100 + end + new_running_time = Base.Experimental.task_running_time_ns() + new_wall_time = Base.Experimental.task_wall_time_ns() + @test new_running_time > running_time + @test new_wall_time > wall_time + running_time = new_running_time + wall_time = new_wall_time + end + last_running_time[] = running_time + last_wall_time[] = wall_time + end + wait(t) + final_running_time = Base.Experimental.task_running_time_ns(t) + final_wall_time = Base.Experimental.task_wall_time_ns(t) + @test last_running_time[] < final_running_time + @test last_wall_time[] < final_wall_time + # ensure many more tasks are run to make sure the counters are + # not being updated after a task is done e.g. only when a new task is found + @sync for _ in 1:Threads.nthreads() + Threads.@spawn rand() + end + @test final_running_time == Base.Experimental.task_running_time_ns(t) + @test final_wall_time == Base.Experimental.task_wall_time_ns(t) + finally + Base.Experimental.task_metrics(false) + end + end +end + +@testset "task time counters: lots of spawns" begin + using Dates + try + Base.Experimental.task_metrics(true) + # create more tasks than we have threads. + # - all tasks must have: cpu time <= wall time + # - some tasks must have: cpu time < wall time + # - summing across all tasks we must have: total cpu time <= available cpu time + n_tasks = 2 * Threads.nthreads(:default) + cpu_times = Vector{UInt64}(undef, n_tasks) + wall_times = Vector{UInt64}(undef, n_tasks) + start_time = time_ns() + @sync begin + for i in 1:n_tasks + start_time_i = time_ns() + task_i = Threads.@spawn peakflops() + Threads.@spawn begin + wait(task_i) + end_time_i = time_ns() + wall_time_delta_i = end_time_i - start_time_i + cpu_times[$i] = cpu_time_i = Base.Experimental.task_running_time_ns(task_i) + wall_times[$i] = wall_time_i = Base.Experimental.task_wall_time_ns(task_i) + # task should have recorded some cpu-time and some wall-time + @test cpu_time_i > 0 + @test wall_time_i > 0 + # task cpu-time cannot be greater than its wall-time + @test wall_time_i >= cpu_time_i + # task wall-time must be less than our manually measured wall-time + # between calling `@spawn` and returning from `wait`. + @test wall_time_delta_i > wall_time_i + end + end + end + end_time = time_ns() + wall_time_delta = (end_time - start_time) + available_cpu_time = wall_time_delta * Threads.nthreads(:default) + summed_cpu_time = sum(cpu_times) + # total CPU time from all tasks can't exceed what was actually available. + @test available_cpu_time > summed_cpu_time + # some tasks must have cpu-time less than their wall-time, because we had more tasks + # than threads. + summed_wall_time = sum(wall_times) + @test summed_wall_time > summed_cpu_time + finally + Base.Experimental.task_metrics(false) + end +end + end # main testset