Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove Scheduler #5687

Merged
merged 10 commits into from
Feb 8, 2014
1 change: 1 addition & 0 deletions base/boot.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
# last::Task
# storage::Any
# consumers
# started::Bool
# done::Bool
# runnable::Bool
# end
Expand Down
10 changes: 2 additions & 8 deletions base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -325,21 +325,15 @@ function init_load_path()
push!(LOAD_PATH,abspath(JULIA_HOME,"..","share","julia","site",vers))
end

function init_sched()
global const Workqueue = Any[]
end
global const Workqueue = Any[]

function init_head_sched()
# start in "head node" mode
global const Scheduler = Task(()->event_loop(true), 1024*1024)
global PGRP
global LPROC
LPROC.id = 1
assert(length(PGRP.workers) == 0)
register_worker(LPROC)
# make scheduler aware of current (root) task
unshift!(Workqueue, roottask)
yield()
end

function init_profiler()
Expand Down Expand Up @@ -376,10 +370,10 @@ function _start()
LinAlg.init()
GMP.gmp_init()
init_profiler()
start_gc_msgs_task()

#atexit(()->flush(STDOUT))
try
init_sched()
any(a->(a=="--worker"), ARGS) || init_head_sched()
init_load_path()
(quiet,repl,startup,color_set,history) = process_options(copy(ARGS))
Expand Down
20 changes: 0 additions & 20 deletions base/inference.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1093,26 +1093,6 @@ f_argnames(ast) =

is_rest_arg(arg::ANY) = (ccall(:jl_is_rest_arg,Int32,(Any,), arg) != 0)

# function typeinf_task(caller)
# result = ()
# while true
# (caller, args) = yieldto(caller, result)
# result = typeinf_ext_(args...)
# end
# end

#Inference_Task = Task(typeinf_task, 2097152)
#yieldto(Inference_Task, current_task())

#function typeinf_ext(linfo, atypes, sparams, cop)
#C = current_task()
#args = (linfo, atypes, sparams, cop)
#if is(C, Inference_Task)
# return typeinf_ext_(args...)
#end
#return yieldto(Inference_Task, C, args)
#end

function typeinf_ext(linfo, atypes::ANY, sparams::ANY, def)
global inference_stack
last = inference_stack
Expand Down
181 changes: 11 additions & 170 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -394,24 +394,6 @@ type RemoteRef
RemoteRef(w::Worker) = RemoteRef(w.id)
RemoteRef() = RemoteRef(myid())

global WeakRemoteRef
function WeakRemoteRef(w, wh, id)
return new(w, wh, id)
end

function WeakRemoteRef(pid::Integer)
rr = WeakRemoteRef(pid, myid(), REQ_ID)
REQ_ID += 1
if mod(REQ_ID,200) == 0
gc()
end
rr
end

WeakRemoteRef(w::LocalProcess) = WeakRemoteRef(myid())
WeakRemoteRef(w::Worker) = WeakRemoteRef(w.id)
WeakRemoteRef() = WeakRemoteRef(myid())

global next_id
next_id() = (id=(myid(),REQ_ID); REQ_ID+=1; id)
end
Expand Down Expand Up @@ -467,7 +449,14 @@ function del_clients(pairs::(Any,Any)...)
end
end

any_gc_flag = false
any_gc_flag = Condition()
function start_gc_msgs_task()
enq_work(Task(()->
while true
wait(any_gc_flag)
flush_gc_msgs()
end))
end

function send_del_client(rr::RemoteRef)
if rr.where == myid()
Expand All @@ -480,7 +469,7 @@ function send_del_client(rr::RemoteRef)
w = worker_from_id(rr.where)
push!(w.del_msgs, (rr2id(rr), myid()))
w.gcflag = true
global any_gc_flag = true
notify(any_gc_flag)
end
end

