Skip to content

Commit

Permalink
channels: remove WeakRef from Condition (#31673)
Browse files Browse the repository at this point in the history
Using a WeakRef meant we might not actually `bind` the result.
If nobody was still holding a reference to put contents into the Condition,
we would simply garbage collect it, and then never need to close it.
Since that does not seem to be the intent,
instead move to just keeping a strong reference
(alternatively, we would have to switch to using `stream_wait`
with ref-counting, but that seems suboptimal for several reasons.).

fix #31507
  • Loading branch information
vtjnash authored and JeffBezanson committed Apr 11, 2019
1 parent b4e95cb commit 29f61cd
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 31 deletions.
51 changes: 25 additions & 26 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,10 @@ Stacktrace:
```
"""
function bind(c::Channel, task::Task)
ref = WeakRef(c)
register_taskdone_hook(task, tsk->close_chnl_on_taskdone(tsk, ref))
# TODO: implement "schedulewait" and deprecate taskdone_hook
#T = Task(() -> close_chnl_on_taskdone(task, c))
#schedulewait(task, T)
register_taskdone_hook(task, tsk -> close_chnl_on_taskdone(tsk, c))
return c
end

Expand Down Expand Up @@ -223,33 +225,30 @@ function channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n))
return (chnls, tasks)
end

function close_chnl_on_taskdone(t::Task, ref::WeakRef)
c = ref.value
if c isa Channel
isopen(c) || return
cleanup = () -> try
isopen(c) || return
if istaskfailed(t)
excp = task_result(t)
if excp isa Exception
close(c, excp)
return
end
function close_chnl_on_taskdone(t::Task, c::Channel)
isopen(c) || return
cleanup = () -> try
isopen(c) || return
if istaskfailed(t)
excp = task_result(t)
if excp isa Exception
close(c, excp)
return
end
close(c)
return
finally
unlock(c)
end
if trylock(c)
# can't use `lock`, since attempts to task-switch to wait for it
# will just silently fail and leave us with broken state
cleanup()
else
# so schedule this to happen once we are finished destroying our task
# (on a new Task)
@async (lock(c); cleanup())
close(c)
return
finally
unlock(c)
end
if trylock(c)
# can't use `lock`, since attempts to task-switch to wait for it
# will just silently fail and leave us with broken state
cleanup()
else
# so schedule this to happen once we are finished destroying our task
# (on a new Task)
@async (lock(c); cleanup())
end
nothing
end
Expand Down
11 changes: 6 additions & 5 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,20 @@ using Distributed
@testset "channels bound to tasks" for N in [0, 10]
# Normal exit of task
c = Channel(N)
bind(c, @async (yield(); nothing))
bind(c, @async (GC.gc(); yield(); nothing))
@test_throws InvalidStateException take!(c)
@test !isopen(c)

# Error exception in task
c = Channel(N)
bind(c, @async (yield(); error("foo")))
bind(c, @async (GC.gc(); yield(); error("foo")))
@test_throws ErrorException take!(c)
@test !isopen(c)

# Multiple channels closed by the same bound task
cs = [Channel(N) for i in 1:5]
tf2 = () -> begin
tf2() = begin
GC.gc()
if N > 0
foreach(c -> (@assert take!(c) === 2), cs)
end
Expand Down Expand Up @@ -129,16 +130,16 @@ using Distributed
# Multiple tasks, first one to terminate closes the channel
nth = rand(1:5)
ref = Ref(0)
cond = Condition()
tf3(i) = begin
GC.gc()
if i == nth
ref[] = i
else
sleep(2.0)
end
end

tasks = [Task(()->tf3(i)) for i in 1:5]
tasks = [Task(() -> tf3(i)) for i in 1:5]
c = Channel(N)
foreach(t -> bind(c, t), tasks)
foreach(schedule, tasks)
Expand Down

0 comments on commit 29f61cd

Please sign in to comment.