Skip to content

Commit

Permalink
delete task->last and the :waiting state
Browse files Browse the repository at this point in the history
the task->last field was unnecessary, and could result in a task staying alive in the gc unnecessarily

:runnable was almost equivalent to :waiting,
except that it might "be restarted unpredictably" (via the task->last field)
  • Loading branch information
vtjnash committed Sep 25, 2015
1 parent 9162a1e commit a7fd45e
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 107 deletions.
1 change: 0 additions & 1 deletion base/boot.jl
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@

# type Task
# parent::Task
# last::Task
# storage::Any
# consumers
# started::Bool
Expand Down
13 changes: 2 additions & 11 deletions base/docs/helpdb.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6957,10 +6957,8 @@ of the argument:
* `Task`: Wait for a `Task` to finish, returning its result value. If the task fails with an exception, the exception is propagated (re-thrown in the task that called `wait`).
* `RawFD`: Wait for changes on a file descriptor (see `poll_fd` for keyword arguments and return code)
If no argument is passed, the task blocks for an undefined period. If the task's
state is set to `:waiting`, it can only be restarted by an explicit call to
`schedule` or `yieldto`. If the task's state is `:runnable`, it might be
restarted unpredictably.
If no argument is passed, the task blocks for an undefined period.
A task can only be restarted by an explicit call to `schedule` or `yieldto`.
Often `wait` is called within a `while` loop to ensure a waited-for condition
is met before proceeding.
Expand Down Expand Up @@ -10837,13 +10835,6 @@ Returns `true` if the value of the sign of `x` is negative, otherwise `false`.
"""
signbit

doc"""
istaskstarted(task) -> Bool
Tell whether a task has started executing.
"""
istaskstarted

doc"""
clamp(x, lo, hi)
Expand Down
1 change: 0 additions & 1 deletion base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,6 @@ export
Condition,
consume,
current_task,
istaskstarted,
istaskdone,
lock,
notify,
Expand Down
4 changes: 2 additions & 2 deletions base/serialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -346,13 +346,13 @@ end

function serialize(s::SerializationState, t::Task)
serialize_cycle(s, t) && return
if istaskstarted(t) && !istaskdone(t)
if !istaskdone(t)
error("cannot serialize a running Task")
end
writetag(s.io, TASK_TAG)
serialize(s, t.code)
serialize(s, t.storage)
serialize(s, t.state == :queued || t.state == :waiting ? (:runnable) : t.state)
serialize(s, t.state == :queued || t.state == :runnable ? (:runnable) : t.state)
serialize(s, t.result)
serialize(s, t.exception)
end
Expand Down
1 change: 0 additions & 1 deletion base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,6 @@ function uv_write(s::LibuvStream, p::Ptr, n::UInt)
end
ct = current_task()
uv_req_set_data(uvw,ct)
ct.state = :waiting
stream_wait(ct)
return Int(n)
end
Expand Down
95 changes: 38 additions & 57 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ end

current_task() = ccall(:jl_get_current_task, Any, ())::Task
istaskdone(t::Task) = ((t.state == :done) | (t.state == :failed))
istaskstarted(t::Task) = isdefined(t, :last)

yieldto(t::Task, x::ANY = nothing) = ccall(:jl_switchto, Any, (Any, Any), t, x)

