diff --git a/base/channels.jl b/base/channels.jl index 1557504bbe21e..5a4d3ba166098 100644 --- a/base/channels.jl +++ b/base/channels.jl @@ -37,6 +37,7 @@ mutable struct Channel{T} <: AbstractChannel{T} excp::Union{Exception, Nothing} # exception to be thrown when state !== :open data::Vector{T} + @atomic length::Int # Length, tracked atomically sz_max::Int # maximum size of channel function Channel{T}(sz::Integer = 0) where T @@ -46,7 +47,7 @@ mutable struct Channel{T} <: AbstractChannel{T} lock = ReentrantLock() cond_put, cond_take = Threads.Condition(lock), Threads.Condition(lock) cond_wait = (sz == 0 ? Threads.Condition(lock) : cond_take) # wait is distinct from take iff unbuffered - return new(cond_take, cond_wait, cond_put, :open, nothing, Vector{T}(), sz) + return new(cond_take, cond_wait, cond_put, :open, nothing, Vector{T}(), 0, sz) end end @@ -318,15 +319,19 @@ end function put_buffered(c::Channel, v) lock(c) + did_buffer = false try + @atomic :monotonic c.length += 1 while length(c.data) == c.sz_max check_channel_state(c) wait(c.cond_put) end push!(c.data, v) + did_buffer = true # notify all, since some of the waiters may be on a "fetch" call. notify(c.cond_take, nothing, true, false) finally + did_buffer || @atomic :monotonic c.length -= 1 unlock(c) end return v @@ -335,6 +340,7 @@ end function put_unbuffered(c::Channel, v) lock(c) taker = try + @atomic :monotonic c.length += 1 while isempty(c.cond_take.waitq) check_channel_state(c) notify(c.cond_wait) @@ -343,6 +349,7 @@ function put_unbuffered(c::Channel, v) # unfair scheduled version of: notify(c.cond_take, v, false, false); yield() popfirst!(c.cond_take.waitq) finally + @atomic :monotonic c.length -= 1 unlock(c) end schedule(taker, v) @@ -389,6 +396,7 @@ function take_buffered(c::Channel) wait(c.cond_take) end v = popfirst!(c.data) + @atomic :monotonic c.length -= 1 notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!. return v finally @@ -417,9 +425,12 @@ immediately, does not block. For unbuffered channels returns `true` if there are tasks waiting on a [`put!`](@ref). """ -isready(c::Channel) = n_avail(c) > 0 -n_avail(c::Channel) = isbuffered(c) ? length(c.data) : length(c.cond_put.waitq) -isempty(c::Channel) = isbuffered(c) ? isempty(c.data) : isempty(c.cond_put.waitq) +isready(c::Channel) = length(c) > 0 +isempty(c::Channel) = length(c) == 0 +function length(c::Channel) + # Lock-free equivalent to `length(c.data) + length(c.cond_put.waitq)` + @atomic(:monotonic, c.length) +end lock(c::Channel) = lock(c.cond_take) lock(f, c::Channel) = lock(f, c.cond_take) @@ -450,12 +461,12 @@ function show(io::IO, ::MIME"text/plain", c::Channel) if !isopen(c) print(io, " (closed)") else - n = n_avail(c) + n = length(c) if n == 0 print(io, " (empty)") else s = n == 1 ? "" : "s" - print(io, " (", n_avail(c), " item$s available)") + print(io, " (", n, " item$s available)") end end end diff --git a/base/condition.jl b/base/condition.jl index be0f618865a48..ce725823dc3b5 100644 --- a/base/condition.jl +++ b/base/condition.jl @@ -79,7 +79,6 @@ lock(f, c::GenericCondition) = lock(f, c.lock) # have waiter wait for c function _wait2(c::GenericCondition, waiter::Task) - ct = current_task() assert_havelock(c) push!(c.waitq, waiter) return