Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove Scheduler #5687

Merged
merged 10 commits into from
Feb 8, 2014
1 change: 1 addition & 0 deletions base/boot.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
# last::Task
# storage::Any
# consumers
# started::Bool
# done::Bool
# runnable::Bool
# end
Expand Down
10 changes: 2 additions & 8 deletions base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -325,21 +325,15 @@ function init_load_path()
push!(LOAD_PATH,abspath(JULIA_HOME,"..","share","julia","site",vers))
end

function init_sched()
global const Workqueue = Any[]
end
global const Workqueue = Any[]

function init_head_sched()
# start in "head node" mode
global const Scheduler = Task(()->event_loop(true), 1024*1024)
global PGRP
global LPROC
LPROC.id = 1
assert(length(PGRP.workers) == 0)
register_worker(LPROC)
# make scheduler aware of current (root) task
unshift!(Workqueue, roottask)
yield()
end

function init_profiler()
Expand Down Expand Up @@ -376,10 +370,10 @@ function _start()
LinAlg.init()
GMP.gmp_init()
init_profiler()
start_gc_msgs_task()

#atexit(()->flush(STDOUT))
try
init_sched()
any(a->(a=="--worker"), ARGS) || init_head_sched()
init_load_path()
(quiet,repl,startup,color_set,history) = process_options(copy(ARGS))
Expand Down
126 changes: 62 additions & 64 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,14 @@ function del_clients(pairs::(Any,Any)...)
end
end

any_gc_flag = false
any_gc_flag = Condition()
function start_gc_msgs_task()
enq_work(Task(()->
while true
wait(any_gc_flag)
flush_gc_msgs()
end))
end

function send_del_client(rr::RemoteRef)
if rr.where == myid()
Expand All @@ -480,7 +487,7 @@ function send_del_client(rr::RemoteRef)
w = worker_from_id(rr.where)
push!(w.del_msgs, (rr2id(rr), myid()))
w.gcflag = true
global any_gc_flag = true
notify(any_gc_flag)
end
end

Expand Down Expand Up @@ -508,7 +515,7 @@ function send_add_client(rr::RemoteRef, i)
#println("$(myid()) adding $((rr2id(rr), i)) for $(rr.where)")
push!(w.add_msgs, (rr2id(rr), i))
w.gcflag = true
global any_gc_flag = true
notify(any_gc_flag)
end
end

Expand Down Expand Up @@ -747,21 +754,22 @@ function perform_work()
end

function perform_work(t::Task)
ct = current_task()
if ct.runnable && t !== ct
# still runnable; ensure we will return here
enq_work(ct)
end
if !istaskstarted(t)
# starting new task
yieldto(t)
result = yieldto(t)
else
# continuing interrupted work item
arg = t.result
t.result = nothing
t.runnable = true
yieldto(t, arg)
end
t = current_task().last
if t.runnable
# still runnable; return to queue
enq_work(t)
result = yieldto(t, arg)
end
return result
end

function deliver_result(sock::IO, msg, oid, value)
Expand Down Expand Up @@ -956,11 +964,9 @@ function start_worker(out::IO)

ccall(:jl_install_sigint_handler, Void, ())

global const Scheduler = current_task()

try
check_master_connect(60.0)
event_loop(false)
wait()
catch err
print(STDERR, "unhandled exception on $(myid()): $(err)\nexiting.\n")
end
Expand Down Expand Up @@ -1532,65 +1538,57 @@ end
# end

## event processing, I/O and work scheduling ##

function yield(args...)
in_scheduler = false
yield() = yield_until(nothing)
function yield_until(return_test::Union(Function,Nothing))
ct = current_task()
if return_test === nothing
return_test = (ct)->ct.runnable
end
# preserve Task.last across calls to the scheduler
prev = ct.last
v = yieldto(Scheduler, args...)
ct.last = prev
return v
end

function pause()
@unix_only ccall(:pause, Void, ())
@windows_only ccall(:Sleep,stdcall, Void, (Uint32,), 0xffffffff)
end

