From 359258bda431f335f0f206aec2cba9813bc6e7b0 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Wed, 10 Apr 2019 12:48:49 -0400 Subject: [PATCH] channels: remove WeakRef from Condition Using a WeakRef meant we might not actually `bind` the result. If nobody was still holding a reference to put contents into the Condition, we would simply garbage collect it, and then never need to close it. Since that does not seem to be the intent, instead move to just keeping a strong reference (alternatively, we would have to switch to using `stream_wait` with ref-counting, but that seems suboptimal for several reasons.). fix #31507 --- base/channels.jl | 51 ++++++++++++++++++++++++------------------------ test/channels.jl | 11 ++++++----- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/base/channels.jl b/base/channels.jl index 27f1719341375..38145f0d636d8 100644 --- a/base/channels.jl +++ b/base/channels.jl @@ -192,8 +192,10 @@ Stacktrace: ``` """ function bind(c::Channel, task::Task) - ref = WeakRef(c) - register_taskdone_hook(task, tsk->close_chnl_on_taskdone(tsk, ref)) + # TODO: implement "schedulewait" and deprecate taskdone_hook + #T = Task(() -> close_chnl_on_taskdone(task, c)) + #schedulewait(task, T) + register_taskdone_hook(task, tsk -> close_chnl_on_taskdone(tsk, c)) return c end @@ -223,33 +225,30 @@ function channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n)) return (chnls, tasks) end -function close_chnl_on_taskdone(t::Task, ref::WeakRef) - c = ref.value - if c isa Channel - isopen(c) || return - cleanup = () -> try - isopen(c) || return - if istaskfailed(t) - excp = task_result(t) - if excp isa Exception - close(c, excp) - return - end +function close_chnl_on_taskdone(t::Task, c::Channel) + isopen(c) || return + cleanup = () -> try + isopen(c) || return + if istaskfailed(t) + excp = task_result(t) + if excp isa Exception + close(c, excp) + return end - close(c) - return - finally - unlock(c) end - if trylock(c) - # can't use `lock`, since attempts to task-switch to wait for it - # will just silently fail and leave us with broken state - cleanup() - else - # so schedule this to happen once we are finished destroying our task - # (on a new Task) - @async (lock(c); cleanup()) + close(c) + return + finally + unlock(c) end + if trylock(c) + # can't use `lock`, since attempts to task-switch to wait for it + # will just silently fail and leave us with broken state + cleanup() + else + # so schedule this to happen once we are finished destroying our task + # (on a new Task) + @async (lock(c); cleanup()) end nothing end diff --git a/test/channels.jl b/test/channels.jl index c73d2dcb054c2..a01c1d98972a5 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -88,19 +88,20 @@ using Distributed @testset "channels bound to tasks" for N in [0, 10] # Normal exit of task c = Channel(N) - bind(c, @async (yield(); nothing)) + bind(c, @async (GC.gc(); yield(); nothing)) @test_throws InvalidStateException take!(c) @test !isopen(c) # Error exception in task c = Channel(N) - bind(c, @async (yield(); error("foo"))) + bind(c, @async (GC.gc(); yield(); error("foo"))) @test_throws ErrorException take!(c) @test !isopen(c) # Multiple channels closed by the same bound task cs = [Channel(N) for i in 1:5] - tf2 = () -> begin + tf2() = begin + GC.gc() if N > 0 foreach(c -> (@assert take!(c) === 2), cs) end @@ -129,8 +130,8 @@ using Distributed # Multiple tasks, first one to terminate closes the channel nth = rand(1:5) ref = Ref(0) - cond = Condition() tf3(i) = begin + GC.gc() if i == nth ref[] = i else @@ -138,7 +139,7 @@ using Distributed end end - tasks = [Task(()->tf3(i)) for i in 1:5] + tasks = [Task(() -> tf3(i)) for i in 1:5] c = Channel(N) foreach(t -> bind(c, t), tasks) foreach(schedule, tasks)