diff --git a/base/precompile.jl b/base/precompile.jl index 6b866750ae694..76c88bcae5ff3 100644 --- a/base/precompile.jl +++ b/base/precompile.jl @@ -14,7 +14,7 @@ precompile(Dict{Any,Any}, (Int,)) precompile(Set, ()) precompile(setindex!, (Dict{Any,Any}, Bool, Cmd)) precompile(rehash, (Dict{Any,Any}, Int)) -precompile(wait, (Int32,)) +precompile(wait, ()) precompile(systemerror, (Symbol, Bool)) precompile(SystemError, (ASCIIString,)) precompile(has, (EnvHash, ASCIIString)) @@ -30,7 +30,6 @@ precompile(open, (ASCIIString, Bool, Bool, Bool, Bool)) precompile(done, (IntSet, Int64)) precompile(next, (IntSet, Int64)) precompile(ht_keyindex, (Dict{Any,Any}, Int32)) -precompile(perform_work, (Task,)) precompile(notify_full, (RemoteValue,)) precompile(notify_empty, (RemoteValue,)) precompile(work_result, (RemoteValue,)) diff --git a/base/serialize.jl b/base/serialize.jl index cd7cdf5fe1cf7..a9852d0515e02 100644 --- a/base/serialize.jl +++ b/base/serialize.jl @@ -235,14 +235,13 @@ function serialize(s, linfo::LambdaStaticData) end function serialize(s, t::Task) - if istaskstarted(t) && !t.done + if istaskstarted(t) && !istaskdone(t) error("cannot serialize a running Task") end writetag(s, Task) serialize(s, t.code) serialize(s, t.storage) - serialize(s, t.done) - serialize(s, t.runnable) + serialize(s, t.state == :queued || t.state == :waiting ? (:runnable) : t.state) serialize(s, t.result) serialize(s, t.exception) end @@ -466,8 +465,7 @@ deserialize{T}(s, ::Type{Ptr{T}}) = pointer(T, 0) function deserialize(s, ::Type{Task}) t = Task(deserialize(s)) t.storage = deserialize(s) - t.done = deserialize(s) - t.runnable = deserialize(s) + t.state = deserialize(s) t.result = deserialize(s) t.exception = deserialize(s) t diff --git a/base/task.jl b/base/task.jl index 1fc64adf2c5d3..2f596c0234d54 100644 --- a/base/task.jl +++ b/base/task.jl @@ -1,14 +1,14 @@ ## basic task functions and TLS -show(io::IO, t::Task) = print(io, "Task @0x$(hex(unsigned(pointer_from_objref(t)), WORD_SIZE>>2))") +show(io::IO, t::Task) = print(io, "Task ($(t.state)) @0x$(hex(unsigned(pointer_from_objref(t)), WORD_SIZE>>2))") macro task(ex) :(Task(()->$(esc(ex)))) end current_task() = ccall(:jl_get_current_task, Any, ())::Task -istaskdone(t::Task) = t.done -istaskstarted(t::Task) = t.started +istaskdone(t::Task) = ((t.state == :done) | (t.state == :failed)) +istaskstarted(t::Task) = isdefined(t, :last) # yield to a task, throwing an exception in it function throwto(t::Task, exc) @@ -39,26 +39,24 @@ end # NOTE: you can only wait for scheduled tasks function wait(t::Task) - if istaskdone(t) - if t.exception !== nothing - throw(t.exception) + if !istaskdone(t) + if is(t.donenotify, nothing) + t.donenotify = Condition() end - return t.result - end - if is(t.donenotify, nothing) - t.donenotify = Condition() end while !istaskdone(t) wait(t.donenotify) end - t.result + if t.state == :failed + throw(t.exception) + end + return t.result end # runtime system hook called when a task finishes function task_done_hook(t::Task) - err = isdefined(t,:exception) && t.exception !== nothing - result = err ? t.exception : t.result - + err = (t.state == :failed) + result = t.result nexttask = t.last q = t.consumers @@ -70,7 +68,7 @@ function task_done_hook(t::Task) isa(t.donenotify,Condition) && notify(t.donenotify, result, error=err) - if nexttask.runnable + if nexttask.state == :runnable if err nexttask.exception = t.exception end @@ -91,7 +89,7 @@ end produce(v...) = produce(v) function consume(P::Task, values...) - if P.done + if istaskdone(P) return wait(P) end if P.consumers === nothing @@ -103,7 +101,7 @@ function consume(P::Task, values...) push!(P.consumers.waitq, ct) ct.result = length(values)==1 ? values[1] : values - if P.runnable + if P.state == :runnable return yieldto(P) else return wait() @@ -129,6 +127,7 @@ end function wait(c::Condition) ct = current_task() + ct.state = :waiting push!(c.waitq, ct) try @@ -158,31 +157,15 @@ notify_error(c::Condition, err) = notify(c, err, error=true) notify1_error(c::Condition, err) = notify(c, err, error=true, all=false) -## work queue +## scheduler and work queue function enq_work(t::Task) ccall(:uv_stop,Void,(Ptr{Void},),eventloop()) push!(Workqueue, t) + t.state = :queued t end -function perform_work(t::Task) - if !istaskstarted(t) - # starting new task - result = yieldto(t) - else - # continuing interrupted work item - arg = t.result - t.result = nothing - t.runnable = true - result = yieldto(t, arg) - end - return result -end - - -## scheduler - # schedule an expression to run asynchronously, with minimal ceremony macro schedule(expr) expr = localize_vars(:(()->($expr)), false) @@ -205,7 +188,6 @@ yield() = (enq_work(current_task()); wait()) function wait() ct = current_task() - ct.runnable = false while true if isempty(Workqueue) c = process_events(true) @@ -215,7 +197,11 @@ function wait() pause() end else - result = perform_work(shift!(Workqueue)) + t = shift!(Workqueue) + arg = t.result + t.result = nothing + t.state = :runnable + result = yieldto(t, arg) process_events(false) # return when we come out of the queue return result diff --git a/doc/manual/control-flow.rst b/doc/manual/control-flow.rst index f43c59a6b17db..6df24630ac430 100644 --- a/doc/manual/control-flow.rst +++ b/doc/manual/control-flow.rst @@ -894,3 +894,19 @@ possible to make the scheduler run a task whenever it can, without necessarily waiting for any events. This is done by calling ``schedule(task)``, or using the ``@schedule`` or ``@async`` macros (see :ref:`man-parallel-computing` for more details). + +Task states +~~~~~~~~~~~ + +Tasks have a ``state`` field that describes their execution status. A task +state is one of the following symbols: + +============= ================================================== +Symbol Meaning +============= ================================================== +``:runnable`` Currently running, or available to be switched to +``:waiting`` Blocked waiting for a specific event +``:queued`` In the scheduler's run queue about to be restarted +``:done`` Successfully finished executing +``:failed`` Finished with an uncaught exception +============= ================================================== diff --git a/doc/stdlib/base.rst b/doc/stdlib/base.rst index 6b9f3f0eb0844..511d2f4cfb8fa 100644 --- a/doc/stdlib/base.rst +++ b/doc/stdlib/base.rst @@ -5188,11 +5188,11 @@ Tasks .. function:: Task(func) - Create a ``Task`` (i.e. thread, or coroutine) to execute the given function. The task exits when this function returns. + Create a ``Task`` (i.e. thread, or coroutine) to execute the given function (which must be callable with no arguments). The task exits when this function returns. .. function:: yieldto(task, args...) - Switch to the given task. The first time a task is switched to, the task's function is called with ``args``. On subsequent switches, ``args`` are returned from the task's last call to ``yieldto``. + Switch to the given task. The first time a task is switched to, the task's function is called with no arguments. On subsequent switches, ``args`` are returned from the task's last call to ``yieldto``. .. function:: current_task() diff --git a/src/julia.h b/src/julia.h index 56f14d5c2328a..e56dc5e85b312 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1085,12 +1085,10 @@ typedef struct _jl_task_t { struct _jl_task_t *parent; struct _jl_task_t *last; jl_value_t *tls; + jl_sym_t *state; jl_value_t *consumers; - int8_t started; - int8_t done; - int8_t runnable; - jl_value_t *result; jl_value_t *donenotify; + jl_value_t *result; jl_value_t *exception; jl_function_t *start; jl_jmp_buf ctx; diff --git a/src/task.c b/src/task.c index 41574f841568a..c3aa434b4596a 100644 --- a/src/task.c +++ b/src/task.c @@ -135,12 +135,15 @@ static void _probe_arch(void) - stack growth */ +static jl_sym_t *done_sym; +static jl_sym_t *failed_sym; +static jl_sym_t *runnable_sym; + extern size_t jl_page_size; jl_datatype_t *jl_task_type; DLLEXPORT jl_task_t * volatile jl_current_task; jl_task_t *jl_root_task; jl_value_t * volatile jl_task_arg_in_transit; -static volatile int n_args_in_transit; jl_value_t *jl_exception_in_transit; #ifdef JL_GC_MARKSWEEP jl_gcframe_t *jl_pgcstack = NULL; @@ -153,7 +156,7 @@ jl_jmp_buf * volatile jl_jmp_target; static void save_stack(jl_task_t *t) { - if (t->done) + if (t->state == done_sym || t->state == failed_sym) return; volatile int _x; size_t nb = (char*)t->stackbase - (char*)&_x; @@ -265,7 +268,7 @@ static void ctx_switch(jl_task_t *t, jl_jmp_buf *where) extern int jl_in_gc; static jl_value_t *switchto(jl_task_t *t) { - if (t->done) { + if (t->state == done_sym || t->state == failed_sym) { jl_task_arg_in_transit = (jl_value_t*)jl_null; if (t->exception != jl_nothing) jl_throw(t->exception); @@ -370,7 +373,6 @@ static void rebase_state(jl_jmp_buf *ctx, intptr_t local_sp, intptr_t new_sp) jl_value_t *jl_switchto(jl_task_t *t, jl_value_t *arg) { jl_task_arg_in_transit = arg; - n_args_in_transit = 1; return switchto(t); } @@ -378,9 +380,10 @@ static jl_function_t *task_done_hook_func=NULL; static void finish_task(jl_task_t *t, jl_value_t *resultval) { - assert(t->done==0); - t->done = 1; - t->runnable = 0; + if (t->exception != jl_nothing) + t->state = failed_sym; + else + t->state = done_sym; t->result = resultval; // TODO: early free of t->stkbuf #ifdef COPY_STACKS @@ -402,8 +405,6 @@ static void start_task(jl_task_t *t) jl_value_t *arg = jl_task_arg_in_transit; jl_value_t *res; JL_GC_PUSH1(&arg); - assert(!t->started); - t->started = 1; #ifdef COPY_STACKS ptrint_t local_sp = (ptrint_t)jl_pgcstack; @@ -417,17 +418,7 @@ static void start_task(jl_task_t *t) switch_stack(jl_current_task, jl_jmp_target); } #endif - if (n_args_in_transit == 0) { - res = jl_apply(t->start, NULL, 0); - } - else if (n_args_in_transit == 1) { - res = jl_apply(t->start, &arg, 1); - } - else { - assert(jl_is_tuple(jl_task_arg_in_transit)); - res = jl_apply(t->start, &jl_tupleref(jl_task_arg_in_transit,0), - n_args_in_transit); - } + res = jl_apply(t->start, NULL, 0); JL_GC_POP(); finish_task(t, res); assert(0); @@ -784,9 +775,7 @@ jl_task_t *jl_new_task(jl_function_t *start, size_t ssize) t->last = NULL; t->tls = jl_nothing; t->consumers = jl_nothing; - t->started = 0; - t->done = 0; - t->runnable = 1; + t->state = runnable_sym; t->start = start; t->result = jl_nothing; t->donenotify = jl_nothing; @@ -856,12 +845,11 @@ JL_CALLABLE(jl_f_yieldto) { JL_NARGSV(yieldto, 1); JL_TYPECHK(yieldto, task, args[0]); - n_args_in_transit = nargs-1; if (nargs == 2) { jl_task_arg_in_transit = args[1]; } else if (nargs > 2) { - jl_task_arg_in_transit = jl_f_tuple(NULL, &args[1], n_args_in_transit); + jl_task_arg_in_transit = jl_f_tuple(NULL, &args[1], nargs-1); } else { jl_task_arg_in_transit = (jl_value_t*)jl_null; @@ -882,28 +870,29 @@ void jl_init_tasks(void *stack, size_t ssize) jl_task_type = jl_new_datatype(jl_symbol("Task"), jl_any_type, jl_null, - jl_tuple(11, + jl_tuple(9, jl_symbol("parent"), jl_symbol("last"), jl_symbol("storage"), + jl_symbol("state"), jl_symbol("consumers"), - jl_symbol("started"), - jl_symbol("done"), - jl_symbol("runnable"), - jl_symbol("result"), jl_symbol("donenotify"), + jl_symbol("result"), jl_symbol("exception"), jl_symbol("code")), - jl_tuple(11, + jl_tuple(9, jl_any_type, jl_any_type, + jl_any_type, jl_sym_type, jl_any_type, jl_any_type, - jl_bool_type, jl_bool_type, jl_bool_type, - jl_any_type, jl_any_type, - jl_any_type, jl_function_type), + jl_any_type, jl_any_type, jl_function_type), 0, 1); jl_tupleset(jl_task_type->types, 0, (jl_value_t*)jl_task_type); jl_task_type->fptr = jl_f_task; + done_sym = jl_symbol("done"); + failed_sym = jl_symbol("failed"); + runnable_sym = jl_symbol("runnable"); + jl_current_task = (jl_task_t*)allocobj(sizeof(jl_task_t)); jl_current_task->type = (jl_value_t*)jl_task_type; #ifdef COPY_STACKS @@ -920,9 +909,7 @@ void jl_init_tasks(void *stack, size_t ssize) jl_current_task->last = jl_current_task; jl_current_task->tls = NULL; jl_current_task->consumers = NULL; - jl_current_task->started = 1; - jl_current_task->done = 0; - jl_current_task->runnable = 1; + jl_current_task->state = runnable_sym; jl_current_task->start = NULL; jl_current_task->result = NULL; jl_current_task->donenotify = NULL;