Expand Down Expand Up @@ -115,61 +114,52 @@ suppress_excp_printing(t::Task) = isa(t.storage, ObjectIdDict) ? get(get_task_tl
function task_done_hook(t::Task)
err = (t.state == :failed)
result = t.result
nexttask = t.last
handled = true
handled = false
if err
t.backtrace = catch_backtrace()
end

q = t.consumers
t.consumers = nothing

if isa(t.donenotify, Condition) && !isempty(t.donenotify.waitq)
handled = true
notify(t.donenotify, result, error=err)
end

#### un-optimized version
#isa(q,Condition) && notify(q, result, error=err)
if isa(q,Task)
handled = true
nexttask = q
nexttask.state = :runnable
if err
nexttask.exception = result
end
yieldto(nexttask, result) # this terminates the task
elseif isa(q,Condition) && !isempty(q.waitq)
handled = true
notify(q, result, error=err)
else
handled = false
end

t.consumers = nothing

if isa(t.donenotify,Condition)
handled |= !isempty(t.donenotify.waitq)
notify(t.donenotify, result, error=err)
end

if nexttask.state == :runnable
if err
nexttask.exception = result
if err && !handled
if isa(result,InterruptException) && isdefined(Base,:active_repl_backend) &&
active_repl_backend.backend_task.state == :runnable && isempty(Workqueue) &&
active_repl_backend.in_eval
throwto(active_repl_backend.backend_task, result)
end
yieldto(nexttask, result)
else
if err && !handled
if isa(result,InterruptException) && isdefined(Base,:active_repl_backend) &&
active_repl_backend.backend_task.state == :waiting && isempty(Workqueue) &&
active_repl_backend.in_eval
throwto(active_repl_backend.backend_task, result)
end
if !suppress_excp_printing(t)
let bt = t.backtrace
# run a new task to print the error for us
@schedule with_output_color(:red, STDERR) do io
print(io, "ERROR (unhandled task failure): ")
showerror(io, result, bt)
println(io)
end
if !suppress_excp_printing(t)
let bt = t.backtrace
# run a new task to print the error for us
@schedule with_output_color(:red, STDERR) do io
print(io, "ERROR (unhandled task failure): ")
showerror(io, result, bt)
println(io)
end
end
end
# if a finished task accidentally gets into the queue, wait()
# could return. in that case just take the next task off the queue.
while true
wait()
end
end
wait()
end


Expand Down Expand Up @@ -197,13 +187,9 @@ function produce(v)
wait()
end

t.state = :runnable
t.state == :runnable || throw(AssertionError("producer.consumer.state == :runnable"))
if empty
if isempty(Workqueue)
yieldto(t, v)
else
schedule_and_wait(t, v)
end
schedule_and_wait(t, v)
while true
# wait until there are more consumers
q = ct.consumers
Expand Down Expand Up @@ -250,9 +236,8 @@ function consume(P::Task, values...)
end
push!(P.consumers.waitq, ct)
end
ct.state = :waiting

schedule_and_wait(P)
P.state == :runnable ? schedule_and_wait(P) : wait() # don't attempt to queue it twice
end

start(t::Task) = nothing
Expand All @@ -275,16 +260,12 @@ end
function wait(c::Condition)
ct = current_task()

ct.state = :waiting
push!(c.waitq, ct)

try
return wait()
catch
filter!(x->x!==ct, c.waitq)
if ct.state == :waiting
ct.state = :runnable
end
rethrow()
end
end
Expand Down Expand Up @@ -314,7 +295,8 @@ notify1_error(c::Condition, err) = notify(c, err, error=true, all=false)
global const Workqueue = Any[]

function enq_work(t::Task)
ccall(:uv_stop,Void,(Ptr{Void},),eventloop())
t.state == :runnable || error("schedule: Task not runnable")
ccall(:uv_stop, Void, (Ptr{Void},), eventloop())
push!(Workqueue, t)
t.state = :queued
t
Expand All @@ -334,16 +316,13 @@ end

# fast version of schedule(t,v);wait()
function schedule_and_wait(t, v=nothing)
t.state == :runnable || error("schedule: Task not runnable")
if isempty(Workqueue)
if t.state == :runnable
return yieldto(t, v)
end
return yieldto(t, v)
else
if t.state == :runnable
t.result = v
push!(Workqueue, t)
t.state = :queued
end
t.result = v
push!(Workqueue, t)
t.state = :queued
end
wait()
end
Expand All @@ -361,10 +340,12 @@ function wait()
end
else
t = shift!(Workqueue)
t.state == :queued || throw(AssertionError("shift!(Workqueue).state == :queued"))
arg = t.result
t.result = nothing
t.state = :runnable
result = yieldto(t, arg)
current_task().state == :runnable || throw(AssertionError("current_task().state == :runnable"))
process_events(false)
# return when we come out of the queue
return result
Expand Down
22 changes: 11 additions & 11 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1559,25 +1559,25 @@ NOINLINE static int gc_mark_module(jl_module_t *m, int d)

static void gc_mark_task_stack(jl_task_t *ta, int d)
{
if (ta->stkbuf != NULL || ta == jl_current_task || ta == jl_root_task) {
if (ta == jl_current_task) {
gc_mark_stack((jl_value_t*)ta, jl_pgcstack, 0, d);
}
else {
ptrint_t offset = 0;
if (ta == jl_current_task) {
gc_mark_stack((jl_value_t*)ta, jl_pgcstack, 0, d);
}
else if (ta == jl_root_task) {
gc_mark_stack((jl_value_t*)ta, ta->gcstack, 0, d);
}
else if (ta->stkbuf != NULL && ta->stkbuf != (void*)(intptr_t)-1) {
#ifdef COPY_STACKS
if (ta != jl_root_task)
offset = (char *)ta->stkbuf + ta->ssize - (char *)jl_stackbase;
ptrint_t offset = (char *)ta->stkbuf + ta->ssize - (char *)jl_stackbase;
#else
ptrint_t offset = 0;
#endif
gc_mark_stack((jl_value_t*)ta, ta->gcstack, offset, d);
}
gc_mark_stack((jl_value_t*)ta, ta->gcstack, offset, d);
}
}

NOINLINE static void gc_mark_task(jl_task_t *ta, int d)
{
if (ta->parent) gc_push_root(ta->parent, d);
if (ta->last) gc_push_root(ta->last, d);
gc_push_root(ta->tls, d);
gc_push_root(ta->consumers, d);
gc_push_root(ta->donenotify, d);
Expand Down
1 change: 0 additions & 1 deletion src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,6 @@ typedef struct _jl_handler_t {
typedef struct _jl_task_t {
JL_DATA_TYPE
struct _jl_task_t *parent;
struct _jl_task_t *last;
jl_value_t *tls;
jl_sym_t *state;
jl_value_t *consumers;
Expand Down
30 changes: 14 additions & 16 deletions src/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ static void NORETURN finish_task(jl_task_t *t, jl_value_t *resultval)
#ifdef COPY_STACKS
// early free of stkbuf
void *stkbuf = t->stkbuf;
if (stkbuf != NULL) {
t->stkbuf = NULL;
if (stkbuf != NULL && stkbuf != (void*)(intptr_t*)-1) {
t->stkbuf = (void*)(intptr_t)-1;
free(stkbuf - sizeof(size_t));
}
#endif
Expand Down Expand Up @@ -344,8 +344,6 @@ static void ctx_switch(jl_task_t *t)
jl_current_module = last->current_module;
}

t->last = jl_current_task;
jl_gc_wb(t, t->last);
jl_current_task = t;

#if defined(_OS_WINDOWS_) && !defined(COPY_STACKS)
Expand Down Expand Up @@ -424,9 +422,12 @@ JL_THREAD jl_value_t * volatile jl_task_arg_in_transit;
extern int jl_in_gc;
DLLEXPORT jl_value_t *jl_switchto(jl_task_t *t, jl_value_t *arg)
{
if (t == jl_current_task) {
throw_if_exception_set(t);
return arg;
}
if (t->state == done_sym || t->state == failed_sym ||
// task started but stkbuf NULL'd => finish_task ran
(t->last != NULL && t->stkbuf == NULL && t != jl_current_task)) {
(t->stkbuf == (void*)(intptr_t)-1)) {
if (t->exception != jl_nothing)
jl_throw(t->exception);
return t->result;
Expand Down Expand Up @@ -824,7 +825,6 @@ DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, size_t ssize)
t->ssize = ssize;
t->current_module = NULL;
t->parent = jl_current_task;
t->last = NULL;
t->tls = jl_nothing;
t->consumers = jl_nothing;
t->state = runnable_sym;
Expand Down Expand Up @@ -856,8 +856,8 @@ DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, size_t ssize)
static void jl_unprotect_stack(jl_task_t *t)
{
void *stk = t->stkbuf;
if (stk) {
t->stkbuf = NULL;
if (stk && stk != (void*)(intptr_t)-1) {
t->stkbuf = (void*)(intptr_t)-1;
#ifdef COPY_STACKS
free(stk - sizeof(size_t));
#else
Expand All @@ -882,9 +882,8 @@ void jl_init_tasks(void)
jl_task_type = jl_new_datatype(jl_symbol("Task"),
jl_any_type,
jl_emptysvec,
jl_svec(10,
jl_svec(9,
jl_symbol("parent"),
jl_symbol("last"),
jl_symbol("storage"),
jl_symbol("state"),
jl_symbol("consumers"),
Expand All @@ -893,13 +892,13 @@ void jl_init_tasks(void)
jl_symbol("exception"),
jl_symbol("backtrace"),
jl_symbol("code")),
jl_svec(10,
jl_any_type, jl_any_type,
jl_svec(9,
jl_any_type,
jl_any_type, jl_sym_type,
jl_any_type, jl_any_type,
jl_any_type, jl_any_type,
jl_any_type, jl_function_type),
0, 1, 0);
0, 1, 8);
jl_svecset(jl_task_type->types, 0, (jl_value_t*)jl_task_type);

done_sym = jl_symbol("done");
Expand Down Expand Up @@ -975,8 +974,7 @@ void jl_init_root_task(void *stack, size_t ssize)
jl_current_task->started = 1;
jl_current_task->parent = jl_current_task;
jl_current_task->current_module = jl_current_module;
jl_current_task->last = jl_current_task;
jl_current_task->tls = NULL;
jl_current_task->tls = jl_nothing;
jl_current_task->consumers = jl_nothing;
jl_current_task->state = runnable_sym;
jl_current_task->start = NULL;
Expand Down
Loading

0 comments on commit a7fd45e

Please sign in to comment.