Skip to content

Commit

Permalink
some refactoring of the scheduler
Browse files Browse the repository at this point in the history
- notify(Task) should really be part of the schedule(Task) function

- simpler produce/consume that depends on fewer task state variables
  and moves towards better interaction between scheduled and
  un-scheduled tasks

- the runnable flag now means !scheduled; will rename shortly

- simpler event loop. it no longer needs to test t.runnable; instead
  whenever control returns you automatically know to exit the loop

- task exit logic moved fully to task_done_hook

- the biggest problem in all of this is hacks to make produce and
  consume as fast as possible. going through the scheduler still costs
  about a factor of 2 over what's possible
  it turns out to be fairly simple to have produce/consume yield manually
  for performance, and yet interact smoothly with the scheduler. the only
  problem is handling the producer exiting. we might need custom exit
  hooks to handle this more generally.
  • Loading branch information
JeffBezanson committed Feb 9, 2014
1 parent d2a4e5c commit a8caf23
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 156 deletions.
1 change: 0 additions & 1 deletion base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1282,7 +1282,6 @@ export
@sync,
@async,
@spawn,
@spawnlocal, # deprecated
@spawnat,
@fetch,
@fetchfrom,
Expand Down
17 changes: 8 additions & 9 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ function worker_from_id(pg::ProcessGroup, i)
end
start = time()
while (!haskey(map_pid_wrkr, i) && ((time() - start) < 60.0))
sleep(0.1)
sleep(0.1)
yield()
end
map_pid_wrkr[i]
Expand Down Expand Up @@ -451,11 +451,10 @@ end

any_gc_flag = Condition()
function start_gc_msgs_task()
enq_work(Task(()->
while true
wait(any_gc_flag)
flush_gc_msgs()
end))
@schedule while true
wait(any_gc_flag)
flush_gc_msgs()
end
end

function send_del_client(rr::RemoteRef)
Expand Down Expand Up @@ -577,7 +576,7 @@ function schedule_call(rid, thunk)
rv = RemoteValue()
(PGRP::ProcessGroup).refs[rid] = rv
push!(rv.clientset, rid[1])
enq_work(@task(run_work_thunk(rv,thunk)))
schedule(@task(run_work_thunk(rv,thunk)))
rv
end

Expand Down Expand Up @@ -673,7 +672,7 @@ function remote_do(w::LocalProcess, f, args...)
# does when it gets a :do message.
# same for other messages on LocalProcess.
thk = local_remotecall_thunk(f, args)
enq_work(Task(thk))
schedule(Task(thk))
nothing
end

Expand Down Expand Up @@ -766,7 +765,7 @@ function accept_handler(server::TcpServer, status::Int32)
end