function event_loop(isclient)
iserr, lasterr, bt = false, nothing, {}
while true
try
if iserr
display_error(lasterr, bt)
println(STDERR)
iserr, lasterr, bt = false, nothing, {}
global in_scheduler
if in_scheduler
# we don't want to execute yield recursively, because
# the return condition would be ill-defined
warn("yielding from inside scheduler callback")
end
try
if isempty(Workqueue) && return_test(ct)
process_events(false)
end
if isempty(Workqueue) && return_test(ct)
return ()
end
in_scheduler = true
while true
if isempty(Workqueue)
c = process_events(true)
if c==0 && eventloop()!=C_NULL && isempty(Workqueue)
# if there are no active handles and no runnable tasks, just
# wait for signals.
pause()
end
else
while true
if isempty(Workqueue)
if any_gc_flag
flush_gc_msgs()
end
c = process_events(true)
if c==0 && eventloop()!=C_NULL && isempty(Workqueue) && !any_gc_flag
# if there are no active handles and no runnable tasks, just
# wait for signals.
pause()
end
else
perform_work()
process_events(false)
end
in_scheduler = false
result = perform_work()
in_scheduler = true
process_events(false)
if return_test(ct)
return result
end
end
catch err
if iserr
ccall(:jl_, Void, (Any,), (
"\n!!!An ERROR occurred while printing the last error!!!\n",
lasterr,
bt
))
end
iserr, lasterr = true, err
bt = catch_backtrace()
if isclient && isa(err,InterruptException)
# root task is waiting for something on client. allow C-C
# to interrupt.
interrupt_waiting_task(roottask,err)
iserr, lasterr = false, nothing
end
end
finally
in_scheduler = false
ct.last = prev
end
@assert false
end

function pause()
@unix_only ccall(:pause, Void, ())
@windows_only ccall(:Sleep,stdcall, Void, (Uint32,), 0xffffffff)
end

# force a task to stop waiting with an exception
Expand Down
1 change: 0 additions & 1 deletion base/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ precompile(start, (Dict{Any,Any},))
precompile(perform_work, ())
precompile(isempty, (Array{Any,1},))
precompile(getindex, (Dict{Any,Any}, Int32))
precompile(event_loop, (Bool,))
precompile(_start, ())
precompile(process_options, (Array{Any,1},))
precompile(run_repl, ())
Expand Down
43 changes: 13 additions & 30 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -723,50 +723,33 @@ end
write(s::TTY, p::Ptr, nb::Integer) = @uv_write nb ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), p, nb, uvw, uv_jl_writecb::Ptr{Void})

function write(s::AsyncStream, b::Uint8)
if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler
@uv_write 1 ccall(:jl_putc_copy, Int32, (Uint8, Ptr{Void}, Ptr{Void}, Ptr{Void}), b, handle(s), uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
else
@uv_write 1 ccall(:jl_putc_copy, Int32, (Uint8, Ptr{Void}, Ptr{Void}, Ptr{Void}), b, handle(s), uvw, uv_jl_writecb::Ptr{Void})
end
@uv_write 1 ccall(:jl_putc_copy, Int32, (Uint8, Ptr{Void}, Ptr{Void}, Ptr{Void}), b, handle(s), uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
return 1
end
function write(s::AsyncStream, c::Char)
if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler
@uv_write utf8sizeof(c) ccall(:jl_pututf8_copy, Int32, (Ptr{Void},Uint32, Ptr{Void}, Ptr{Void}), handle(s), c, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
else
@uv_write utf8sizeof(c) ccall(:jl_pututf8_copy, Int32, (Ptr{Void},Uint32, Ptr{Void}, Ptr{Void}), handle(s), c, uvw, uv_jl_writecb::Ptr{Void})
end
@uv_write utf8sizeof(c) ccall(:jl_pututf8_copy, Int32, (Ptr{Void},Uint32, Ptr{Void}, Ptr{Void}), handle(s), c, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
return utf8sizeof(c)
end
function write{T}(s::AsyncStream, a::Array{T})
if isbits(T)
if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler
n = uint(length(a)*sizeof(T))
@uv_write n ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), a, n, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
else
write!(s,copy(a))
end
n = uint(length(a)*sizeof(T))
@uv_write n ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), a, n, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
return int(length(a)*sizeof(T))
else
check_open(s)
invoke(write,(IO,Array),s,a)
end
end
function write(s::AsyncStream, p::Ptr, nb::Integer)
if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler
@uv_write nb ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), p, nb, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
else
check_open(s)
ccall(:jl_write, Uint, (Ptr{Void},Ptr{Void},Uint), handle(s), p, uint(nb))
end
@uv_write nb ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), p, nb, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
return int(nb)
end

