From a8caf23a146a2aa421a01e41da941a2b660e61d1 Mon Sep 17 00:00:00 2001 From: Jeff Bezanson Date: Sun, 9 Feb 2014 17:21:26 -0500 Subject: [PATCH] some refactoring of the scheduler - notify(Task) should really be part of the schedule(Task) function - simpler produce/consume that depends on fewer task state variables and moves towards better interaction between scheduled and un-scheduled tasks - the runnable flag now means !scheduled; will rename shortly - simpler event loop. it no longer needs to test t.runnable; instead whenever control returns you automatically know to exit the loop - task exit logic moved fully to task_done_hook - the biggest problem in all of this is hacks to make produce and consume as fast as possible. going through the scheduler still costs about a factor of 2 over what's possible it turns out to be fairly simple to have produce/consume yield manually for performance, and yet interact smoothly with the scheduler. the only problem is handling the producer exiting. we might need custom exit hooks to handle this more generally. --- base/exports.jl | 1 - base/multi.jl | 17 ++-- base/precompile.jl | 1 - base/stream.jl | 4 +- base/task.jl | 190 ++++++++++++++++---------------------------- doc/stdlib/base.rst | 13 ++- src/task.c | 21 ++--- test/spawn.jl | 2 +- 8 files changed, 93 insertions(+), 156 deletions(-) diff --git a/base/exports.jl b/base/exports.jl index 5d1a3f52f57dd..01b90966d6fc1 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1282,7 +1282,6 @@ export @sync, @async, @spawn, - @spawnlocal, # deprecated @spawnat, @fetch, @fetchfrom, diff --git a/base/multi.jl b/base/multi.jl index 41dc755e9e023..a35da74f22ce7 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -288,7 +288,7 @@ function worker_from_id(pg::ProcessGroup, i) end start = time() while (!haskey(map_pid_wrkr, i) && ((time() - start) < 60.0)) - sleep(0.1) + sleep(0.1) yield() end map_pid_wrkr[i] @@ -451,11 +451,10 @@ end any_gc_flag = Condition() function start_gc_msgs_task() - enq_work(Task(()-> - while true - wait(any_gc_flag) - flush_gc_msgs() - end)) + @schedule while true + wait(any_gc_flag) + flush_gc_msgs() + end end function send_del_client(rr::RemoteRef) @@ -577,7 +576,7 @@ function schedule_call(rid, thunk) rv = RemoteValue() (PGRP::ProcessGroup).refs[rid] = rv push!(rv.clientset, rid[1]) - enq_work(@task(run_work_thunk(rv,thunk))) + schedule(@task(run_work_thunk(rv,thunk))) rv end @@ -673,7 +672,7 @@ function remote_do(w::LocalProcess, f, args...) # does when it gets a :do message. # same for other messages on LocalProcess. thk = local_remotecall_thunk(f, args) - enq_work(Task(thk)) + schedule(Task(thk)) nothing end @@ -766,7 +765,7 @@ function accept_handler(server::TcpServer, status::Int32) end function create_message_handler_loop(sock::AsyncStream) #returns immediately - enq_work(@task begin + schedule(@task begin global PGRP #println("message_handler_loop") start_reading(sock) diff --git a/base/precompile.jl b/base/precompile.jl index ff7de349d8b3d..6b866750ae694 100644 --- a/base/precompile.jl +++ b/base/precompile.jl @@ -4,7 +4,6 @@ precompile(fdio, (Int32,)) precompile(ProcessGroup, (Int, Array{Any,1}, Array{Any,1})) precompile(next, (Dict{Any,Any}, Int)) precompile(start, (Dict{Any,Any},)) -precompile(perform_work, ()) precompile(isempty, (Array{Any,1},)) precompile(getindex, (Dict{Any,Any}, Int32)) precompile(_start, ()) diff --git a/base/stream.jl b/base/stream.jl index 7186550fd3e71..3d3ec3f0785af 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -759,10 +759,10 @@ function _uv_hook_writecb_task(s::AsyncStream,req::Ptr{Void},status::Int32) err = UVError("write",status) close(s) if d != C_NULL - notify_error(unsafe_pointer_to_objref(d)::Task,err) + schedule(unsafe_pointer_to_objref(d)::Task,err,error=true) end elseif d != C_NULL - notify(unsafe_pointer_to_objref(d)::Task) + schedule(unsafe_pointer_to_objref(d)::Task) end end diff --git a/base/task.jl b/base/task.jl index 9f344b5d7a2a1..1fc64adf2c5d3 100644 --- a/base/task.jl +++ b/base/task.jl @@ -1,6 +1,6 @@ ## basic task functions and TLS -show(io::IO, t::Task) = print(io, "Task") +show(io::IO, t::Task) = print(io, "Task @0x$(hex(unsigned(pointer_from_objref(t)), WORD_SIZE>>2))") macro task(ex) :(Task(()->$(esc(ex)))) @@ -54,29 +54,29 @@ function wait(t::Task) t.result end -function notify(t::Task, arg::ANY=nothing; error=false) - if t.runnable - Base.error("tried to resume task that is not stopped") - end - if error - t.exception = arg - else - t.result = arg - end - enq_work(t) - nothing -end -notify_error(t::Task, err) = notify(t, err, error=true) - # runtime system hook called when a task finishes function task_done_hook(t::Task) - if isa(t.donenotify, Condition) - if isdefined(t,:exception) && t.exception !== nothing - # TODO: maybe wrap this in a TaskExited exception - notify_error(t.donenotify, t.exception) - else - notify(t.donenotify, t.result) + err = isdefined(t,:exception) && t.exception !== nothing + result = err ? t.exception : t.result + + nexttask = t.last + + q = t.consumers + + if isa(q,Condition) && !isempty(q.waitq) + nexttask = shift!(q.waitq) + notify(q, result, error=err) + end + + isa(t.donenotify,Condition) && notify(t.donenotify, result, error=err) + + if nexttask.runnable + if err + nexttask.exception = t.exception end + yieldto(nexttask, t.result) + else + wait() end end @@ -84,43 +84,30 @@ end ## produce, consume, and task iteration function produce(v) - ct = current_task() - q = ct.consumers - if isa(q,Condition) - # make a task waiting for us runnable again - notify1(q) - end - yieldto(ct.last, v) + q = current_task().consumers + yieldto(shift!(q.waitq), v) + q.waitq[1].result end produce(v...) = produce(v) function consume(P::Task, values...) - while !(P.runnable || P.done) - if P.consumers === nothing - P.consumers = Condition() - end - wait(P.consumers) + if P.done + return wait(P) end - ct = current_task() - prev = ct.last - ct.runnable = false - local v - try - v = yieldto(P, values...) - if ct.last !== P - v = yield_until((ct)->ct.last === P) - end - finally - ct.last = prev - ct.runnable = true + if P.consumers === nothing + P.consumers = Condition() end - if P.done - q = P.consumers - if !is(q, nothing) - notify(q, P.result) - end + + ct = current_task() + + push!(P.consumers.waitq, ct) + ct.result = length(values)==1 ? values[1] : values + + if P.runnable + return yieldto(P) + else + return wait() end - v end start(t::Task) = nothing @@ -144,9 +131,8 @@ function wait(c::Condition) push!(c.waitq, ct) - ct.runnable = false try - return yield() + return wait() catch filter!(x->x!==ct, c.waitq) rethrow() @@ -156,14 +142,12 @@ end function notify(c::Condition, arg::ANY=nothing; all=true, error=false) if all for t in c.waitq - !error ? (t.result = arg) : (t.exception = arg) - enq_work(t) + schedule(t, arg, error=error) end empty!(c.waitq) elseif !isempty(c.waitq) t = shift!(c.waitq) - !error ? (t.result = arg) : (t.exception = arg) - enq_work(t) + schedule(t, arg, error=error) end nothing end @@ -178,19 +162,11 @@ notify1_error(c::Condition, err) = notify(c, err, error=true, all=false) function enq_work(t::Task) ccall(:uv_stop,Void,(Ptr{Void},),eventloop()) - unshift!(Workqueue, t) -end - -function perform_work() - perform_work(pop!(Workqueue)) + push!(Workqueue, t) + t end function perform_work(t::Task) - ct = current_task() - if ct.runnable && t !== ct - # still runnable; ensure we will return here - enq_work(ct) - end if !istaskstarted(t) # starting new task result = yieldto(t) @@ -215,54 +191,35 @@ end schedule(t::Task) = enq_work(t) -function wait() - ct = current_task() - ct.runnable = false - return yield() +function schedule(t::Task, arg; error=false) + # schedule a task to be (re)started with the given value or exception + if error + t.exception = arg + else + t.result = arg + end + enq_work(t) end -# yield() --- called for all blocking operations -in_scheduler = false -yield() = yield_until() -function yield_until(return_test = (t::Task)->t.runnable) +yield() = (enq_work(current_task()); wait()) + +function wait() ct = current_task() - # preserve Task.last across calls to the scheduler - prev = ct.last - global in_scheduler - if in_scheduler - # we don't want to execute yield recursively, because - # the return condition would be ill-defined - warn("yielding from inside scheduler callback") - end - try - if isempty(Workqueue) && return_test(ct) - process_events(false) - if isempty(Workqueue) && return_test(ct) - return nothing - end - end - in_scheduler = true - while true - if isempty(Workqueue) - c = process_events(true) - if c==0 && eventloop()!=C_NULL && isempty(Workqueue) - # if there are no active handles and no runnable tasks, just - # wait for signals. - pause() - end - else - in_scheduler = false - result = perform_work() - in_scheduler = true - process_events(false) - if return_test(ct) - return result - end + ct.runnable = false + while true + if isempty(Workqueue) + c = process_events(true) + if c==0 && eventloop()!=C_NULL && isempty(Workqueue) + # if there are no active handles and no runnable tasks, just + # wait for signals. + pause() end + else + result = perform_work(shift!(Workqueue)) + process_events(false) + # return when we come out of the queue + return result end - finally - in_scheduler = false - ct.last = prev end assert(false) end @@ -272,14 +229,6 @@ function pause() @windows_only ccall(:Sleep,stdcall, Void, (Uint32,), 0xffffffff) end -# force a task to stop waiting with an exception -function interrupt_waiting_task(t::Task, err) - if !t.runnable - t.exception = err - enq_work(t) - end -end - ## dynamically-scoped waiting for multiple items @@ -325,8 +274,3 @@ macro async(expr) expr = localize_vars(:(()->($expr)), false) :(async_run_thunk($(esc(expr)))) end - -macro spawnlocal(expr) - warn_once("@spawnlocal is deprecated, use @async instead.") - :(@async $(esc(expr))) -end diff --git a/doc/stdlib/base.rst b/doc/stdlib/base.rst index 02af58c01f1c2..6b9f3f0eb0844 100644 --- a/doc/stdlib/base.rst +++ b/doc/stdlib/base.rst @@ -4468,7 +4468,7 @@ Parallel Computing Call a function asynchronously on the given arguments on the specified processor. Returns a ``RemoteRef``. -.. function:: wait(x) +.. function:: wait([x]) Block the current task until some event occurs, depending on the type of the argument: @@ -4483,6 +4483,9 @@ Parallel Computing * ``RawFD``: Wait for changes on a file descriptor (see `poll_fd` for keyword arguments and return code) + If no argument is passed, the task blocks until it is explicitly restarted, usually + by a call to ``schedule``. + .. function:: fetch(RemoteRef) Wait for and get the value of a remote reference. @@ -5212,7 +5215,7 @@ Tasks .. function:: yield() - For scheduled tasks, switch back to the scheduler to allow another scheduled task to run. A task that calls this function is still runnable, and will be restarted immediately if there are no other runnable tasks. + Switch to the scheduler to allow another scheduled task to run. A task that calls this function is still runnable, and will be restarted immediately if there are no other runnable tasks. .. function:: task_local_storage(symbol) @@ -5246,12 +5249,16 @@ Tasks only one is. If ``error`` is true, the passed value is raised as an exception in the woken tasks. -.. function:: schedule(t::Task) +.. function:: schedule(t::Task, [val]; error=false) Add a task to the scheduler's queue. This causes the task to run constantly when the system is otherwise idle, unless the task performs a blocking operation such as ``wait``. + If a second argument is provided, it will be passed to the task (via the + return value of ``yieldto``) when it runs again. If ``error`` is true, + the value is raised as an exception in the woken task. + .. function:: @schedule Wrap an expression in a Task and add it to the scheduler's queue. diff --git a/src/task.c b/src/task.c index d5924cc205b96..41574f841568a 100644 --- a/src/task.c +++ b/src/task.c @@ -386,23 +386,12 @@ static void finish_task(jl_task_t *t, jl_value_t *resultval) #ifdef COPY_STACKS t->stkbuf = NULL; #endif - if (t->donenotify && t->donenotify != jl_nothing) { - if (task_done_hook_func == NULL) { - task_done_hook_func = (jl_function_t*)jl_get_global(jl_base_module, - jl_symbol("task_done_hook")); - } - if (task_done_hook_func != NULL) { - jl_apply(task_done_hook_func, (jl_value_t**)&t, 1); - } + if (task_done_hook_func == NULL) { + task_done_hook_func = (jl_function_t*)jl_get_global(jl_base_module, + jl_symbol("task_done_hook")); } - jl_task_t *cont = jl_current_task->last; - if (!cont->done) { - jl_switchto(cont, t->result); - } - else { - jl_function_t* yield_f = - (jl_function_t*)jl_get_global(jl_base_module, jl_symbol("yield")); - (void)jl_apply(yield_f,NULL,0); + if (task_done_hook_func != NULL) { + jl_apply(task_done_hook_func, (jl_value_t**)&t, 1); } assert(0); } diff --git a/test/spawn.jl b/test/spawn.jl index d4753b0757597..9970f5e843de7 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -113,7 +113,7 @@ t = @async begin @test p.exitcode == 0 end yield() -Base.interrupt_waiting_task(t, InterruptException()) +schedule(t, InterruptException(), error=true) yield() put(r,11) yield()