From 34294e293fe28070ec40181d993347cd9360a524 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Wed, 16 Mar 2022 14:40:18 -0400 Subject: [PATCH] port partr multiq to julia Direct translation, not necessarily fully idiomatic. In preparation for future improvements. --- base/Base.jl | 1 + base/boot.jl | 2 +- base/partr.jl | 166 +++++++++++++++++++++++++++++ base/task.jl | 40 ++++--- src/builtins.c | 5 +- src/gc.c | 4 - src/init.c | 6 +- src/jl_exported_funcs.inc | 1 - src/jltypes.c | 14 ++- src/julia.h | 4 +- src/partr.c | 215 ++++---------------------------------- src/staticdata.c | 2 +- src/task.c | 2 +- 13 files changed, 229 insertions(+), 233 deletions(-) create mode 100644 base/partr.jl diff --git a/base/Base.jl b/base/Base.jl index ed50dd7a21bc29..42e740d15916cc 100644 --- a/base/Base.jl +++ b/base/Base.jl @@ -277,6 +277,7 @@ include("condition.jl") include("threads.jl") include("lock.jl") include("channels.jl") +include("partr.jl") include("task.jl") include("threads_overloads.jl") include("weakkeydict.jl") diff --git a/base/boot.jl b/base/boot.jl index d797e0a815a814..c994a491f438cd 100644 --- a/base/boot.jl +++ b/base/boot.jl @@ -224,7 +224,7 @@ primitive type Char <: AbstractChar 32 end primitive type Int8 <: Signed 8 end #primitive type UInt8 <: Unsigned 8 end primitive type Int16 <: Signed 16 end -primitive type UInt16 <: Unsigned 16 end +#primitive type UInt16 <: Unsigned 16 end #primitive type Int32 <: Signed 32 end #primitive type UInt32 <: Unsigned 32 end #primitive type Int64 <: Signed 64 end diff --git a/base/partr.jl b/base/partr.jl new file mode 100644 index 00000000000000..159cba1e9021a0 --- /dev/null +++ b/base/partr.jl @@ -0,0 +1,166 @@ +# This file is a part of Julia. License is MIT: https://julialang.org/license + +module Partr + +using ..Threads: SpinLock + +# a task heap +mutable struct taskheap + const lock::SpinLock + const tasks::Vector{Task} + @atomic ntasks::Int32 + @atomic priority::UInt16 + taskheap() = new(SpinLock(), Vector{Task}(undef, tasks_per_heap), zero(Int32), typemax(UInt16)) +end + +# multiqueue parameters +const heap_d = UInt32(8) +const heap_c = UInt32(2) + +# size of each heap +const tasks_per_heap = Int32(65536) # TODO: this should be smaller by default, but growable! + +# the multiqueue's heaps +global heaps::Vector{taskheap} +global heap_p::UInt32 = 0 + +# unbias state for the RNG +global cong_unbias::UInt32 = 0 + + +cong(max::UInt32, unbias::UInt32) = ccall(:jl_rand_ptls, UInt32, (UInt32, UInt32), max, unbias) + UInt32(1) + +function unbias_cong(max::UInt32) + return typemax(UInt32) - ((typemax(UInt32) % max) + UInt32(1)) +end + + +function multiq_init(nthreads) + global heap_p = heap_c * nthreads + global cong_unbias = unbias_cong(UInt32(heap_p)) + global heaps = Vector{taskheap}(undef, heap_p) + for i = UInt32(1):heap_p + heaps[i] = taskheap() + end + nothing +end + + +function sift_up(heap::taskheap, idx::Int32) + while idx > Int32(1) + parent = (idx - Int32(2)) รท heap_d + Int32(1) + if heap.tasks[idx].priority < heap.tasks[parent].priority + t = heap.tasks[parent] + heap.tasks[parent] = heap.tasks[idx] + heap.tasks[idx] = t + idx = parent + else + break + end + end +end + + +function sift_down(heap::taskheap, idx::Int32) + if idx <= heap.ntasks + for child = (heap_d * idx - heap_d + Int32(2)):(heap_d * idx + Int32(1)) + child > tasks_per_heap && break + if isassigned(heap.tasks, child) && + heap.tasks[child].priority < heap.tasks[idx].priority + t = heap.tasks[idx] + heap.tasks[idx] = heap.tasks[child] + heap.tasks[child] = t + sift_down(heap, child) + end + end + end +end + + +function multiq_insert(task::Task, priority::UInt16) + task.priority = priority + + rn = cong(heap_p, cong_unbias) + while !trylock(heaps[rn].lock) + rn = cong(heap_p, cong_unbias) + end + + if heaps[rn].ntasks >= tasks_per_heap + unlock(heaps[rn].lock) + # multiq insertion failed, increase #tasks per heap + return false + end + + ntasks = heaps[rn].ntasks + Int32(1) + @atomic :monotonic heaps[rn].ntasks = ntasks + heaps[rn].tasks[ntasks] = task + sift_up(heaps[rn], ntasks) + priority = heaps[rn].priority + if task.priority < priority + @atomic :monotonic heaps[rn].priority = task.priority + end + unlock(heaps[rn].lock) + return true +end + + +function multiq_deletemin() + local rn1, rn2 + local prio1, prio2 + + @label retry + GC.safepoint() + for i = UInt32(1):heap_p + if i == heap_p + return nothing + end + rn1 = cong(heap_p, cong_unbias) + rn2 = cong(heap_p, cong_unbias) + prio1 = heaps[rn1].priority + prio2 = heaps[rn2].priority + if prio1 > prio2 + prio1 = prio2 + rn1 = rn2 + elseif prio1 == prio2 && prio1 == typemax(UInt16) + continue + end + if trylock(heaps[rn1].lock) + if prio1 == heaps[rn1].priority + break + end + unlock(heaps[rn1].lock) + end + end + + task = heaps[rn1].tasks[1] + tid = Threads.threadid() + if ccall(:jl_set_task_tid, Cint, (Any, Cint), task, tid-1) == 0 + unlock(heaps[rn1].lock) + @goto retry + end + ntasks = heaps[rn1].ntasks + @atomic :monotonic heaps[rn1].ntasks = ntasks - Int32(1) + heaps[rn1].tasks[1] = heaps[rn1].tasks[ntasks] + Base._unsetindex!(heaps[rn1].tasks, Int(ntasks)) + prio1 = typemax(UInt16) + if ntasks > 1 + sift_down(heaps[rn1], Int32(1)) + prio1 = heaps[rn1].tasks[1].priority + end + @atomic :monotonic heaps[rn1].priority = prio1 + unlock(heaps[rn1].lock) + + return task +end + + +function multiq_check_empty() + for i = UInt32(1):heap_p + if heaps[i].ntasks != 0 + return false + end + end + return true +end + +end diff --git a/base/task.jl b/base/task.jl index fe091fd4c7ad35..ad45b6820e85ec 100644 --- a/base/task.jl +++ b/base/task.jl @@ -676,12 +676,14 @@ const StickyWorkqueue = InvasiveLinkedListSynchronized{Task} global const Workqueues = [StickyWorkqueue()] global const Workqueue = Workqueues[1] # default work queue is thread 1 function __preinit_threads__() - if length(Workqueues) < Threads.nthreads() - resize!(Workqueues, Threads.nthreads()) - for i = 2:length(Workqueues) + nt = Threads.nthreads() + if length(Workqueues) < nt + resize!(Workqueues, nt) + for i = 2:nt Workqueues[i] = StickyWorkqueue() end end + Partr.multiq_init(nt) nothing end @@ -706,7 +708,7 @@ function enq_work(t::Task) end push!(Workqueues[tid], t) else - if ccall(:jl_enqueue_task, Cint, (Any,), t) != 0 + if !Partr.multiq_insert(t, t.priority) # if multiq is full, give to a random thread (TODO fix) if tid == 0 tid = mod(time_ns() % Int, Threads.nthreads()) + 1 @@ -872,24 +874,30 @@ function ensure_rescheduled(othertask::Task) end function trypoptask(W::StickyWorkqueue) - isempty(W) && return - t = popfirst!(W) - if t._state !== task_state_runnable - # assume this somehow got queued twice, - # probably broken now, but try discarding this switch and keep going - # can't throw here, because it's probably not the fault of the caller to wait - # and don't want to use print() here, because that may try to incur a task switch - ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...), - "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :runnable\n") - return + while !isempty(W) + t = popfirst!(W) + if t._state !== task_state_runnable + # assume this somehow got queued twice, + # probably broken now, but try discarding this switch and keep going + # can't throw here, because it's probably not the fault of the caller to wait + # and don't want to use print() here, because that may try to incur a task switch + ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...), + "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :runnable\n") + continue + end + return t end - return t + return Partr.multiq_deletemin() +end + +function checktaskempty() + return Partr.multiq_check_empty() end @noinline function poptask(W::StickyWorkqueue) task = trypoptask(W) if !(task isa Task) - task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any), trypoptask, W) + task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty) end set_next_task(task) nothing diff --git a/src/builtins.c b/src/builtins.c index f81069424d784c..f8a78485551adb 100644 --- a/src/builtins.c +++ b/src/builtins.c @@ -2021,10 +2021,11 @@ void jl_init_primitives(void) JL_GC_DISABLED add_builtin("Bool", (jl_value_t*)jl_bool_type); add_builtin("UInt8", (jl_value_t*)jl_uint8_type); - add_builtin("Int32", (jl_value_t*)jl_int32_type); - add_builtin("Int64", (jl_value_t*)jl_int64_type); + add_builtin("UInt16", (jl_value_t*)jl_uint16_type); add_builtin("UInt32", (jl_value_t*)jl_uint32_type); add_builtin("UInt64", (jl_value_t*)jl_uint64_type); + add_builtin("Int32", (jl_value_t*)jl_int32_type); + add_builtin("Int64", (jl_value_t*)jl_int64_type); #ifdef _P64 add_builtin("Int", (jl_value_t*)jl_int64_type); #else diff --git a/src/gc.c b/src/gc.c index e41a2ee04c255b..d4af5f443764cf 100644 --- a/src/gc.c +++ b/src/gc.c @@ -2824,7 +2824,6 @@ static void jl_gc_queue_thread_local(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp gc_mark_queue_obj(gc_cache, sp, ptls2->previous_exception); } -void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp); extern jl_value_t *cmpswap_names JL_GLOBALLY_ROOTED; // mark the initial root set @@ -2833,9 +2832,6 @@ static void mark_roots(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp) // modules gc_mark_queue_obj(gc_cache, sp, jl_main_module); - // tasks - jl_gc_mark_enqueued_tasks(gc_cache, sp); - // invisible builtin values if (jl_an_empty_vec_any != NULL) gc_mark_queue_obj(gc_cache, sp, jl_an_empty_vec_any); diff --git a/src/init.c b/src/init.c index 98d5081c1daafe..64af677564209f 100644 --- a/src/init.c +++ b/src/init.c @@ -780,7 +780,6 @@ static void post_boot_hooks(void) jl_char_type = (jl_datatype_t*)core("Char"); jl_int8_type = (jl_datatype_t*)core("Int8"); jl_int16_type = (jl_datatype_t*)core("Int16"); - jl_uint16_type = (jl_datatype_t*)core("UInt16"); jl_float16_type = (jl_datatype_t*)core("Float16"); jl_float32_type = (jl_datatype_t*)core("Float32"); jl_float64_type = (jl_datatype_t*)core("Float64"); @@ -792,10 +791,11 @@ static void post_boot_hooks(void) jl_bool_type->super = jl_integer_type; jl_uint8_type->super = jl_unsigned_type; - jl_int32_type->super = jl_signed_type; - jl_int64_type->super = jl_signed_type; + jl_uint16_type->super = jl_unsigned_type; jl_uint32_type->super = jl_unsigned_type; jl_uint64_type->super = jl_unsigned_type; + jl_int32_type->super = jl_signed_type; + jl_int64_type->super = jl_signed_type; jl_errorexception_type = (jl_datatype_t*)core("ErrorException"); jl_stackovf_exception = jl_new_struct_uninit((jl_datatype_t*)core("StackOverflowError")); diff --git a/src/jl_exported_funcs.inc b/src/jl_exported_funcs.inc index 2aed69f47c30a9..7d8488bcea73c5 100644 --- a/src/jl_exported_funcs.inc +++ b/src/jl_exported_funcs.inc @@ -119,7 +119,6 @@ XX(jl_egal__bits) \ XX(jl_egal__special) \ XX(jl_eh_restore_state) \ - XX(jl_enqueue_task) \ XX(jl_enter_handler) \ XX(jl_enter_threaded_region) \ XX(jl_environ) \ diff --git a/src/jltypes.c b/src/jltypes.c index 5e84b200af937a..29729237a242db 100644 --- a/src/jltypes.c +++ b/src/jltypes.c @@ -2144,6 +2144,8 @@ void jl_init_types(void) JL_GC_DISABLED jl_any_type, jl_emptysvec, 64); jl_uint8_type = jl_new_primitivetype((jl_value_t*)jl_symbol("UInt8"), core, jl_any_type, jl_emptysvec, 8); + jl_uint16_type = jl_new_primitivetype((jl_value_t*)jl_symbol("UInt16"), core, + jl_any_type, jl_emptysvec, 16); jl_ssavalue_type = jl_new_datatype(jl_symbol("SSAValue"), core, jl_any_type, jl_emptysvec, jl_perm_symsvec(1, "id"), @@ -2508,7 +2510,7 @@ void jl_init_types(void) JL_GC_DISABLED "inferred", //"edges", //"absolute_max", - "ipo_purity_bits", "purity_bits", + "ipo_purity_bits", "purity_bits", "argescapes", "isspecsig", "precompile", "invoke", "specptr", // function object decls "relocatability"), @@ -2602,7 +2604,7 @@ void jl_init_types(void) JL_GC_DISABLED NULL, jl_any_type, jl_emptysvec, - jl_perm_symsvec(14, + jl_perm_symsvec(15, "next", "queue", "storage", @@ -2616,8 +2618,9 @@ void jl_init_types(void) JL_GC_DISABLED "rngState3", "_state", "sticky", - "_isexception"), - jl_svec(14, + "_isexception", + "priority"), + jl_svec(15, jl_any_type, jl_any_type, jl_any_type, @@ -2631,7 +2634,8 @@ void jl_init_types(void) JL_GC_DISABLED jl_uint64_type, jl_uint8_type, jl_bool_type, - jl_bool_type), + jl_bool_type, + jl_uint16_type), jl_emptysvec, 0, 1, 6); jl_value_t *listt = jl_new_struct(jl_uniontype_type, jl_task_type, jl_nothing_type); diff --git a/src/julia.h b/src/julia.h index 3153b87c3a9b9b..863e9c8de7cbb0 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1881,12 +1881,12 @@ typedef struct _jl_task_t { _Atomic(uint8_t) _state; uint8_t sticky; // record whether this Task can be migrated to a new thread _Atomic(uint8_t) _isexception; // set if `result` is an exception to throw or that we exited with + // multiqueue priority + uint16_t priority; // hidden state: // id of owning thread - does not need to be defined until the task runs _Atomic(int16_t) tid; - // multiqueue priority - int16_t prio; // saved gc stack top for context switches jl_gcframe_t *gcstack; size_t world_age; diff --git a/src/partr.c b/src/partr.c index c8cc3245ebb4c0..74b0e2c8db6a9e 100644 --- a/src/partr.c +++ b/src/partr.c @@ -34,11 +34,9 @@ static const int16_t sleeping = 1; // information: These observations require sequentially-consistent fences to be inserted between each of those operational phases. // [^store_buffering_1]: These fences are used to avoid the cycle 2b -> 1a -> 1b -> 2a -> 2b where // * Dequeuer: -// * 1a: `jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping)` -// * 1b: `multiq_check_empty` returns true +// * 1: `jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping)` // * Enqueuer: -// * 2a: `multiq_insert` -// * 2b: `jl_atomic_load_relaxed(&ptls->sleep_check_state)` in `jl_wakeup_thread` returns `not_sleeping` +// * 2: `jl_atomic_load_relaxed(&ptls->sleep_check_state)` in `jl_wakeup_thread` returns `not_sleeping` // i.e., the dequeuer misses the enqueue and enqueuer misses the sleep state transition. @@ -67,187 +65,21 @@ JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int tid) JL_NOTSAFEPOINT extern int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp, jl_value_t *obj) JL_NOTSAFEPOINT; -// multiq -// --- - -/* a task heap */ -typedef struct taskheap_tag { - uv_mutex_t lock; - jl_task_t **tasks; - _Atomic(int32_t) ntasks; - _Atomic(int16_t) prio; -} taskheap_t; - -/* multiqueue parameters */ -static const int32_t heap_d = 8; -static const int heap_c = 2; - -/* size of each heap */ -static const int tasks_per_heap = 65536; // TODO: this should be smaller by default, but growable! - -/* the multiqueue's heaps */ -static taskheap_t *heaps; -static int32_t heap_p; - -/* unbias state for the RNG */ -static uint64_t cong_unbias; - - -static inline void multiq_init(void) -{ - heap_p = heap_c * jl_n_threads; - heaps = (taskheap_t *)calloc(heap_p, sizeof(taskheap_t)); - for (int32_t i = 0; i < heap_p; ++i) { - uv_mutex_init(&heaps[i].lock); - heaps[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t*)); - jl_atomic_store_relaxed(&heaps[i].ntasks, 0); - jl_atomic_store_relaxed(&heaps[i].prio, INT16_MAX); - } - unbias_cong(heap_p, &cong_unbias); -} - - -static inline void sift_up(taskheap_t *heap, int32_t idx) -{ - if (idx > 0) { - int32_t parent = (idx-1)/heap_d; - if (heap->tasks[idx]->prio < heap->tasks[parent]->prio) { - jl_task_t *t = heap->tasks[parent]; - heap->tasks[parent] = heap->tasks[idx]; - heap->tasks[idx] = t; - sift_up(heap, parent); - } - } -} - - -static inline void sift_down(taskheap_t *heap, int32_t idx) -{ - if (idx < jl_atomic_load_relaxed(&heap->ntasks)) { - for (int32_t child = heap_d*idx + 1; - child < tasks_per_heap && child <= heap_d*idx + heap_d; - ++child) { - if (heap->tasks[child] - && heap->tasks[child]->prio < heap->tasks[idx]->prio) { - jl_task_t *t = heap->tasks[idx]; - heap->tasks[idx] = heap->tasks[child]; - heap->tasks[child] = t; - sift_down(heap, child); - } - } - } -} - - -static inline int multiq_insert(jl_task_t *task, int16_t priority) -{ - jl_ptls_t ptls = jl_current_task->ptls; - uint64_t rn; - - task->prio = priority; - do { - rn = cong(heap_p, cong_unbias, &ptls->rngseed); - } while (uv_mutex_trylock(&heaps[rn].lock) != 0); - - if (jl_atomic_load_relaxed(&heaps[rn].ntasks) >= tasks_per_heap) { - uv_mutex_unlock(&heaps[rn].lock); - // multiq insertion failed, increase #tasks per heap - return -1; - } - - int32_t ntasks = jl_atomic_load_relaxed(&heaps[rn].ntasks); - jl_atomic_store_relaxed(&heaps[rn].ntasks, ntasks + 1); - heaps[rn].tasks[ntasks] = task; - sift_up(&heaps[rn], ntasks); - int16_t prio = jl_atomic_load_relaxed(&heaps[rn].prio); - if (task->prio < prio) - jl_atomic_store_relaxed(&heaps[rn].prio, task->prio); - uv_mutex_unlock(&heaps[rn].lock); - - return 0; -} +// parallel task runtime +// --- -static inline jl_task_t *multiq_deletemin(void) +JL_DLLEXPORT uint32_t jl_rand_ptls(uint32_t max, uint32_t unbias) { jl_ptls_t ptls = jl_current_task->ptls; - uint64_t rn1 = 0, rn2; - int32_t i; - int16_t prio1, prio2; - jl_task_t *task; - retry: - jl_gc_safepoint(); - for (i = 0; i < heap_p; ++i) { - rn1 = cong(heap_p, cong_unbias, &ptls->rngseed); - rn2 = cong(heap_p, cong_unbias, &ptls->rngseed); - prio1 = jl_atomic_load_relaxed(&heaps[rn1].prio); - prio2 = jl_atomic_load_relaxed(&heaps[rn2].prio); - if (prio1 > prio2) { - prio1 = prio2; - rn1 = rn2; - } - else if (prio1 == prio2 && prio1 == INT16_MAX) - continue; - if (uv_mutex_trylock(&heaps[rn1].lock) == 0) { - if (prio1 == jl_atomic_load_relaxed(&heaps[rn1].prio)) - break; - uv_mutex_unlock(&heaps[rn1].lock); - } - } - if (i == heap_p) - return NULL; - - task = heaps[rn1].tasks[0]; - if (!jl_set_task_tid(task, ptls->tid)) { - uv_mutex_unlock(&heaps[rn1].lock); - goto retry; - } - int32_t ntasks = jl_atomic_load_relaxed(&heaps[rn1].ntasks) - 1; - jl_atomic_store_relaxed(&heaps[rn1].ntasks, ntasks); - heaps[rn1].tasks[0] = heaps[rn1].tasks[ntasks]; - heaps[rn1].tasks[ntasks] = NULL; - prio1 = INT16_MAX; - if (ntasks > 0) { - sift_down(&heaps[rn1], 0); - prio1 = heaps[rn1].tasks[0]->prio; - } - jl_atomic_store_relaxed(&heaps[rn1].prio, prio1); - uv_mutex_unlock(&heaps[rn1].lock); - - return task; -} - - -void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp) -{ - int32_t i, j; - for (i = 0; i < heap_p; ++i) - for (j = 0; j < jl_atomic_load_relaxed(&heaps[i].ntasks); ++j) - jl_gc_mark_queue_obj_explicit(gc_cache, sp, (jl_value_t *)heaps[i].tasks[j]); + return cong(max, -(uint64_t)-unbias, &ptls->rngseed); } - -static int multiq_check_empty(void) -{ - int32_t i; - for (i = 0; i < heap_p; ++i) { - if (jl_atomic_load_relaxed(&heaps[i].ntasks) != 0) - return 0; - } - return 1; -} - - - -// parallel task runtime -// --- - // initialize the threading infrastructure // (used only by the main thread) void jl_init_threadinginfra(void) { - /* initialize the synchronization trees pool and the multiqueue */ - multiq_init(); + /* initialize the synchronization trees pool */ sleep_threshold = DEFAULT_THREAD_SLEEP_THRESHOLD; char *cp = getenv(THREAD_SLEEP_THRESHOLD_NAME); @@ -299,18 +131,6 @@ void jl_threadfun(void *arg) } -// enqueue the specified task for execution -JL_DLLEXPORT int jl_enqueue_task(jl_task_t *task) -{ - char failed; - if (multiq_insert(task, task->prio) == -1) - failed = 1; - failed = 0; - JL_PROBE_RT_TASKQ_INSERT(jl_current_task->ptls, task); - return failed; -} - - int jl_running_under_rr(int recheck) { #ifdef _OS_LINUX_ @@ -439,21 +259,22 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) } -// get the next runnable task from the multiq +// get the next runnable task static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q) { jl_gc_safepoint(); - jl_value_t *args[2] = { trypoptask, q }; - jl_task_t *task = (jl_task_t*)jl_apply(args, 2); + jl_task_t *task = (jl_task_t*)jl_apply_generic(trypoptask, &q, 1); if (jl_typeis(task, jl_task_type)) { int self = jl_atomic_load_relaxed(&jl_current_task->tid); jl_set_task_tid(task, self); return task; } - task = multiq_deletemin(); - if (task) - JL_PROBE_RT_TASKQ_GET(jl_current_task->ptls, task); - return task; + return NULL; +} + +static int check_empty(jl_value_t *checkempty) +{ + return jl_apply_generic(checkempty, NULL, 0) == jl_true; } static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT @@ -468,7 +289,7 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT extern _Atomic(unsigned) _threadedregion; -JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q) +JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty) { jl_task_t *ct = jl_current_task; uint64_t start_cycles = 0; @@ -480,7 +301,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q) // quick, race-y check to see if there seems to be any stuff in there jl_cpu_pause(); - if (!multiq_check_empty()) { + if (!check_empty(checkempty)) { start_cycles = 0; continue; } @@ -492,7 +313,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q) jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping); jl_fence(); // [^store_buffering_1] JL_PROBE_RT_SLEEP_CHECK_SLEEP(ptls); - if (!multiq_check_empty()) { // uses relaxed loads + if (!check_empty(checkempty)) { // uses relaxed loads if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) { jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us JL_PROBE_RT_SLEEP_CHECK_TASKQ_WAKE(ptls); diff --git a/src/staticdata.c b/src/staticdata.c index e72f29257ce19b..f697bc88e313d7 100644 --- a/src/staticdata.c +++ b/src/staticdata.c @@ -164,13 +164,13 @@ jl_value_t **const*const get_tags(void) { INSERT_TAG(jl_int64_type); INSERT_TAG(jl_bool_type); INSERT_TAG(jl_uint8_type); + INSERT_TAG(jl_uint16_type); INSERT_TAG(jl_uint32_type); INSERT_TAG(jl_uint64_type); INSERT_TAG(jl_char_type); INSERT_TAG(jl_weakref_type); INSERT_TAG(jl_int8_type); INSERT_TAG(jl_int16_type); - INSERT_TAG(jl_uint16_type); INSERT_TAG(jl_float16_type); INSERT_TAG(jl_float32_type); INSERT_TAG(jl_float64_type); diff --git a/src/task.c b/src/task.c index 1dd4e76b8ba1ce..dbafa0ee29e0a1 100644 --- a/src/task.c +++ b/src/task.c @@ -798,7 +798,7 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, jl_value_t *completion t->gcstack = NULL; t->excstack = NULL; t->started = 0; - t->prio = -1; + t->priority = 0; jl_atomic_store_relaxed(&t->tid, t->copy_stack ? jl_atomic_load_relaxed(&ct->tid) : -1); // copy_stacks are always pinned since they can't be moved t->ptls = NULL; t->world_age = ct->world_age;