diff --git a/base/channels.jl b/base/channels.jl index c19ba140ac397..70862b06cf843 100644 --- a/base/channels.jl +++ b/base/channels.jl @@ -317,11 +317,26 @@ function put!(c::Channel{T}, v) where T return isbuffered(c) ? put_buffered(c, v) : put_unbuffered(c, v) end +# Atomically update channel length, *assuming* we hold the channel lock. +macro _inc_channel_length(c, inc) + quote + c = $(esc(c)) + # We hold the channel lock so it's safe to non-atomically read and + # increment c.length + newlen = c.length + $(esc(inc)) + # Atomically store c.length to prevent data races with other threads + # reading the length outside the lock. + @atomic :monotonic c.length = newlen + end +end + function put_buffered(c::Channel, v) lock(c) did_buffer = false try - @atomic :monotonic c.length += 1 + # Increment channel length eagerly (before push!) to count data in the + # buffer as well as offers from tasks which are blocked in wait(). + @_inc_channel_length(c, 1) while length(c.data) == c.sz_max check_channel_state(c) wait(c.cond_put) @@ -331,7 +346,7 @@ function put_buffered(c::Channel, v) # notify all, since some of the waiters may be on a "fetch" call. notify(c.cond_take, nothing, true, false) finally - did_buffer || @atomic :monotonic c.length -= 1 + did_buffer || @_inc_channel_length(c, -1) unlock(c) end return v @@ -340,7 +355,7 @@ end function put_unbuffered(c::Channel, v) lock(c) taker = try - @atomic :monotonic c.length += 1 + @_inc_channel_length(c, 1) while isempty(c.cond_take.waitq) check_channel_state(c) notify(c.cond_wait) @@ -349,7 +364,7 @@ function put_unbuffered(c::Channel, v) # unfair scheduled version of: notify(c.cond_take, v, false, false); yield() popfirst!(c.cond_take.waitq) finally - @atomic :monotonic c.length -= 1 + @_inc_channel_length(c, -1) unlock(c) end schedule(taker, v) @@ -396,7 +411,7 @@ function take_buffered(c::Channel) wait(c.cond_take) end v = popfirst!(c.data) - @atomic :monotonic c.length -= 1 + @_inc_channel_length(c, -1) notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!. return v finally @@ -429,7 +444,7 @@ isready(c::Channel) = length(c) > 0 isempty(c::Channel) = length(c) == 0 function length(c::Channel) # Lock-free equivalent to `length(c.data) + length(c.cond_put.waitq)` - @atomic(:monotonic, c.length) + @atomic :monotonic c.length end lock(c::Channel) = lock(c.cond_take)