From df7634a1bda54e49c293f92733ea463527702d8c Mon Sep 17 00:00:00 2001 From: Chris Foster Date: Tue, 24 Aug 2021 10:51:55 +1000 Subject: [PATCH] Avoid atomic increment of channel length This is unnecessary, as we hold the lock in any places where the length is updated. --- base/channels.jl | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/base/channels.jl b/base/channels.jl index c19ba140ac397..66c73c9e3310e 100644 --- a/base/channels.jl +++ b/base/channels.jl @@ -317,11 +317,23 @@ function put!(c::Channel{T}, v) where T return isbuffered(c) ? put_buffered(c, v) : put_unbuffered(c, v) end +# Atomically update channel length, *assuming* we hold the channel lock. +function _increment_channel_length(c, inc) + # We hold the channel lock so it's safe to non-atomically read and + # increment c.length + newlen = c.length + inc + # Atomically store c.length to prevent data races with other threads + # reading the length outside the lock. + @atomic :monotonic c.length = newlen +end + function put_buffered(c::Channel, v) lock(c) did_buffer = false try - @atomic :monotonic c.length += 1 + # Increment channel length eagerly (before push!) to count data in the + # buffer as well as offers from tasks which are blocked in wait(). + _increment_channel_length(c, 1) while length(c.data) == c.sz_max check_channel_state(c) wait(c.cond_put) @@ -331,7 +343,7 @@ function put_buffered(c::Channel, v) # notify all, since some of the waiters may be on a "fetch" call. notify(c.cond_take, nothing, true, false) finally - did_buffer || @atomic :monotonic c.length -= 1 + did_buffer || _increment_channel_length(c, -1) unlock(c) end return v @@ -340,7 +352,7 @@ end function put_unbuffered(c::Channel, v) lock(c) taker = try - @atomic :monotonic c.length += 1 + _increment_channel_length(c, 1) while isempty(c.cond_take.waitq) check_channel_state(c) notify(c.cond_wait) @@ -349,7 +361,7 @@ function put_unbuffered(c::Channel, v) # unfair scheduled version of: notify(c.cond_take, v, false, false); yield() popfirst!(c.cond_take.waitq) finally - @atomic :monotonic c.length -= 1 + _increment_channel_length(c, -1) unlock(c) end schedule(taker, v) @@ -396,7 +408,7 @@ function take_buffered(c::Channel) wait(c.cond_take) end v = popfirst!(c.data) - @atomic :monotonic c.length -= 1 + _increment_channel_length(c, -1) notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!. return v finally @@ -429,7 +441,7 @@ isready(c::Channel) = length(c) > 0 isempty(c::Channel) = length(c) == 0 function length(c::Channel) # Lock-free equivalent to `length(c.data) + length(c.cond_put.waitq)` - @atomic(:monotonic, c.length) + @atomic :monotonic c.length end lock(c::Channel) = lock(c.cond_take)