function create_message_handler_loop(sock::AsyncStream) #returns immediately
enq_work(@task begin
schedule(@task begin
global PGRP
#println("message_handler_loop")
start_reading(sock)
Expand Down
1 change: 0 additions & 1 deletion base/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ precompile(fdio, (Int32,))
precompile(ProcessGroup, (Int, Array{Any,1}, Array{Any,1}))
precompile(next, (Dict{Any,Any}, Int))
precompile(start, (Dict{Any,Any},))
precompile(perform_work, ())
precompile(isempty, (Array{Any,1},))
precompile(getindex, (Dict{Any,Any}, Int32))
precompile(_start, ())
Expand Down
4 changes: 2 additions & 2 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -759,10 +759,10 @@ function _uv_hook_writecb_task(s::AsyncStream,req::Ptr{Void},status::Int32)
err = UVError("write",status)
close(s)
if d != C_NULL
notify_error(unsafe_pointer_to_objref(d)::Task,err)
schedule(unsafe_pointer_to_objref(d)::Task,err,error=true)
end
elseif d != C_NULL
notify(unsafe_pointer_to_objref(d)::Task)
schedule(unsafe_pointer_to_objref(d)::Task)
end
end

Expand Down
190 changes: 67 additions & 123 deletions base/task.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## basic task functions and TLS

show(io::IO, t::Task) = print(io, "Task")
show(io::IO, t::Task) = print(io, "Task @0x$(hex(unsigned(pointer_from_objref(t)), WORD_SIZE>>2))")

macro task(ex)
:(Task(()->$(esc(ex))))
Expand Down Expand Up @@ -54,73 +54,60 @@ function wait(t::Task)
t.result
end

function notify(t::Task, arg::ANY=nothing; error=false)
if t.runnable
Base.error("tried to resume task that is not stopped")
end
if error
t.exception = arg
else
t.result = arg
end
enq_work(t)
nothing
end
notify_error(t::Task, err) = notify(t, err, error=true)

# runtime system hook called when a task finishes
function task_done_hook(t::Task)
if isa(t.donenotify, Condition)
if isdefined(t,:exception) && t.exception !== nothing
# TODO: maybe wrap this in a TaskExited exception
notify_error(t.donenotify, t.exception)
else
notify(t.donenotify, t.result)
err = isdefined(t,:exception) && t.exception !== nothing
result = err ? t.exception : t.result

nexttask = t.last

q = t.consumers

if isa(q,Condition) && !isempty(q.waitq)
nexttask = shift!(q.waitq)
notify(q, result, error=err)
end

isa(t.donenotify,Condition) && notify(t.donenotify, result, error=err)

if nexttask.runnable
if err
nexttask.exception = t.exception
end
yieldto(nexttask, t.result)
else
wait()
end
end


## produce, consume, and task iteration

function produce(v)
ct = current_task()
q = ct.consumers
if isa(q,Condition)
# make a task waiting for us runnable again
notify1(q)
end
yieldto(ct.last, v)
q = current_task().consumers
yieldto(shift!(q.waitq), v)
q.waitq[1].result
end
produce(v...) = produce(v)

function consume(P::Task, values...)
while !(P.runnable || P.done)
if P.consumers === nothing
P.consumers = Condition()
end
wait(P.consumers)
if P.done
return wait(P)
end
ct = current_task()
prev = ct.last
ct.runnable = false
local v
try
v = yieldto(P, values...)
if ct.last !== P
v = yield_until((ct)->ct.last === P)
end
finally
ct.last = prev
ct.runnable = true
if P.consumers === nothing
P.consumers = Condition()
end
if P.done
q = P.consumers
if !is(q, nothing)
notify(q, P.result)
end

ct = current_task()

push!(P.consumers.waitq, ct)
ct.result = length(values)==1 ? values[1] : values

if P.runnable
return yieldto(P)
else
return wait()
end
v
end

start(t::Task) = nothing
Expand All @@ -144,9 +131,8 @@ function wait(c::Condition)

push!(c.waitq, ct)

ct.runnable = false
try
return yield()
return wait()
catch
filter!(x->x!==ct, c.waitq)
rethrow()
Expand All @@ -156,14 +142,12 @@ end
function notify(c::Condition, arg::ANY=nothing; all=true, error=false)
if all
for t in c.waitq
!error ? (t.result = arg) : (t.exception = arg)
enq_work(t)
schedule(t, arg, error=error)
end
empty!(c.waitq)
elseif !isempty(c.waitq)
t = shift!(c.waitq)
!error ? (t.result = arg) : (t.exception = arg)
enq_work(t)
schedule(t, arg, error=error)
end
nothing
end
Expand All @@ -178,19 +162,11 @@ notify1_error(c::Condition, err) = notify(c, err, error=true, all=false)

function enq_work(t::Task)
ccall(:uv_stop,Void,(Ptr{Void},),eventloop())
unshift!(Workqueue, t)
end

function perform_work()
perform_work(pop!(Workqueue))
push!(Workqueue, t)
t
end

function perform_work(t::Task)
ct = current_task()
if ct.runnable && t !== ct
# still runnable; ensure we will return here
enq_work(ct)
end
if !istaskstarted(t)
# starting new task
result = yieldto(t)
Expand All @@ -215,54 +191,35 @@ end

schedule(t::Task) = enq_work(t)

function wait()
ct = current_task()
ct.runnable = false
return yield()
function schedule(t::Task, arg; error=false)
# schedule a task to be (re)started with the given value or exception
if error
t.exception = arg
else
t.result = arg
end
enq_work(t)
end

# yield() --- called for all blocking operations
in_scheduler = false
yield() = yield_until()
function yield_until(return_test = (t::Task)->t.runnable)
yield() = (enq_work(current_task()); wait())

function wait()
ct = current_task()
# preserve Task.last across calls to the scheduler
prev = ct.last
global in_scheduler
if in_scheduler

This comment has been minimized.

Copy link
@vtjnash

vtjnash Feb 9, 2014

Member

if something in here calls wait() (e.g. print), I don't see where you make sure the conditions can only return in the correct order

This comment has been minimized.

Copy link
@JeffBezanson

JeffBezanson Feb 10, 2014

Author Member

You just can't do that. Printing a warning doesn't really help.

This comment has been minimized.

Copy link
@vtjnash

vtjnash Feb 10, 2014

Member

it seems like it should have a warning, if not an error

This comment has been minimized.

Copy link
@JeffBezanson

JeffBezanson Feb 10, 2014

Author Member

Can't this case only be triggered by bugs in the callbacks in the standard library?

This comment has been minimized.

Copy link
@vtjnash

vtjnash Feb 10, 2014

Member

if you assume that only the standard library handles callbacks from libuv, yes.

This comment has been minimized.

Copy link
@JeffBezanson

JeffBezanson Feb 10, 2014

Author Member

Yes, that's how it should be.

# we don't want to execute yield recursively, because
# the return condition would be ill-defined
warn("yielding from inside scheduler callback")
end
try
if isempty(Workqueue) && return_test(ct)
process_events(false)
if isempty(Workqueue) && return_test(ct)
return nothing
end
end
in_scheduler = true
while true
if isempty(Workqueue)
c = process_events(true)
if c==0 && eventloop()!=C_NULL && isempty(Workqueue)
# if there are no active handles and no runnable tasks, just
# wait for signals.
pause()
end
else
in_scheduler = false
result = perform_work()
in_scheduler = true
process_events(false)
if return_test(ct)
return result
end
ct.runnable = false
while true
if isempty(Workqueue)
c = process_events(true)

This comment has been minimized.

Copy link
@vtjnash

vtjnash Mar 15, 2014

Member

@JeffBezanson should this have a try/finally block, in case the user presses ctrl-c? or is no cleanup necessary because we set ct.runnable = true elsewhere?

This comment has been minimized.

Copy link
@JeffBezanson

JeffBezanson Mar 15, 2014

Author Member

That's right; at this point this function hasn't made any state changes so there's nothing to undo. But in wait(::Condition) we might need to reset the task's state on interrupt.

if c==0 && eventloop()!=C_NULL && isempty(Workqueue)
# if there are no active handles and no runnable tasks, just
# wait for signals.
pause()
end
else
result = perform_work(shift!(Workqueue))
process_events(false)
# return when we come out of the queue
return result
end
finally
in_scheduler = false
ct.last = prev
end
assert(false)
end
Expand All @@ -272,14 +229,6 @@ function pause()
@windows_only ccall(:Sleep,stdcall, Void, (Uint32,), 0xffffffff)
end

# force a task to stop waiting with an exception
function interrupt_waiting_task(t::Task, err)
if !t.runnable
t.exception = err
enq_work(t)
end
end


## dynamically-scoped waiting for multiple items

Expand Down Expand Up @@ -325,8 +274,3 @@ macro async(expr)
expr = localize_vars(:(()->($expr)), false)
:(async_run_thunk($(esc(expr))))
end

macro spawnlocal(expr)
warn_once("@spawnlocal is deprecated, use @async instead.")
:(@async $(esc(expr)))
end
13 changes: 10 additions & 3 deletions doc/stdlib/base.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4468,7 +4468,7 @@ Parallel Computing

Call a function asynchronously on the given arguments on the specified processor. Returns a ``RemoteRef``.

.. function:: wait(x)
.. function:: wait([x])

Block the current task until some event occurs, depending on the type
of the argument:
Expand All @@ -4483,6 +4483,9 @@ Parallel Computing

* ``RawFD``: Wait for changes on a file descriptor (see `poll_fd` for keyword arguments and return code)

If no argument is passed, the task blocks until it is explicitly restarted, usually
by a call to ``schedule``.

.. function:: fetch(RemoteRef)

Wait for and get the value of a remote reference.
Expand Down Expand Up @@ -5212,7 +5215,7 @@ Tasks

.. function:: yield()

For scheduled tasks, switch back to the scheduler to allow another scheduled task to run. A task that calls this function is still runnable, and will be restarted immediately if there are no other runnable tasks.
Switch to the scheduler to allow another scheduled task to run. A task that calls this function is still runnable, and will be restarted immediately if there are no other runnable tasks.

.. function:: task_local_storage(symbol)

Expand Down Expand Up @@ -5246,12 +5249,16 @@ Tasks
only one is. If ``error`` is true, the passed value is raised as an
exception in the woken tasks.

.. function:: schedule(t::Task)
.. function:: schedule(t::Task, [val]; error=false)

Add a task to the scheduler's queue. This causes the task to run constantly
when the system is otherwise idle, unless the task performs a blocking
operation such as ``wait``.

If a second argument is provided, it will be passed to the task (via the
return value of ``yieldto``) when it runs again. If ``error`` is true,
the value is raised as an exception in the woken task.

.. function:: @schedule

Wrap an expression in a Task and add it to the scheduler's queue.
Expand Down
Loading

1 comment on commit a8caf23

@StefanKarpinski
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a major improvement. Very nice.

Please sign in to comment.