
Fix data races in n_avail(::Channel) to fix isready/isempty #41833

Merged: 1 commit, Nov 10, 2021

Conversation

@c42f (Member) commented Aug 9, 2021

Make it clear that isready() is not threadsafe, as it has a data race during reading of the channel buffer or waitq length.

Alternatively we could make isready threadsafe with lock() but perhaps this would cause more harm than good? It also wouldn't make uses of isready() non-racy without a lock around the rest of the user's code which interacts with the channel. So for now I've just documented the current state of affairs.

Also remove an unlocked optimistic use of isready() in wait() to avoid the data race.

@vchuravy previously approved these changes Aug 9, 2021
@vtjnash (Member) commented Aug 9, 2021

I don't think this is necessary ("benign"). If you have a single-reader, this function may even be useful. Ideally, we could make this read atomic-relaxed so that TSan tooling would ignore it too.

@c42f (Member, Author) commented Aug 10, 2021

I don't think this is necessary ("benign")

Yes I wondered about this. But I worry that declaring data races benign is often underestimating the combined "creativity" of users and the optimizer.

For example, for users who loop on isready() for some reason, I'd be concerned that the compiler can hoist the load out of the loop under data-race-free assumption.

Consider apparently innocent (if bizarre) user code such as

function foo(c::Channel)
    s = 0
    for i=1:typemax(Int)
        !isready(c) || break
        s += i^2 + i^3 - 1 # some computation not involving memory operations
    end
    s
end

Here the user expects to break out of the computation if the channel becomes nonempty. But this is not what seems to happen:

julia> c = Channel(Inf)
Channel{Any}(9223372036854775807) (empty)

julia> t = Threads.@spawn foo(c)
Task (runnable) @0x00007efd21e428c0

julia> put!(c, 1)
1

# Still running!?
julia> t
Task (runnable) @0x00007efd21e428c0

My guess is that the compiler has hoisted the load in this case?

For context, I came across code involving isready loops while trying to understand/fix the connection pool in HTTP.jl, and ended up reading the Base Channel implementation to understand which usage patterns are valid. (In the case of HTTP.jl, the code holds the channel's lock while calling isready() followed by take!(), which is strange but probably valid, I guess...)

Ideally, we could make this read atomic-relaxed so that TSan tooling would ignore it too.

Sure, what's the right way to go about this? Would it prevent the problem above?

@tkf (Member) commented Aug 15, 2021

If we read/write the underlying data (the vector length) of isready using the relaxed ordering, I think it'd allow this behavior

ref = Ref(0)
c = Channel(1)
Threads.@spawn begin
    ref[] = 1
    put!(c, nothing)
end
if isready(c)
    @assert ref[] == 0
end

which is rather counter intuitive. Explicitly documenting the lack of ordering may be one option, though.

My guess is that the compiler has hoisted the load in this case?

This can happen with atomics too:

CppCon 2016: JF Bastien “No Sane Compiler Would Optimize Atomics" - YouTube (around 45:16)

@c42f (Member, Author) commented Aug 16, 2021

This can happen with atomics too

Yikes. This makes me wonder what guarantees, if any, we can make about isready(). Overall it seems that no memory ordering can help with the loop above as the loop body doesn't contain any memory operations to order with respect to the load in isready(), and yet the code is superficially coherent.

I'm a little confused by the talk because they use std::atomic<int> and operator++ in the code which should use memory_order_seq_cst by default. But they talk about it as if it's relaxed memory ordering which enables these optimizations.

@tkf (Member) commented Aug 16, 2021

I wonder how other concurrent collection libraries do this. For example, java.util.concurrent.ConcurrentLinkedQueue does talk about memory ordering

Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a ConcurrentLinkedQueue happen-before actions subsequent to the access or removal of that element from the ConcurrentLinkedQueue in another thread.

--- https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html

... but only for "put!" and "take!". It has the isEmpty method but its documentation doesn't say anything about the ordering.

C# also doesn't document it: ConcurrentQueue.IsEmpty Property (System.Collections.Concurrent) | Microsoft Docs. There is a similar discussion, and the answer seems to be no: c# - Does ConcurrentQueue.IsEmpty require a memory barrier? - Stack Overflow

Overall, monotonic might not be so crazy from these two data points.

I commented on the puzzling things in the talk here https://julialang.zulipchat.com/#narrow/stream/236830-concurrency/topic/Atomics.20progress.20bar.20example

@c42f (Member, Author) commented Aug 18, 2021

I'm a little confused by the talk because they use std::atomic<int> and operator++ in the code which should use memory_order_seq_cst by default.

It turns out that they correct this in the talk to say that they meant to use relaxed ordering in the example. (Also the talk predates the standard). In any case, the talk is discussing the coalescing of relaxed stores. For loads, it currently looks like clang won't hoist a relaxed read out of a loop, at least in the case that it affects control flow (https://godbolt.org/z/bhM4e4dz3)

It has the isEmpty method but its documentation doesn't say anything about the ordering.

Practically, OpenJDK checks that first() is not null, which involves reading a bunch of java volatile variables - seems similar to the implementation of their poll() ("take!") so likely this is sensibly ordered. See https://github.com/openjdk/jdk/blob/ec63957f9d103e86d3b8e235e79cabb8992cb3ca/src/java.base/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java#L427

I also looked at Go's len() builtin implementation for channels. It seems that they just use a nonatomic load, though they have special-purpose codegen for len() so it's not easy to see exactly what's going on. See https://github.com/golang/go/blob/946e2543f878929752da9d16575dd5e96ac78532/src/cmd/compile/internal/ssagen/ssa.go#L5921 (Perhaps this is because code which spins on len() is just pretty bad, and likely completely unnecessary in Go, so nobody saw a need to care about ordering.)

@tkf (Member) commented Aug 18, 2021

I think the real problem is how to specify (i.e., document) the behavior (rather than how to implement them) so that users can reason about their code based on a proper memory model. Also, there are two distinct questions:

  1. Should isready(ch) be callable without locking ch when ch is shared between the worker threads.
  2. Should isready(ch) participate in the happens-before relationship.

I initially thought yes for both, but now that I've looked at other channel specifications, no for 2 sounds reasonable to me. Also, relying on isready is not a great use of a channel anyway.

(But looking at Go is a good idea. The len documentation and "Length and capacity" specification do not say anything about the ordering guarantee.)

@c42f (Member, Author) commented Aug 18, 2021

I think the real problem is how to specify (i.e., document) the behavior (rather than how to implement them)

Definitely. Though looking at the implementation can be a nice way to guess the intent when the documentation is missing :-) If I understand the Go implementation, I think it may be that "nobody bothered to specify it, because len() is inherently racy and you shouldn't use it that way".

Should isready(ch) be callable without locking ch when ch is shared between the worker threads

I think so, yes. More generally it seems useful to be able to read length(ch) without any possibility of blocking. Currently length isn't implemented, but there's the internal function n_avail which is used to implement show. It would be no good if show could block just because we want to display an estimate of the current channel length!

Should isready(ch) participate in the happens-before relationship

Maybe not — I don't see what value that would bring if it's inherently racy to use the return of isready(). If the user needs consistency between isready and take!, they can lock the channel explicitly I guess?

@vtjnash (Member) commented Aug 18, 2021

There are at least 3 valid (non-racy) uses of isready that I am aware of:

  • single reader: if isready(channel); fetch(channel); end (though not recommended, so Julia's IO patterns generally try to force you into a blocking mindset rather than a spinloop)
  • implementing tryfetch: lock(channel) do; isready(channel) ? Some(fetch(channel)) : nothing; end (same comment as for single-reader)
  • statistics measurements (i.e. implementing show): sum(isready, channels) / sum(n_avail, channels) / etc.

None of these require memory orderings stronger than relaxed.

The other example (#41833 (comment)) with Ref above seems fixable by giving this read acquire ordering, but is that useful?
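The second pattern above can be sketched as a runnable helper. This is a sketch, not Base API: the name `trytake` and the use of `take!` (rather than `fetch`) are assumptions for illustration.

```julia
# Hypothetical helper: `trytake` is not part of Base; it is assumed here
# for illustration. Holding the channel's lock makes the isready check
# and the take! a single atomic step, so this is safe even with multiple
# concurrent readers, and it never blocks on an empty channel.
function trytake(ch::Channel)
    lock(ch) do
        isready(ch) ? Some(take!(ch)) : nothing
    end
end

ch = Channel{Int}(2)
put!(ch, 42)
@assert something(trytake(ch)) == 42  # an item was available
@assert trytake(ch) === nothing       # empty: returns nothing, no blocking
```

The `Some`/`nothing` wrapping distinguishes "took `nothing` off the channel" from "channel was empty", mirroring the `tryfetch` shape above.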

@tkf (Member) commented Aug 19, 2021

Yeah, I agree relaxed read/write is sufficient (and it also broadens the class of code we can write). My initial comment was only looking at the name isready literally.

A tricky use-case I just realized was sync_end

julia/base/task.jl, lines 344 to 347 (at 543386d):

    function sync_end(c::Channel{Any})
        local c_ex
        while isready(c)
            r = take!(c)
I think this code (+ :monotonic) would have been OK if we could assume all the tasks are scheduled through @sync. Since it's not the case, #41927 tries to address it. There is also a similar issue around isopen.

@tkf added the multithreading (Base.Threads and related functionality) label Aug 19, 2021
@c42f (Member, Author) commented Aug 20, 2021

I noticed that isopen() and isempty() have data races as well, so we've got:

  • isopen()
  • isempty()
  • isready()
  • n_avail() (should probably just be length())

@c42f force-pushed the cjf/fix-channel-data-race branch from 3e08033 to 9790e41 on August 22, 2021
@c42f changed the title from "Document non-threadsafety of isready(::Channel) and remove data race in wait" to "Add length(::Channel) and fix data races in isready/isempty" on Aug 22, 2021
@c42f (Member, Author) commented Aug 22, 2021

Ok, I've completely changed tack here to add a new atomic length field, and implemented length(::Channel), isready and isempty in terms of this.

This is slightly redundant in that we should "always" have c.length == length(c.data) + length(c.cond_put.waitq), but doing this avoids the much larger changes needed to allow the length field of data and waitq to be written and read atomically.

There's a slight change / improvement of semantics here between length(::Channel) and the old n_avail(::Channel) — we now count any tasks which are waiting on put! for buffered channels (in addition to the buffer length).

Does this sound like a good compromise? If so I'll add test coverage for length() as necessary.

@tkf (Member) commented Aug 22, 2021

This sounds like the best approach ATM to me.

@c42f (Member, Author) commented Aug 23, 2021

Ok, I've added tests for the new length() behavior.

I think the existing use of isready() inside the implementation of wait() is now valid, so I've reverted that part back to the original code:

function wait(c::Channel)
    isready(c) && return  # OK ??
    lock(c)
    try
        while !isready(c)  # OK ??
            check_channel_state(c)
            wait(c.cond_wait)
        end
    finally
        unlock(c)
    end
    nothing
end

Why do I think this is valid? Well, we can use wait() inside or outside a lock:

Racy use outside a lock

wait(c)
# At this point, we only know `c` had a value "recently" or "soon"

There's really no guarantee here, so :monotonic read of c.length outside the lock should be ok.

Use inside a lock

lock(c)
wait(c)
# At this point we know `c` definitely has a value
unlock(c)

This use should be valid independent of c.length being atomic at all, because stores to c.length only happen inside the lock elsewhere in the code.

@tkf (Member) commented Aug 23, 2021

I agree with the reasoning.

base/channels.jl (outdated diff):

    try
        @atomic :monotonic c.length += 1

Member (review comment):

Suggested change:

    - @atomic :monotonic c.length += 1
    + @atomic :monotonic c.length = c.length + 1 # just atomic store, not increment

we don't actually want atomic increment (which is often quite slow), but simply a monotonic store

Author (review comment):

Ah yes, this is inside the lock... so confusing!

So (to check I understand) I think this means:

  • We don't need an atomic load — no other thread can be storing to this in parallel
  • For the same reasons, we don't need atomic increment
  • We do require atomic (monotonic) store, as other threads may do an atomic load outside the lock
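These rules can be sketched with a standalone struct. This is a hypothetical sketch, not the actual Channel code: `Counter` stands in for the channel with its atomic length field, and it assumes Julia 1.7+ per-field atomics.

```julia
# Hypothetical sketch (assumes Julia >= 1.7 per-field atomics); `Counter`
# is a stand-in for a Channel with an atomic length field.
mutable struct Counter
    lock::ReentrantLock
    @atomic n::Int
end

c = Counter(ReentrantLock(), 0)

# Writer side: the lock already excludes other writers, so a monotonic
# load plus a monotonic store suffices; no read-modify-write increment
# (which is typically much slower) is needed.
lock(c.lock) do
    @atomic :monotonic c.n = (@atomic :monotonic c.n) + 1
end

# Reader side: a monotonic load outside the lock is race-free (not UB),
# though it imposes no ordering on other memory operations.
@assert (@atomic :monotonic c.n) == 1
```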

Author (review comment):

I'll go through and update all the places that this pattern occurs.

base/channels.jl context:

    while length(c.data) == c.sz_max
        check_channel_state(c)
        wait(c.cond_put)
    end
    push!(c.data, v)
    did_buffer = true

Member (review comment):

I am slightly confused what this is a count of precisely. Why not do the increment here, so it is always a lower bound on the number of items which are available, rather than being slightly ahead of the items that are available?

Author (review comment):

This allows us to count the tasks waiting in wait(c.cond_put), for consistency with unbuffered channels.

If we don't do this, the count of "available items" is quite inconsistent between (finite) buffered vs unbuffered channels.

Author (review comment):

By the way, I don't have a strong opinion on the "right" answer here. I just think we should be consistent in the way we count "available" items in buffered/unbuffered cases.

@tkf (Member) commented Aug 25, 2021:

Yeah, I agree it's nice to be consistent.

Another approach to make it consistent is to have length(unbuffered_channel) == 0 and length(buffered_channel) == length(buffered_channel.data), right? Maybe it's reasonable in some sense, since we'd have

lock(ch)
n = length(ch)
close(ch)
unlock(ch)
@assert length(collect(ch)) == n

when there are no takers. This property seems intuitive to me. (Though I guess it makes length rather useless for the internals...)

Author (review comment):

Yes this is definitely the other option 👍

I think the current system might be more useful as it gives you more options for applying backpressure?

Author (review comment):

I'm not entirely convinced this is a problem. I feel there's a whole pile of caveats which we could use to argue against many of the standard verbs when applied to concurrent and/or blocking containers.

I guess we should ask the question:

  • What else could length() possibly mean, such that it would make "normal generic container code" work with a Channel?

Another random thought — we now have the notion of closewrite() (currently shutdown()) which is meant for closing the writer side of full duplex IO streams. closewrite(::Channel) could also make sense and would allow the iteration operation you're describing to succeed.

Member (review comment):

I'd be very happy if you can come up with a concise set of preconditions, postconditions, and invariants of length that encompasses the definition of length in this PR. I just think it's very hard, if not impossible.

Also, I don't think concurrency is relevant to my point. I tried to work around the specification issue by adding the condition "there is no interference from other tasks" (i.e., no receivers and no new senders). My point is that a pre-close channel does not have a well-defined length that is compatible with iterate. So I don't think closewrite fixes the issue, since the point of length in this PR (aka "n_avail") is querying the state of the channel before it's closed (to support the backpressure use case).

Author (review comment):

Yeah it's hard, which is why I already reverted to n_avail yesterday. I assume you noticed this?

Member (review comment):

If we take the two cases separately:

  • blocking channels: acts as a threading barrier or semaphore, giving the upper bound on the number of items that are currently already blocked waiting to write into this channel
  • non-blocking channel: acts as a yield-free predicate, giving the lower bound on the number of items that can be read from the channel without yielding

Perhaps the comparison is that the former condition, in a batch processing system, is relevant for flow-control of writers, and the latter is relevant for flow-control of readers? Where the readers/writers want to spend their efforts working on the queue with the greatest demand / most capacity?

Member (review comment):

reverted to n_avail

Ah, sorry, I didn't notice.

the former condition, in a batch processing system, is relevant for flow-control of writers, and the latter is relevant for flow-control of readers

Hmm... interesting. This feels like an important point. Implementing this idea requires supporting length(c.data) and n_avail separately.

base/channels.jl:

    @@ -417,9 +425,12 @@ immediately, does not block.
    For unbuffered channels returns `true` if there are tasks waiting
    on a [`put!`](@ref).
    """
    isready(c::Channel) = n_avail(c) > 0
    n_avail(c::Channel) = isbuffered(c) ? length(c.data) : length(c.cond_put.waitq)
    isempty(c::Channel) = isbuffered(c) ? isempty(c.data) : isempty(c.cond_put.waitq)
Member (review comment):

I think length(c.cond_put.waitq) should also be a non-data-race-y thing to access. Probably not currently implemented as such, but perhaps ideally would be? (rather tricky with the knowledge that it might get removed from the queue and put in another queue, so perhaps not possible though)

base/channels.jl (outdated review thread, resolved)
@c42f force-pushed the cjf/fix-channel-data-race branch from 190be09 to df7634a on August 25, 2021
@c42f changed the title from "Add length(::Channel) and fix data races in isready/isempty" to "Fix data races in n_avail(::Channel) to fix isready/isempty" on Aug 26, 2021
@c42f (Member, Author) commented Aug 26, 2021

Ok, I've reverted the use of Base.length(::Channel) to the internal n_avail() so we can merge the fixes here without getting caught up in the API question.

Overall I still think it would make sense to use length() for this but we can have that debate separately.

@vchuravy dismissed their stale review on August 26, 2021: "done on earlier version"

@tkf (Member) left a review:

LGTM other than the naming of the field .length and the related method.

base/channels.jl (two outdated review threads, resolved)
@tkf (Member) commented Aug 29, 2021

Actually, never mind. n_avail doesn't say what is available, so it's not a strictly better name for the field than length. We could call it n_avail_items (the other available things are takers) but I don't have a strong opinion on this.

@c42f force-pushed the cjf/fix-channel-data-race branch from 4888119 to ac0a4ae on September 30, 2021
test/channels.jl and base/channels.jl (review threads resolved)
@c42f force-pushed the cjf/fix-channel-data-race branch from c203a3f to a38092d on November 6, 2021
@c42f (Member, Author) commented Nov 6, 2021

Ok, I added that suggestion + rebased now. This PR has languished, let's get it merged once CI passes.

test/channels.jl (two outdated review threads, resolved)
@KristofferC (Member) commented:

Needs a rebase to fix CI I guess.

@c42f force-pushed the cjf/fix-channel-data-race branch from f65f386 to 89e19a9 on November 6, 2021
base/condition.jl (outdated review thread, resolved)
This removes the data race from isready() and isempty(), which are now
implemented in terms of n_avail(). A new atomic `n_avail` field is added
to track the "current number of available items" (buffered + waiting
tasks). This is separate from the buffer and wait queue because these
consist of `Vector`s which cannot easily have their length fields read
and written atomically.

For buffered channels, the n_avail now includes a count of any waiting
tasks in addition to the number of buffered items. This makes it
consistent with the computation for unbuffered channels.

Co-authored-by: Takafumi Arakaki <[email protected]>
@c42f force-pushed the cjf/fix-channel-data-race branch from 2c6b3fa to 13f9a5e on November 8, 2021
@c42f (Member, Author) commented Nov 10, 2021

The buildkite failure looks unrelated. Let's merge this.

@c42f merged commit 924a13a into master on Nov 10, 2021
@c42f deleted the cjf/fix-channel-data-race branch on November 10, 2021
LilithHafner pushed a commit to LilithHafner/julia that referenced this pull request on Feb 22, 2022, and again on Mar 8, 2022 (same commit message as the merged commit above).