Skip to content

Commit

Permalink
implement more descriptive task states. fixes #5736
Browse files Browse the repository at this point in the history
fortunately, we were already documenting that ``Task`` accepts a
0-argument function. now we take full advantage of this to simplify
the scheduler by dropping the behavior of passing arguments on the
first call to ``yieldto``.
  • Loading branch information
JeffBezanson committed Feb 10, 2014
1 parent a8caf23 commit b7af701
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 87 deletions.
3 changes: 1 addition & 2 deletions base/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ precompile(Dict{Any,Any}, (Int,))
precompile(Set, ())
precompile(setindex!, (Dict{Any,Any}, Bool, Cmd))
precompile(rehash, (Dict{Any,Any}, Int))
precompile(wait, (Int32,))
precompile(wait, ())
precompile(systemerror, (Symbol, Bool))
precompile(SystemError, (ASCIIString,))
precompile(has, (EnvHash, ASCIIString))
Expand All @@ -30,7 +30,6 @@ precompile(open, (ASCIIString, Bool, Bool, Bool, Bool))
precompile(done, (IntSet, Int64))
precompile(next, (IntSet, Int64))
precompile(ht_keyindex, (Dict{Any,Any}, Int32))
precompile(perform_work, (Task,))
precompile(notify_full, (RemoteValue,))
precompile(notify_empty, (RemoteValue,))
precompile(work_result, (RemoteValue,))
Expand Down
8 changes: 3 additions & 5 deletions base/serialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,13 @@ function serialize(s, linfo::LambdaStaticData)
end

function serialize(s, t::Task)
if istaskstarted(t) && !t.done
if istaskstarted(t) && !istaskdone(t)
error("cannot serialize a running Task")
end
writetag(s, Task)
serialize(s, t.code)
serialize(s, t.storage)
serialize(s, t.done)
serialize(s, t.runnable)
serialize(s, t.state == :queued || t.state == :waiting ? (:runnable) : t.state)
serialize(s, t.result)
serialize(s, t.exception)
end
Expand Down Expand Up @@ -466,8 +465,7 @@ deserialize{T}(s, ::Type{Ptr{T}}) = pointer(T, 0)
function deserialize(s, ::Type{Task})
t = Task(deserialize(s))
t.storage = deserialize(s)
t.done = deserialize(s)
t.runnable = deserialize(s)
t.state = deserialize(s)
t.result = deserialize(s)
t.exception = deserialize(s)
t
Expand Down
60 changes: 23 additions & 37 deletions base/task.jl
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
## basic task functions and TLS

show(io::IO, t::Task) = print(io, "Task @0x$(hex(unsigned(pointer_from_objref(t)), WORD_SIZE>>2))")
show(io::IO, t::Task) = print(io, "Task ($(t.state)) @0x$(hex(unsigned(pointer_from_objref(t)), WORD_SIZE>>2))")

macro task(ex)
:(Task(()->$(esc(ex))))
end

current_task() = ccall(:jl_get_current_task, Any, ())::Task
istaskdone(t::Task) = t.done
istaskstarted(t::Task) = t.started
istaskdone(t::Task) = ((t.state == :done) | (t.state == :failed))
istaskstarted(t::Task) = isdefined(t, :last)

# yield to a task, throwing an exception in it
function throwto(t::Task, exc)
Expand Down Expand Up @@ -39,26 +39,24 @@ end

# NOTE: you can only wait for scheduled tasks
function wait(t::Task)
if istaskdone(t)
if t.exception !== nothing
throw(t.exception)
if !istaskdone(t)
if is(t.donenotify, nothing)
t.donenotify = Condition()
end
return t.result
end
if is(t.donenotify, nothing)
t.donenotify = Condition()
end
while !istaskdone(t)
wait(t.donenotify)
end
t.result
if t.state == :failed
throw(t.exception)
end
return t.result
end

# runtime system hook called when a task finishes
function task_done_hook(t::Task)
err = isdefined(t,:exception) && t.exception !== nothing
result = err ? t.exception : t.result

err = (t.state == :failed)
result = t.result
nexttask = t.last

q = t.consumers
Expand All @@ -70,7 +68,7 @@ function task_done_hook(t::Task)

isa(t.donenotify,Condition) && notify(t.donenotify, result, error=err)

if nexttask.runnable
if nexttask.state == :runnable
if err
nexttask.exception = t.exception
end
Expand All @@ -91,7 +89,7 @@ end
produce(v...) = produce(v)

function consume(P::Task, values...)
if P.done
if istaskdone(P)
return wait(P)
end
if P.consumers === nothing
Expand All @@ -103,7 +101,7 @@ function consume(P::Task, values...)
push!(P.consumers.waitq, ct)
ct.result = length(values)==1 ? values[1] : values

if P.runnable
if P.state == :runnable
return yieldto(P)
else
return wait()
Expand All @@ -129,6 +127,7 @@ end
function wait(c::Condition)
ct = current_task()