Expand Down Expand Up @@ -508,7 +497,7 @@ function send_add_client(rr::RemoteRef, i)
#println("$(myid()) adding $((rr2id(rr), i)) for $(rr.where)")
push!(w.add_msgs, (rr2id(rr), i))
w.gcflag = true
global any_gc_flag = true
notify(any_gc_flag)
end
end

Expand Down Expand Up @@ -735,35 +724,6 @@ end
take_ref(rid) = take(lookup_ref(rid))
take(rr::RemoteRef) = call_on_owner(take_ref, rr)

## work queue ##

function enq_work(t::Task)
ccall(:uv_stop,Void,(Ptr{Void},),eventloop())
unshift!(Workqueue, t)
end

function perform_work()
perform_work(pop!(Workqueue))
end

function perform_work(t::Task)
if !istaskstarted(t)
# starting new task
yieldto(t)
else
# continuing interrupted work item
arg = t.result
t.result = nothing
t.runnable = true
yieldto(t, arg)
end
t = current_task().last
if t.runnable
# still runnable; return to queue
enq_work(t)
end
end

function deliver_result(sock::IO, msg, oid, value)
#print("$(myid()) sending result $oid\n")
if is(msg,:call_fetch)
Expand Down Expand Up @@ -956,11 +916,9 @@ function start_worker(out::IO)

ccall(:jl_install_sigint_handler, Void, ())

global const Scheduler = current_task()

try
check_master_connect(60.0)
event_loop(false)
wait()
catch err
print(STDERR, "unhandled exception on $(myid()): $(err)\nexiting.\n")
end
Expand Down Expand Up @@ -1216,37 +1174,6 @@ end

## higher-level functions: spawn, pmap, pfor, etc. ##

sync_begin() = task_local_storage(:SPAWNS, ({}, get(task_local_storage(), :SPAWNS, ())))

function sync_end()
spawns = get(task_local_storage(), :SPAWNS, ())
if is(spawns,())
error("sync_end() without sync_begin()")
end
refs = spawns[1]
task_local_storage(:SPAWNS, spawns[2])
for r in refs
wait(r)
end
end

macro sync(block)
quote
sync_begin()
v = $(esc(block))
sync_end()
v
end
end

function sync_add(r)
spawns = get(task_local_storage(), :SPAWNS, ())
if !is(spawns,())
push!(spawns[1], r)
end
r
end

let nextidx = 1
global chooseproc
function chooseproc(thunk::Function)
Expand Down Expand Up @@ -1297,23 +1224,6 @@ macro fetchfrom(p, expr)
:(remotecall_fetch($(esc(p)), $(esc(expr))))
end

function spawnlocal(thunk)
t = Task(thunk)
sync_add(t)
enq_work(t)
t
end

macro async(expr)
expr = localize_vars(:(()->($expr)), false)
:(spawnlocal($(esc(expr))))
end

macro spawnlocal(expr)
warn_once("@spawnlocal is deprecated, use @async instead.")
:(@async $(esc(expr)))
end

function at_each(f, args...)
for w in PGRP.workers
sync_add(remotecall(w.id, f, args...))
Expand Down Expand Up @@ -1531,75 +1441,6 @@ end
# 2/(nc/niter)
# end

## event processing, I/O and work scheduling ##

function yield(args...)
ct = current_task()
# preserve Task.last across calls to the scheduler
prev = ct.last
v = yieldto(Scheduler, args...)
ct.last = prev
return v
end

function pause()
@unix_only ccall(:pause, Void, ())
@windows_only ccall(:Sleep,stdcall, Void, (Uint32,), 0xffffffff)
end