Expand Down
2 changes: 1 addition & 1 deletion base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ include("process.jl")
include("multimedia.jl")
importall .Multimedia
reinit_stdio()
ccall(:jl_get_uv_hooks, Void, ())
ccall(:jl_get_uv_hooks, Void, (Cint,), 0)
include("grisu.jl")
import .Grisu.print_shortest
include("printf.jl")
Expand Down
29 changes: 14 additions & 15 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ show(io::IO, t::Task) = print(io, "Task")

current_task() = ccall(:jl_get_current_task, Any, ())::Task
istaskdone(t::Task) = t.done
istaskstarted(t::Task) = isdefined(t,:parent)
istaskstarted(t::Task) = t.started

# yield to a task, throwing an exception in it
function throwto(t::Task, exc)
Expand Down Expand Up @@ -49,9 +49,7 @@ function produce(v)
# make a task waiting for us runnable again
notify1(q)
end
r = yieldto(ct.last, v)
ct.parent = ct.last # always exit to last consumer
r
yieldto(ct.last, v)
end
produce(v...) = produce(v)

Expand All @@ -65,9 +63,16 @@ function consume(P::Task, values...)
ct = current_task()
prev = ct.last
ct.runnable = false
v = yieldto(P, values...)
ct.last = prev
ct.runnable = true
local v
try
v = yieldto(P, values...)
if ct.last !== P
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't have to change this now, but this almost seems more like an error --- there should be no reason a task other than P would yield to this one. Something is really broken if that happens.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was needed to pass pollfd test

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, but we should look into why that happens.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. for some reason the old tasks get woken up, finish their work, and then wake up their parent (who was intending to wait for its new task)

v = yield_until((ct)->ct.last === P)
end
finally
ct.last = prev
ct.runnable = true
end
if P.done
q = P.consumers
if !is(q, nothing)
Expand Down Expand Up @@ -106,15 +111,12 @@ end

function wait(c::Condition)
ct = current_task()
if ct === Scheduler
error("cannot execute blocking function from scheduler")
end

push!(c.waitq, ct)

ct.runnable = false
try
yield(c)
return yield()
catch
filter!(x->x!==ct, c.waitq)
rethrow()
Expand All @@ -123,11 +125,8 @@ end

function wait()
ct = current_task()
if ct === Scheduler
error("cannot execute blocking function from scheduler")
end
ct.runnable = false
yield()
return yield()
end

function notify(t::Task, arg::ANY=nothing; error=false)
Expand Down
4 changes: 2 additions & 2 deletions src/dump.c
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ extern jl_function_t *jl_typeinf_func;
extern int jl_boot_file_loaded;
extern void jl_get_builtin_hooks(void);
extern void jl_get_system_hooks(void);
extern void jl_get_uv_hooks(void);
extern void jl_get_uv_hooks(int);

DLLEXPORT
void jl_restore_system_image(char *fname)
Expand Down Expand Up @@ -1047,7 +1047,7 @@ void jl_restore_system_image(char *fname)

jl_get_builtin_hooks();
jl_get_system_hooks();
jl_get_uv_hooks();
jl_get_uv_hooks(1);
jl_boot_file_loaded = 1;
jl_typeinf_func = (jl_function_t*)jl_get_global(jl_base_module,
jl_symbol("typeinf_ext"));
Expand Down
Loading