ct.state = :waiting
push!(c.waitq, ct)

try
Expand Down Expand Up @@ -158,31 +157,15 @@ notify_error(c::Condition, err) = notify(c, err, error=true)
notify1_error(c::Condition, err) = notify(c, err, error=true, all=false)


## work queue
## scheduler and work queue

function enq_work(t::Task)
ccall(:uv_stop,Void,(Ptr{Void},),eventloop())
push!(Workqueue, t)
t.state = :queued
t
end

function perform_work(t::Task)
if !istaskstarted(t)
# starting new task
result = yieldto(t)
else
# continuing interrupted work item
arg = t.result
t.result = nothing
t.runnable = true
result = yieldto(t, arg)
end
return result
end


## scheduler

# schedule an expression to run asynchronously, with minimal ceremony
macro schedule(expr)
expr = localize_vars(:(()->($expr)), false)
Expand All @@ -205,7 +188,6 @@ yield() = (enq_work(current_task()); wait())

function wait()
ct = current_task()
ct.runnable = false
while true
if isempty(Workqueue)
c = process_events(true)
Expand All @@ -215,7 +197,11 @@ function wait()
pause()
end
else
result = perform_work(shift!(Workqueue))
t = shift!(Workqueue)
arg = t.result
t.result = nothing
t.state = :runnable
result = yieldto(t, arg)
process_events(false)
# return when we come out of the queue
return result
Expand Down
16 changes: 16 additions & 0 deletions doc/manual/control-flow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -894,3 +894,19 @@ possible to make the scheduler run a task whenever it can, without necessarily
waiting for any events. This is done by calling ``schedule(task)``, or using
the ``@schedule`` or ``@async`` macros (see :ref:`man-parallel-computing` for
more details).

Task states
~~~~~~~~~~~

Tasks have a ``state`` field that describes their execution status. A task
state is one of the following symbols:

============= ==================================================
Symbol Meaning
============= ==================================================
``:runnable`` Currently running, or available to be switched to
``:waiting`` Blocked waiting for a specific event
``:queued`` In the scheduler's run queue about to be restarted
``:done`` Successfully finished executing
``:failed`` Finished with an uncaught exception
============= ==================================================
4 changes: 2 additions & 2 deletions doc/stdlib/base.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5188,11 +5188,11 @@ Tasks

.. function:: Task(func)

Create a ``Task`` (i.e. thread, or coroutine) to execute the given function. The task exits when this function returns.
Create a ``Task`` (i.e. thread, or coroutine) to execute the given function (which must be callable with no arguments). The task exits when this function returns.

.. function:: yieldto(task, args...)

Switch to the given task. The first time a task is switched to, the task's function is called with ``args``. On subsequent switches, ``args`` are returned from the task's last call to ``yieldto``.
Switch to the given task. The first time a task is switched to, the task's function is called with no arguments. On subsequent switches, ``args`` are returned from the task's last call to ``yieldto``.

.. function:: current_task()

