Skip to content

Commit

Permalink
Avoid atomic increment of channel length
Browse files Browse the repository at this point in the history
This is unnecessary, as we hold the lock in any places where the length
is updated.
  • Loading branch information
c42f committed Aug 24, 2021
1 parent 6f56e93 commit 190be09
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,26 @@ function put!(c::Channel{T}, v) where T
return isbuffered(c) ? put_buffered(c, v) : put_unbuffered(c, v)
end

# Atomically update channel length, *assuming* we hold the channel lock.
macro _inc_channel_length(c, inc)
quote
c = $(esc(c))
# We hold the channel lock so it's safe to non-atomically read and
# increment c.length
newlen = c.length + $(esc(inc))
# Atomically store c.length to prevent data races with other threads
# reading the length outside the lock.
@atomic :monotonic c.length = newlen
end
end

function put_buffered(c::Channel, v)
lock(c)
did_buffer = false
try
@atomic :monotonic c.length += 1
# Increment channel length eagerly (before push!) to count data in the
# buffer as well as offers from tasks which are blocked in wait().
@_inc_channel_length(c, 1)
while length(c.data) == c.sz_max
check_channel_state(c)
wait(c.cond_put)
Expand All @@ -331,7 +346,7 @@ function put_buffered(c::Channel, v)
# notify all, since some of the waiters may be on a "fetch" call.
notify(c.cond_take, nothing, true, false)
finally
did_buffer || @atomic :monotonic c.length -= 1
did_buffer || @_inc_channel_length(c, -1)
unlock(c)
end
return v
Expand All @@ -340,7 +355,7 @@ end
function put_unbuffered(c::Channel, v)
lock(c)
taker = try
@atomic :monotonic c.length += 1
@_inc_channel_length(c, 1)
while isempty(c.cond_take.waitq)
check_channel_state(c)
notify(c.cond_wait)
Expand All @@ -349,7 +364,7 @@ function put_unbuffered(c::Channel, v)
# unfair scheduled version of: notify(c.cond_take, v, false, false); yield()
popfirst!(c.cond_take.waitq)
finally
@atomic :monotonic c.length -= 1
@_inc_channel_length(c, -1)
unlock(c)
end
schedule(taker, v)
Expand Down Expand Up @@ -396,7 +411,7 @@ function take_buffered(c::Channel)
wait(c.cond_take)
end
v = popfirst!(c.data)
@atomic :monotonic c.length -= 1
@_inc_channel_length(c, -1)
notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!.
return v
finally
Expand Down Expand Up @@ -429,7 +444,7 @@ isready(c::Channel) = length(c) > 0
isempty(c::Channel) = length(c) == 0
function length(c::Channel)
# Lock-free equivalent to `length(c.data) + length(c.cond_put.waitq)`
@atomic(:monotonic, c.length)
@atomic :monotonic c.length
end

lock(c::Channel) = lock(c.cond_take)
Expand Down

0 comments on commit 190be09

Please sign in to comment.