function event_loop(isclient)
iserr, lasterr, bt = false, nothing, {}
while true
try
if iserr
display_error(lasterr, bt)
println(STDERR)
iserr, lasterr, bt = false, nothing, {}
else
while true
if isempty(Workqueue)
if any_gc_flag
flush_gc_msgs()
end
c = process_events(true)
if c==0 && eventloop()!=C_NULL && isempty(Workqueue) && !any_gc_flag
# if there are no active handles and no runnable tasks, just
# wait for signals.
pause()
end
else
perform_work()
process_events(false)
end
end
end
catch err
if iserr
ccall(:jl_, Void, (Any,), (
"\n!!!An ERROR occurred while printing the last error!!!\n",
lasterr,
bt
))
end
iserr, lasterr = true, err
bt = catch_backtrace()
if isclient && isa(err,InterruptException)
# root task is waiting for something on client. allow C-C
# to interrupt.
interrupt_waiting_task(roottask,err)
iserr, lasterr = false, nothing
end
end
end
end

# force a task to stop waiting with an exception
function interrupt_waiting_task(t::Task, err)
if !t.runnable
t.exception = err
enq_work(t)
end
end

function check_master_connect(timeout)
# If we do not have at least process 1 connect to us within timeout
Expand Down
1 change: 0 additions & 1 deletion base/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ precompile(start, (Dict{Any,Any},))
precompile(perform_work, ())
precompile(isempty, (Array{Any,1},))
precompile(getindex, (Dict{Any,Any}, Int32))
precompile(event_loop, (Bool,))
precompile(_start, ())
precompile(process_options, (Array{Any,1},))
precompile(run_repl, ())
Expand Down
43 changes: 13 additions & 30 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -723,50 +723,33 @@ end
write(s::TTY, p::Ptr, nb::Integer) = @uv_write nb ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), p, nb, uvw, uv_jl_writecb::Ptr{Void})

function write(s::AsyncStream, b::Uint8)
if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler
@uv_write 1 ccall(:jl_putc_copy, Int32, (Uint8, Ptr{Void}, Ptr{Void}, Ptr{Void}), b, handle(s), uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
else
@uv_write 1 ccall(:jl_putc_copy, Int32, (Uint8, Ptr{Void}, Ptr{Void}, Ptr{Void}), b, handle(s), uvw, uv_jl_writecb::Ptr{Void})
end
@uv_write 1 ccall(:jl_putc_copy, Int32, (Uint8, Ptr{Void}, Ptr{Void}, Ptr{Void}), b, handle(s), uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
return 1
end
function write(s::AsyncStream, c::Char)
if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler
@uv_write utf8sizeof(c) ccall(:jl_pututf8_copy, Int32, (Ptr{Void},Uint32, Ptr{Void}, Ptr{Void}), handle(s), c, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
else
@uv_write utf8sizeof(c) ccall(:jl_pututf8_copy, Int32, (Ptr{Void},Uint32, Ptr{Void}, Ptr{Void}), handle(s), c, uvw, uv_jl_writecb::Ptr{Void})
end
@uv_write utf8sizeof(c) ccall(:jl_pututf8_copy, Int32, (Ptr{Void},Uint32, Ptr{Void}, Ptr{Void}), handle(s), c, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
return utf8sizeof(c)
end
function write{T}(s::AsyncStream, a::Array{T})
if isbits(T)
if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler
n = uint(length(a)*sizeof(T))
@uv_write n ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), a, n, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
else
write!(s,copy(a))
end
n = uint(length(a)*sizeof(T))
@uv_write n ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), a, n, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
return int(length(a)*sizeof(T))
else
check_open(s)
invoke(write,(IO,Array),s,a)
end
end
function write(s::AsyncStream, p::Ptr, nb::Integer)
if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler
@uv_write nb ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), p, nb, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
else
check_open(s)
ccall(:jl_write, Uint, (Ptr{Void},Ptr{Void},Uint), handle(s), p, uint(nb))
end
@uv_write nb ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), p, nb, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
return int(nb)
end

Expand Down
2 changes: 1 addition & 1 deletion base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ include("process.jl")
include("multimedia.jl")
importall .Multimedia
reinit_stdio()
ccall(:jl_get_uv_hooks, Void, ())
ccall(:jl_get_uv_hooks, Void, (Cint,), 0)
include("grisu.jl")
import .Grisu.print_shortest
include("printf.jl")
Expand Down
Loading