Expand Down
6 changes: 2 additions & 4 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1085,12 +1085,10 @@ typedef struct _jl_task_t {
struct _jl_task_t *parent;
struct _jl_task_t *last;
jl_value_t *tls;
jl_sym_t *state;
jl_value_t *consumers;
int8_t started;
int8_t done;
int8_t runnable;
jl_value_t *result;
jl_value_t *donenotify;
jl_value_t *result;
jl_value_t *exception;
jl_function_t *start;
jl_jmp_buf ctx;
Expand Down
61 changes: 24 additions & 37 deletions src/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,15 @@ static void _probe_arch(void)
- stack growth
*/

static jl_sym_t *done_sym;
static jl_sym_t *failed_sym;
static jl_sym_t *runnable_sym;

extern size_t jl_page_size;
jl_datatype_t *jl_task_type;
DLLEXPORT jl_task_t * volatile jl_current_task;
jl_task_t *jl_root_task;
jl_value_t * volatile jl_task_arg_in_transit;
static volatile int n_args_in_transit;
jl_value_t *jl_exception_in_transit;
#ifdef JL_GC_MARKSWEEP
jl_gcframe_t *jl_pgcstack = NULL;
Expand All @@ -153,7 +156,7 @@ jl_jmp_buf * volatile jl_jmp_target;

static void save_stack(jl_task_t *t)
{
if (t->done)
if (t->state == done_sym || t->state == failed_sym)
return;
volatile int _x;
size_t nb = (char*)t->stackbase - (char*)&_x;
Expand Down Expand Up @@ -265,7 +268,7 @@ static void ctx_switch(jl_task_t *t, jl_jmp_buf *where)
extern int jl_in_gc;
static jl_value_t *switchto(jl_task_t *t)
{
if (t->done) {
if (t->state == done_sym || t->state == failed_sym) {
jl_task_arg_in_transit = (jl_value_t*)jl_null;
if (t->exception != jl_nothing)
jl_throw(t->exception);
Expand Down Expand Up @@ -370,17 +373,17 @@ static void rebase_state(jl_jmp_buf *ctx, intptr_t local_sp, intptr_t new_sp)
jl_value_t *jl_switchto(jl_task_t *t, jl_value_t *arg)
{
jl_task_arg_in_transit = arg;
n_args_in_transit = 1;
return switchto(t);
}

static jl_function_t *task_done_hook_func=NULL;

static void finish_task(jl_task_t *t, jl_value_t *resultval)
{
assert(t->done==0);
t->done = 1;
t->runnable = 0;
if (t->exception != jl_nothing)
t->state = failed_sym;
else
t->state = done_sym;
t->result = resultval;
// TODO: early free of t->stkbuf
#ifdef COPY_STACKS
Expand All @@ -402,8 +405,6 @@ static void start_task(jl_task_t *t)
jl_value_t *arg = jl_task_arg_in_transit;
jl_value_t *res;
JL_GC_PUSH1(&arg);
assert(!t->started);
t->started = 1;

#ifdef COPY_STACKS
ptrint_t local_sp = (ptrint_t)jl_pgcstack;
Expand All @@ -417,17 +418,7 @@ static void start_task(jl_task_t *t)
switch_stack(jl_current_task, jl_jmp_target);
}
#endif
if (n_args_in_transit == 0) {
res = jl_apply(t->start, NULL, 0);
}
else if (n_args_in_transit == 1) {
res = jl_apply(t->start, &arg, 1);
}
else {
assert(jl_is_tuple(jl_task_arg_in_transit));
res = jl_apply(t->start, &jl_tupleref(jl_task_arg_in_transit,0),
n_args_in_transit);
}
res = jl_apply(t->start, NULL, 0);
JL_GC_POP();
finish_task(t, res);
assert(0);
Expand Down Expand Up @@ -784,9 +775,7 @@ jl_task_t *jl_new_task(jl_function_t *start, size_t ssize)
t->last = NULL;
t->tls = jl_nothing;
t->consumers = jl_nothing;
t->started = 0;
t->done = 0;
t->runnable = 1;
t->state = runnable_sym;
t->start = start;
t->result = jl_nothing;
t->donenotify = jl_nothing;
Expand Down Expand Up @@ -856,12 +845,11 @@ JL_CALLABLE(jl_f_yieldto)
{
JL_NARGSV(yieldto, 1);
JL_TYPECHK(yieldto, task, args[0]);
n_args_in_transit = nargs-1;
if (nargs == 2) {
jl_task_arg_in_transit = args[1];
}
else if (nargs > 2) {
jl_task_arg_in_transit = jl_f_tuple(NULL, &args[1], n_args_in_transit);
jl_task_arg_in_transit = jl_f_tuple(NULL, &args[1], nargs-1);
}
else {
jl_task_arg_in_transit = (jl_value_t*)jl_null;
Expand All @@ -882,28 +870,29 @@ void jl_init_tasks(void *stack, size_t ssize)
jl_task_type = jl_new_datatype(jl_symbol("Task"),
jl_any_type,
jl_null,
jl_tuple(11,
jl_tuple(9,
jl_symbol("parent"),
jl_symbol("last"),
jl_symbol("storage"),
jl_symbol("state"),
jl_symbol("consumers"),
jl_symbol("started"),
jl_symbol("done"),
jl_symbol("runnable"),
jl_symbol("result"),
jl_symbol("donenotify"),
jl_symbol("result"),
jl_symbol("exception"),
jl_symbol("code")),
jl_tuple(11,
jl_tuple(9,
jl_any_type, jl_any_type,
jl_any_type, jl_sym_type,
jl_any_type, jl_any_type,
jl_bool_type, jl_bool_type, jl_bool_type,
jl_any_type, jl_any_type,
jl_any_type, jl_function_type),
jl_any_type, jl_any_type, jl_function_type),
0, 1);
jl_tupleset(jl_task_type->types, 0, (jl_value_t*)jl_task_type);
jl_task_type->fptr = jl_f_task;

done_sym = jl_symbol("done");
failed_sym = jl_symbol("failed");
runnable_sym = jl_symbol("runnable");

jl_current_task = (jl_task_t*)allocobj(sizeof(jl_task_t));
jl_current_task->type = (jl_value_t*)jl_task_type;
#ifdef COPY_STACKS
Expand All @@ -920,9 +909,7 @@ void jl_init_tasks(void *stack, size_t ssize)
jl_current_task->last = jl_current_task;
jl_current_task->tls = NULL;
jl_current_task->consumers = NULL;
jl_current_task->started = 1;
jl_current_task->done = 0;
jl_current_task->runnable = 1;
jl_current_task->state = runnable_sym;
jl_current_task->start = NULL;
jl_current_task->result = NULL;
jl_current_task->donenotify = NULL;
Expand Down

0 comments on commit b7af701

Please sign in to comment.