diff --git a/base/boot.jl b/base/boot.jl index e8111e71e1240..87f7ba8e5cde0 100644 --- a/base/boot.jl +++ b/base/boot.jl @@ -107,6 +107,7 @@ # last::Task # storage::Any # consumers +# started::Bool # done::Bool # runnable::Bool # end diff --git a/base/client.jl b/base/client.jl index 82f332e9876c0..c026b7c896ce7 100644 --- a/base/client.jl +++ b/base/client.jl @@ -325,21 +325,15 @@ function init_load_path() push!(LOAD_PATH,abspath(JULIA_HOME,"..","share","julia","site",vers)) end -function init_sched() - global const Workqueue = Any[] -end +global const Workqueue = Any[] function init_head_sched() # start in "head node" mode - global const Scheduler = Task(()->event_loop(true), 1024*1024) global PGRP global LPROC LPROC.id = 1 assert(length(PGRP.workers) == 0) register_worker(LPROC) - # make scheduler aware of current (root) task - unshift!(Workqueue, roottask) - yield() end function init_profiler() @@ -376,10 +370,10 @@ function _start() LinAlg.init() GMP.gmp_init() init_profiler() + start_gc_msgs_task() #atexit(()->flush(STDOUT)) try - init_sched() any(a->(a=="--worker"), ARGS) || init_head_sched() init_load_path() (quiet,repl,startup,color_set,history) = process_options(copy(ARGS)) diff --git a/base/inference.jl b/base/inference.jl index ab4d1e2da633e..563b539f48940 100644 --- a/base/inference.jl +++ b/base/inference.jl @@ -1093,26 +1093,6 @@ f_argnames(ast) = is_rest_arg(arg::ANY) = (ccall(:jl_is_rest_arg,Int32,(Any,), arg) != 0) -# function typeinf_task(caller) -# result = () -# while true -# (caller, args) = yieldto(caller, result) -# result = typeinf_ext_(args...) -# end -# end - -#Inference_Task = Task(typeinf_task, 2097152) -#yieldto(Inference_Task, current_task()) - -#function typeinf_ext(linfo, atypes, sparams, cop) - #C = current_task() - #args = (linfo, atypes, sparams, cop) - #if is(C, Inference_Task) - # return typeinf_ext_(args...) - #end - #return yieldto(Inference_Task, C, args) -#end - function typeinf_ext(linfo, atypes::ANY, sparams::ANY, def) global inference_stack last = inference_stack diff --git a/base/multi.jl b/base/multi.jl index 8c78d70a10a60..41dc755e9e023 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -394,24 +394,6 @@ type RemoteRef RemoteRef(w::Worker) = RemoteRef(w.id) RemoteRef() = RemoteRef(myid()) - global WeakRemoteRef - function WeakRemoteRef(w, wh, id) - return new(w, wh, id) - end - - function WeakRemoteRef(pid::Integer) - rr = WeakRemoteRef(pid, myid(), REQ_ID) - REQ_ID += 1 - if mod(REQ_ID,200) == 0 - gc() - end - rr - end - - WeakRemoteRef(w::LocalProcess) = WeakRemoteRef(myid()) - WeakRemoteRef(w::Worker) = WeakRemoteRef(w.id) - WeakRemoteRef() = WeakRemoteRef(myid()) - global next_id next_id() = (id=(myid(),REQ_ID); REQ_ID+=1; id) end @@ -467,7 +449,14 @@ function del_clients(pairs::(Any,Any)...) end end -any_gc_flag = false +any_gc_flag = Condition() +function start_gc_msgs_task() + enq_work(Task(()-> + while true + wait(any_gc_flag) + flush_gc_msgs() + end)) +end function send_del_client(rr::RemoteRef) if rr.where == myid() @@ -480,7 +469,7 @@ function send_del_client(rr::RemoteRef) w = worker_from_id(rr.where) push!(w.del_msgs, (rr2id(rr), myid())) w.gcflag = true - global any_gc_flag = true + notify(any_gc_flag) end end @@ -508,7 +497,7 @@ function send_add_client(rr::RemoteRef, i) #println("$(myid()) adding $((rr2id(rr), i)) for $(rr.where)") push!(w.add_msgs, (rr2id(rr), i)) w.gcflag = true - global any_gc_flag = true + notify(any_gc_flag) end end @@ -735,35 +724,6 @@ end take_ref(rid) = take(lookup_ref(rid)) take(rr::RemoteRef) = call_on_owner(take_ref, rr) -## work queue ## - -function enq_work(t::Task) - ccall(:uv_stop,Void,(Ptr{Void},),eventloop()) - unshift!(Workqueue, t) -end - -function perform_work() - perform_work(pop!(Workqueue)) -end - -function perform_work(t::Task) - if !istaskstarted(t) - # starting new task - yieldto(t) - else - # continuing interrupted work item - arg = t.result - t.result = nothing - t.runnable = true - yieldto(t, arg) - end - t = current_task().last - if t.runnable - # still runnable; return to queue - enq_work(t) - end -end - function deliver_result(sock::IO, msg, oid, value) #print("$(myid()) sending result $oid\n") if is(msg,:call_fetch) @@ -956,11 +916,9 @@ function start_worker(out::IO) ccall(:jl_install_sigint_handler, Void, ()) - global const Scheduler = current_task() - try check_master_connect(60.0) - event_loop(false) + wait() catch err print(STDERR, "unhandled exception on $(myid()): $(err)\nexiting.\n") end @@ -1216,37 +1174,6 @@ end ## higher-level functions: spawn, pmap, pfor, etc. ## -sync_begin() = task_local_storage(:SPAWNS, ({}, get(task_local_storage(), :SPAWNS, ()))) - -function sync_end() - spawns = get(task_local_storage(), :SPAWNS, ()) - if is(spawns,()) - error("sync_end() without sync_begin()") - end - refs = spawns[1] - task_local_storage(:SPAWNS, spawns[2]) - for r in refs - wait(r) - end -end - -macro sync(block) - quote - sync_begin() - v = $(esc(block)) - sync_end() - v - end -end - -function sync_add(r) - spawns = get(task_local_storage(), :SPAWNS, ()) - if !is(spawns,()) - push!(spawns[1], r) - end - r -end - let nextidx = 1 global chooseproc function chooseproc(thunk::Function) @@ -1297,23 +1224,6 @@ macro fetchfrom(p, expr) :(remotecall_fetch($(esc(p)), $(esc(expr)))) end -function spawnlocal(thunk) - t = Task(thunk) - sync_add(t) - enq_work(t) - t -end - -macro async(expr) - expr = localize_vars(:(()->($expr)), false) - :(spawnlocal($(esc(expr)))) -end - -macro spawnlocal(expr) - warn_once("@spawnlocal is deprecated, use @async instead.") - :(@async $(esc(expr))) -end - function at_each(f, args...) for w in PGRP.workers sync_add(remotecall(w.id, f, args...)) @@ -1531,75 +1441,6 @@ end # 2/(nc/niter) # end -## event processing, I/O and work scheduling ## - -function yield(args...) - ct = current_task() - # preserve Task.last across calls to the scheduler - prev = ct.last - v = yieldto(Scheduler, args...) - ct.last = prev - return v -end - -function pause() - @unix_only ccall(:pause, Void, ()) - @windows_only ccall(:Sleep,stdcall, Void, (Uint32,), 0xffffffff) -end - -function event_loop(isclient) - iserr, lasterr, bt = false, nothing, {} - while true - try - if iserr - display_error(lasterr, bt) - println(STDERR) - iserr, lasterr, bt = false, nothing, {} - else - while true - if isempty(Workqueue) - if any_gc_flag - flush_gc_msgs() - end - c = process_events(true) - if c==0 && eventloop()!=C_NULL && isempty(Workqueue) && !any_gc_flag - # if there are no active handles and no runnable tasks, just - # wait for signals. - pause() - end - else - perform_work() - process_events(false) - end - end - end - catch err - if iserr - ccall(:jl_, Void, (Any,), ( - "\n!!!An ERROR occurred while printing the last error!!!\n", - lasterr, - bt - )) - end - iserr, lasterr = true, err - bt = catch_backtrace() - if isclient && isa(err,InterruptException) - # root task is waiting for something on client. allow C-C - # to interrupt. - interrupt_waiting_task(roottask,err) - iserr, lasterr = false, nothing - end - end - end -end - -# force a task to stop waiting with an exception -function interrupt_waiting_task(t::Task, err) - if !t.runnable - t.exception = err - enq_work(t) - end -end function check_master_connect(timeout) # If we do not have at least process 1 connect to us within timeout diff --git a/base/precompile.jl b/base/precompile.jl index bebe27e4c7e76..ff7de349d8b3d 100644 --- a/base/precompile.jl +++ b/base/precompile.jl @@ -7,7 +7,6 @@ precompile(start, (Dict{Any,Any},)) precompile(perform_work, ()) precompile(isempty, (Array{Any,1},)) precompile(getindex, (Dict{Any,Any}, Int32)) -precompile(event_loop, (Bool,)) precompile(_start, ()) precompile(process_options, (Array{Any,1},)) precompile(run_repl, ()) diff --git a/base/stream.jl b/base/stream.jl index 03f2432068548..7186550fd3e71 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -723,35 +723,23 @@ end write(s::TTY, p::Ptr, nb::Integer) = @uv_write nb ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), p, nb, uvw, uv_jl_writecb::Ptr{Void}) function write(s::AsyncStream, b::Uint8) - if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler - @uv_write 1 ccall(:jl_putc_copy, Int32, (Uint8, Ptr{Void}, Ptr{Void}, Ptr{Void}), b, handle(s), uvw, uv_jl_writecb_task::Ptr{Void}) - uv_req_set_data(uvw,current_task()) - wait() - else - @uv_write 1 ccall(:jl_putc_copy, Int32, (Uint8, Ptr{Void}, Ptr{Void}, Ptr{Void}), b, handle(s), uvw, uv_jl_writecb::Ptr{Void}) - end + @uv_write 1 ccall(:jl_putc_copy, Int32, (Uint8, Ptr{Void}, Ptr{Void}, Ptr{Void}), b, handle(s), uvw, uv_jl_writecb_task::Ptr{Void}) + uv_req_set_data(uvw,current_task()) + wait() return 1 end function write(s::AsyncStream, c::Char) - if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler - @uv_write utf8sizeof(c) ccall(:jl_pututf8_copy, Int32, (Ptr{Void},Uint32, Ptr{Void}, Ptr{Void}), handle(s), c, uvw, uv_jl_writecb_task::Ptr{Void}) - uv_req_set_data(uvw,current_task()) - wait() - else - @uv_write utf8sizeof(c) ccall(:jl_pututf8_copy, Int32, (Ptr{Void},Uint32, Ptr{Void}, Ptr{Void}), handle(s), c, uvw, uv_jl_writecb::Ptr{Void}) - end + @uv_write utf8sizeof(c) ccall(:jl_pututf8_copy, Int32, (Ptr{Void},Uint32, Ptr{Void}, Ptr{Void}), handle(s), c, uvw, uv_jl_writecb_task::Ptr{Void}) + uv_req_set_data(uvw,current_task()) + wait() return utf8sizeof(c) end function write{T}(s::AsyncStream, a::Array{T}) if isbits(T) - if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler - n = uint(length(a)*sizeof(T)) - @uv_write n ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), a, n, uvw, uv_jl_writecb_task::Ptr{Void}) - uv_req_set_data(uvw,current_task()) - wait() - else - write!(s,copy(a)) - end + n = uint(length(a)*sizeof(T)) + @uv_write n ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), a, n, uvw, uv_jl_writecb_task::Ptr{Void}) + uv_req_set_data(uvw,current_task()) + wait() return int(length(a)*sizeof(T)) else check_open(s) @@ -759,14 +747,9 @@ function write{T}(s::AsyncStream, a::Array{T}) end end function write(s::AsyncStream, p::Ptr, nb::Integer) - if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler - @uv_write nb ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), p, nb, uvw, uv_jl_writecb_task::Ptr{Void}) - uv_req_set_data(uvw,current_task()) - wait() - else - check_open(s) - ccall(:jl_write, Uint, (Ptr{Void},Ptr{Void},Uint), handle(s), p, uint(nb)) - end + @uv_write nb ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), p, nb, uvw, uv_jl_writecb_task::Ptr{Void}) + uv_req_set_data(uvw,current_task()) + wait() return int(nb) end diff --git a/base/sysimg.jl b/base/sysimg.jl index ddb907ae6db82..d99d4ee48fab0 100644 --- a/base/sysimg.jl +++ b/base/sysimg.jl @@ -103,7 +103,7 @@ include("process.jl") include("multimedia.jl") importall .Multimedia reinit_stdio() -ccall(:jl_get_uv_hooks, Void, ()) +ccall(:jl_get_uv_hooks, Void, (Cint,), 0) include("grisu.jl") import .Grisu.print_shortest include("printf.jl") diff --git a/base/task.jl b/base/task.jl index fee81419dd696..9f344b5d7a2a1 100644 --- a/base/task.jl +++ b/base/task.jl @@ -1,8 +1,14 @@ +## basic task functions and TLS + show(io::IO, t::Task) = print(io, "Task") +macro task(ex) + :(Task(()->$(esc(ex)))) +end + current_task() = ccall(:jl_get_current_task, Any, ())::Task istaskdone(t::Task) = t.done -istaskstarted(t::Task) = isdefined(t,:parent) +istaskstarted(t::Task) = t.started # yield to a task, throwing an exception in it function throwto(t::Task, exc) @@ -33,6 +39,12 @@ end # NOTE: you can only wait for scheduled tasks function wait(t::Task) + if istaskdone(t) + if t.exception !== nothing + throw(t.exception) + end + return t.result + end if is(t.donenotify, nothing) t.donenotify = Condition() end @@ -42,6 +54,35 @@ function wait(t::Task) t.result end +function notify(t::Task, arg::ANY=nothing; error=false) + if t.runnable + Base.error("tried to resume task that is not stopped") + end + if error + t.exception = arg + else + t.result = arg + end + enq_work(t) + nothing +end +notify_error(t::Task, err) = notify(t, err, error=true) + +# runtime system hook called when a task finishes +function task_done_hook(t::Task) + if isa(t.donenotify, Condition) + if isdefined(t,:exception) && t.exception !== nothing + # TODO: maybe wrap this in a TaskExited exception + notify_error(t.donenotify, t.exception) + else + notify(t.donenotify, t.result) + end + end +end + + +## produce, consume, and task iteration + function produce(v) ct = current_task() q = ct.consumers @@ -49,9 +90,7 @@ function produce(v) # make a task waiting for us runnable again notify1(q) end - r = yieldto(ct.last, v) - ct.parent = ct.last # always exit to last consumer - r + yieldto(ct.last, v) end produce(v...) = produce(v) @@ -65,9 +104,16 @@ function consume(P::Task, values...) ct = current_task() prev = ct.last ct.runnable = false - v = yieldto(P, values...) - ct.last = prev - ct.runnable = true + local v + try + v = yieldto(P, values...) + if ct.last !== P + v = yield_until((ct)->ct.last === P) + end + finally + ct.last = prev + ct.runnable = true + end if P.done q = P.consumers if !is(q, nothing) @@ -84,17 +130,6 @@ function done(t::Task, val) end next(t::Task, val) = (t.result, nothing) -macro task(ex) - :(Task(()->$(esc(ex)))) -end - -# schedule an expression to run asynchronously, with minimal ceremony -macro schedule(expr) - expr = localize_vars(:(()->($expr)), false) - :(enq_work(Task($(esc(expr))))) -end - -schedule(t::Task) = enq_work(t) ## condition variables @@ -106,44 +141,18 @@ end function wait(c::Condition) ct = current_task() - if ct === Scheduler - error("cannot execute blocking function from scheduler") - end push!(c.waitq, ct) ct.runnable = false try - yield(c) + return yield() catch filter!(x->x!==ct, c.waitq) rethrow() end end -function wait() - ct = current_task() - if ct === Scheduler - error("cannot execute blocking function from scheduler") - end - ct.runnable = false - yield() -end - -function notify(t::Task, arg::ANY=nothing; error=false) - if t.runnable == true - Base.error("tried to resume task that is not stopped") - end - if error - t.exception = arg - else - t.result = arg - end - enq_work(t) - nothing -end -notify_error(t::Task, err) = notify(t, err, error=true) - function notify(c::Condition, arg::ANY=nothing; all=true, error=false) if all for t in c.waitq @@ -153,7 +162,7 @@ function notify(c::Condition, arg::ANY=nothing; all=true, error=false) empty!(c.waitq) elseif !isempty(c.waitq) t = shift!(c.waitq) - !error? (t.result = arg) : (t.exception = arg) + !error ? (t.result = arg) : (t.exception = arg) enq_work(t) end nothing @@ -164,13 +173,160 @@ notify1(c::Condition, arg=nothing) = notify(c, arg, all=false) notify_error(c::Condition, err) = notify(c, err, error=true) notify1_error(c::Condition, err) = notify(c, err, error=true, all=false) -function task_done_hook(t::Task) - if isa(t.donenotify, Condition) - if isdefined(t,:exception) && t.exception !== nothing - # TODO: maybe wrap this in a TaskExited exception - notify_error(t.donenotify, t.exception) - else - notify(t.donenotify, t.result) + +## work queue + +function enq_work(t::Task) + ccall(:uv_stop,Void,(Ptr{Void},),eventloop()) + unshift!(Workqueue, t) +end + +function perform_work() + perform_work(pop!(Workqueue)) +end + +function perform_work(t::Task) + ct = current_task() + if ct.runnable && t !== ct + # still runnable; ensure we will return here + enq_work(ct) + end + if !istaskstarted(t) + # starting new task + result = yieldto(t) + else + # continuing interrupted work item + arg = t.result + t.result = nothing + t.runnable = true + result = yieldto(t, arg) + end + return result +end + + +## scheduler + +# schedule an expression to run asynchronously, with minimal ceremony +macro schedule(expr) + expr = localize_vars(:(()->($expr)), false) + :(enq_work(Task($(esc(expr))))) +end + +schedule(t::Task) = enq_work(t) + +function wait() + ct = current_task() + ct.runnable = false + return yield() +end + +# yield() --- called for all blocking operations +in_scheduler = false +yield() = yield_until() +function yield_until(return_test = (t::Task)->t.runnable) + ct = current_task() + # preserve Task.last across calls to the scheduler + prev = ct.last + global in_scheduler + if in_scheduler + # we don't want to execute yield recursively, because + # the return condition would be ill-defined + warn("yielding from inside scheduler callback") + end + try + if isempty(Workqueue) && return_test(ct) + process_events(false) + if isempty(Workqueue) && return_test(ct) + return nothing + end + end + in_scheduler = true + while true + if isempty(Workqueue) + c = process_events(true) + if c==0 && eventloop()!=C_NULL && isempty(Workqueue) + # if there are no active handles and no runnable tasks, just + # wait for signals. + pause() + end + else + in_scheduler = false + result = perform_work() + in_scheduler = true + process_events(false) + if return_test(ct) + return result + end + end end + finally + in_scheduler = false + ct.last = prev + end + assert(false) +end + +function pause() + @unix_only ccall(:pause, Void, ()) + @windows_only ccall(:Sleep,stdcall, Void, (Uint32,), 0xffffffff) +end + +# force a task to stop waiting with an exception +function interrupt_waiting_task(t::Task, err) + if !t.runnable + t.exception = err + enq_work(t) + end +end + + +## dynamically-scoped waiting for multiple items + +sync_begin() = task_local_storage(:SPAWNS, ({}, get(task_local_storage(), :SPAWNS, ()))) + +function sync_end() + spawns = get(task_local_storage(), :SPAWNS, ()) + if is(spawns,()) + error("sync_end() without sync_begin()") + end + refs = spawns[1] + task_local_storage(:SPAWNS, spawns[2]) + for r in refs + wait(r) + end +end + +macro sync(block) + quote + sync_begin() + v = $(esc(block)) + sync_end() + v end end + +function sync_add(r) + spawns = get(task_local_storage(), :SPAWNS, ()) + if !is(spawns,()) + push!(spawns[1], r) + end + r +end + +function async_run_thunk(thunk) + t = Task(thunk) + sync_add(t) + enq_work(t) + t +end + +macro async(expr) + expr = localize_vars(:(()->($expr)), false) + :(async_run_thunk($(esc(expr)))) +end + +macro spawnlocal(expr) + warn_once("@spawnlocal is deprecated, use @async instead.") + :(@async $(esc(expr))) +end diff --git a/src/builtins.c b/src/builtins.c index acacc793e0f71..7557b3ef1d6c0 100644 --- a/src/builtins.c +++ b/src/builtins.c @@ -373,17 +373,20 @@ JL_CALLABLE(jl_f_top_eval) return v; } jl_module_t *last_m = jl_current_module; + jl_module_t *task_last_m = jl_current_task->current_module; JL_TRY { - jl_current_module = m; + jl_current_task->current_module = jl_current_module = m; v = jl_toplevel_eval(ex); } JL_CATCH { jl_lineno = last_lineno; jl_current_module = last_m; + jl_current_task->current_module = task_last_m; jl_rethrow(); } jl_lineno = last_lineno; jl_current_module = last_m; + jl_current_task->current_module = task_last_m; assert(v); return v; } diff --git a/src/dump.c b/src/dump.c index e77dee9dec3e0..04b591ba944ef 100644 --- a/src/dump.c +++ b/src/dump.c @@ -985,7 +985,7 @@ extern jl_function_t *jl_typeinf_func; extern int jl_boot_file_loaded; extern void jl_get_builtin_hooks(void); extern void jl_get_system_hooks(void); -extern void jl_get_uv_hooks(void); +extern void jl_get_uv_hooks(int); DLLEXPORT void jl_restore_system_image(char *fname) @@ -1047,7 +1047,7 @@ void jl_restore_system_image(char *fname) jl_get_builtin_hooks(); jl_get_system_hooks(); - jl_get_uv_hooks(); + jl_get_uv_hooks(1); jl_boot_file_loaded = 1; jl_typeinf_func = (jl_function_t*)jl_get_global(jl_base_module, jl_symbol("typeinf_ext")); diff --git a/src/gc.c b/src/gc.c index 72852e7dd3ec3..2ac17d527c01c 100644 --- a/src/gc.c +++ b/src/gc.c @@ -639,7 +639,7 @@ static void gc_mark_module(jl_module_t *m, int d) static void gc_mark_task(jl_task_t *ta, int d) { if (ta->parent) gc_push_root(ta->parent, d); - gc_push_root(ta->last, d); + if (ta->last) gc_push_root(ta->last, d); gc_push_root(ta->tls, d); gc_push_root(ta->consumers, d); gc_push_root(ta->donenotify, d); diff --git a/src/init.c b/src/init.c index e5704f34641af..50a8dd04e2fe6 100644 --- a/src/init.c +++ b/src/init.c @@ -745,6 +745,7 @@ void julia_init(char *imageFile) // eval() uses Main by default, so Main.eval === Core.eval jl_module_import(jl_main_module, jl_core_module, jl_symbol("eval")); jl_current_module = jl_main_module; + jl_root_task->current_module = jl_current_module; #ifndef _OS_WINDOWS_ diff --git a/src/interpreter.c b/src/interpreter.c index fd53de907e78b..64ab27bb87052 100644 --- a/src/interpreter.c +++ b/src/interpreter.c @@ -32,15 +32,18 @@ jl_value_t *jl_interpret_toplevel_expr_in(jl_module_t *m, jl_value_t *e, { jl_value_t *v=NULL; jl_module_t *last_m = jl_current_module; + jl_module_t *task_last_m = jl_current_task->current_module; JL_TRY { - jl_current_module = m; + jl_current_task->current_module = jl_current_module = m; v = eval(e, locals, nl); } JL_CATCH { jl_current_module = last_m; + jl_current_task->current_module = task_last_m; jl_rethrow(); } jl_current_module = last_m; + jl_current_task->current_module = task_last_m; assert(v); return v; } diff --git a/src/jl_uv.c b/src/jl_uv.c index 8553772dd9235..b624608bd314e 100644 --- a/src/jl_uv.c +++ b/src/jl_uv.c @@ -73,9 +73,9 @@ enum CALLBACK_TYPE { CB_PTR, CB_INT32, CB_INT64 }; #define XX(hook) static jl_function_t *JULIA_HOOK(hook) = 0; JL_CB_TYPES(XX) #undef XX -DLLEXPORT void jl_get_uv_hooks() +DLLEXPORT void jl_get_uv_hooks(int force) { - if (JULIA_HOOK(close)) return; // only do this once + if (!force && JULIA_HOOK(close)) return; // only do this once #define XX(hook) JULIA_HOOK(hook) = JULIA_HOOK_(jl_base_module, hook); JL_CB_TYPES(XX) #undef XX diff --git a/src/julia.h b/src/julia.h index 12643871afbc2..56f14d5c2328a 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1086,6 +1086,7 @@ typedef struct _jl_task_t { struct _jl_task_t *last; jl_value_t *tls; jl_value_t *consumers; + int8_t started; int8_t done; int8_t runnable; jl_value_t *result; @@ -1106,7 +1107,7 @@ typedef struct _jl_task_t { jl_handler_t *eh; // saved gc stack top for context switches jl_gcframe_t *gcstack; - // saved current module + // current module, or NULL if this task has not set one jl_module_t *current_module; } jl_task_t; diff --git a/src/task.c b/src/task.c index a59b87a45b29a..d5924cc205b96 100644 --- a/src/task.c +++ b/src/task.c @@ -238,16 +238,18 @@ static void ctx_switch(jl_task_t *t, jl_jmp_buf *where) jl_current_task->gcstack = jl_pgcstack; jl_pgcstack = t->gcstack; #endif - jl_current_task->current_module = jl_current_module; - t->last = jl_current_task; - // by default, parent is first task to switch to this one - if (t->parent == NULL) { - t->parent = jl_current_task; - t->current_module = jl_current_module; + + // restore task's current module, looking at parent tasks + // if it hasn't set one. + jl_task_t *last = t; + while (last->current_module == NULL && last != jl_root_task) { + last = last->parent; } - else { - jl_current_module = t->current_module; + if (last->current_module != NULL) { + jl_current_module = last->current_module; } + + t->last = jl_current_task; jl_current_task = t; #ifdef COPY_STACKS @@ -265,6 +267,8 @@ static jl_value_t *switchto(jl_task_t *t) { if (t->done) { jl_task_arg_in_transit = (jl_value_t*)jl_null; + if (t->exception != jl_nothing) + jl_throw(t->exception); return t->result; } if (jl_in_gc) { @@ -391,6 +395,16 @@ static void finish_task(jl_task_t *t, jl_value_t *resultval) jl_apply(task_done_hook_func, (jl_value_t**)&t, 1); } } + jl_task_t *cont = jl_current_task->last; + if (!cont->done) { + jl_switchto(cont, t->result); + } + else { + jl_function_t* yield_f = + (jl_function_t*)jl_get_global(jl_base_module, jl_symbol("yield")); + (void)jl_apply(yield_f,NULL,0); + } + assert(0); } static void start_task(jl_task_t *t) @@ -399,6 +413,8 @@ static void start_task(jl_task_t *t) jl_value_t *arg = jl_task_arg_in_transit; jl_value_t *res; JL_GC_PUSH1(&arg); + assert(!t->started); + t->started = 1; #ifdef COPY_STACKS ptrint_t local_sp = (ptrint_t)jl_pgcstack; @@ -425,11 +441,6 @@ static void start_task(jl_task_t *t) } JL_GC_POP(); finish_task(t, res); - jl_task_t *cont = t->parent; - // if parent task has exited, try its parent, and so on - while (cont->done) - cont = cont->parent; - jl_switchto(cont, t->result); assert(0); } @@ -742,15 +753,9 @@ void NORETURN throw_internal(jl_value_t *e) JL_PRINTF(JL_STDERR, "\n"); exit(1); } - jl_task_t *cont = jl_current_task->parent; - while (cont->done || cont->eh == NULL) - cont = cont->parent; - // for now, exit the task jl_current_task->exception = e; finish_task(jl_current_task, e); - jl_current_task->exception = jl_nothing; - ctx_switch(cont, &cont->eh->eh_ctx); - // TODO: continued exception + assert(0); } jl_exit(1); } @@ -785,10 +790,12 @@ jl_task_t *jl_new_task(jl_function_t *start, size_t ssize) t->type = (jl_value_t*)jl_task_type; ssize = LLT_ALIGN(ssize, pagesz); t->ssize = ssize; - t->parent = NULL; - t->last = jl_current_task; + t->current_module = NULL; + t->parent = jl_current_task; + t->last = NULL; t->tls = jl_nothing; t->consumers = jl_nothing; + t->started = 0; t->done = 0; t->runnable = 1; t->start = start; @@ -886,21 +893,22 @@ void jl_init_tasks(void *stack, size_t ssize) jl_task_type = jl_new_datatype(jl_symbol("Task"), jl_any_type, jl_null, - jl_tuple(10, + jl_tuple(11, jl_symbol("parent"), jl_symbol("last"), jl_symbol("storage"), jl_symbol("consumers"), + jl_symbol("started"), jl_symbol("done"), jl_symbol("runnable"), jl_symbol("result"), jl_symbol("donenotify"), jl_symbol("exception"), jl_symbol("code")), - jl_tuple(10, + jl_tuple(11, jl_any_type, jl_any_type, jl_any_type, jl_any_type, - jl_bool_type, jl_bool_type, + jl_bool_type, jl_bool_type, jl_bool_type, jl_any_type, jl_any_type, jl_any_type, jl_function_type), 0, 1); @@ -923,6 +931,7 @@ void jl_init_tasks(void *stack, size_t ssize) jl_current_task->last = jl_current_task; jl_current_task->tls = NULL; jl_current_task->consumers = NULL; + jl_current_task->started = 1; jl_current_task->done = 0; jl_current_task->runnable = 1; jl_current_task->start = NULL; diff --git a/src/toplevel.c b/src/toplevel.c index b87d6d06010cd..87bd9ebb3e077 100644 --- a/src/toplevel.c +++ b/src/toplevel.c @@ -33,6 +33,8 @@ void jl_add_standard_imports(jl_module_t *m) jl_symbol("Operators"))); } +extern void jl_get_system_hooks(void); +extern void jl_get_uv_hooks(int); extern int base_module_conflict; jl_value_t *jl_eval_module_expr(jl_expr_t *ex) { @@ -55,11 +57,13 @@ jl_value_t *jl_eval_module_expr(jl_expr_t *ex) jl_module_t *newm = jl_new_module(name); newm->parent = parent_module; b->value = (jl_value_t*)newm; + int newbase = 0; if (parent_module == jl_main_module && name == jl_symbol("Base")) { base_module_conflict = (jl_base_module != NULL); jl_old_base_module = jl_base_module; // pick up Base module during bootstrap jl_base_module = newm; + newbase = 1; } // export all modules from Main if (parent_module == jl_main_module) @@ -73,7 +77,8 @@ jl_value_t *jl_eval_module_expr(jl_expr_t *ex) } JL_GC_PUSH1(&last_module); - jl_current_module = newm; + jl_module_t *task_last_m = jl_current_task->current_module; + jl_current_task->current_module = jl_current_module = newm; jl_array_t *exprs = ((jl_expr_t*)jl_exprarg(ex, 2))->args; JL_TRY { @@ -85,10 +90,21 @@ jl_value_t *jl_eval_module_expr(jl_expr_t *ex) } JL_CATCH { jl_current_module = last_module; + jl_current_task->current_module = task_last_m; jl_rethrow(); } JL_GC_POP(); jl_current_module = last_module; + jl_current_task->current_module = task_last_m; + + if (newbase) { + // reinitialize global variables + // to pick up new types from Base + jl_errorexception_type = NULL; + jl_get_system_hooks(); + jl_get_uv_hooks(1); + jl_current_task->tls = jl_nothing; + } #if 0 // some optional post-processing steps diff --git a/test/.gitignore b/test/.gitignore index 00e869a2da043..e382bb3b9d25c 100644 --- a/test/.gitignore +++ b/test/.gitignore @@ -1 +1,2 @@ /ccall +/ccalltest