This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Make unbounded channels size warning exact (part 1) #13117

Closed
wants to merge 1 commit

Conversation

dmitry-markin
Contributor

@dmitry-markin dmitry-markin commented Jan 10, 2023

This is a follow-up to #12971 & #13020. As discussed in #12971 (comment), it's desirable to set an exact warning threshold on the unbounded channel queue size.

The implementation relies on Release & Acquire memory ordering properties to make sure the counter is always incremented before it is decremented (and therefore we never hit unsigned integer underflow). This relies on the following guarantees:

A load operation with this memory order performs the acquire operation on the affected memory location: no reads or writes in the current thread can be reordered before this load.

and

A store operation with this memory order performs the release operation: no reads or writes in the current thread can be reordered after this store.

(See https://en.cppreference.com/w/cpp/atomic/memory_order)

Nevertheless, I'm not 100% confident that the causality relationship between unbounded_send() and poll_next() (see code comments) really works as expected in this context. So any review with proper understanding of memory ordering implications is greatly appreciated.

If concurrency-wise this PR is OK, I will extend the same exact warning semantics on mpsc::tracing_unbounded() in a follow-up PR.
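The scheme described above can be sketched in isolation as follows. This is a minimal illustration, not the PR's actual code; the names `QueueCounter`, `on_send`, and `on_recv` are hypothetical stand-ins for the sender/receiver sides of the channel:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Shared counter tracking the number of messages in transit.
struct QueueCounter {
    queue_size: AtomicUsize,
    queue_size_warning: usize,
}

impl QueueCounter {
    // Called on unbounded_send(): increment with Acquire so that the
    // subsequent push into the channel cannot be reordered before it.
    // Returns true when the warning threshold is crossed.
    fn on_send(&self) -> bool {
        let prev = self.queue_size.fetch_add(1, Ordering::Acquire);
        prev == self.queue_size_warning
    }

    // Called on poll_next(): decrement with Release so that the preceding
    // pop from the channel cannot be reordered after it.
    fn on_recv(&self) {
        let _ = self.queue_size.fetch_sub(1, Ordering::Release);
    }
}

fn main() {
    let c = QueueCounter { queue_size: AtomicUsize::new(0), queue_size_warning: 2 };
    assert!(!c.on_send()); // size 0 -> 1, previous value 0
    assert!(!c.on_send()); // size 1 -> 2, previous value 1
    assert!(c.on_send()); // previous value 2 == threshold: warn
    c.on_recv();
    c.on_recv();
    c.on_recv();
    assert_eq!(c.queue_size.load(Ordering::Relaxed), 0);
}
```

The intent is that, thanks to the causality between a send and the receive of the same message, every `on_recv` decrement is preceded by its matching `on_send` increment.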

CC @nazar-pc

@dmitry-markin dmitry-markin added A0-please_review Pull request needs code review. B0-silent Changes should not be mentioned in any release notes C1-low PR touches the given topic and has a low impact on builders. labels Jan 10, 2023
@dmitry-markin dmitry-markin requested review from koute and bkchr January 10, 2023 14:54
@dmitry-markin dmitry-markin requested a review from a team January 12, 2023 10:47
@@ -52,10 +52,10 @@ use std::{
 ///
 /// The name is used in Prometheus reports, the queue size threshold is used
 /// to warn if there are too many unprocessed events in the channel.
-pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver) {
+pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) {
Contributor

Should this go one step further and use NonZeroUsize as 0 doesn't really make much sense as a value here?

Contributor Author

This would require writing NonZeroUsize::new(100_000).unwrap() instead of 100_000 every time we pass the queue size threshold, so I'm not sure it makes sense.
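For comparison, the call-site ergonomics the suggestion implies (the threshold value 100_000 is the one mentioned in the thread; the variable name is illustrative):

```rust
use std::num::NonZeroUsize;

fn main() {
    // With plain usize the call site is just `100_000`; with NonZeroUsize
    // every caller has to construct and unwrap the value explicitly:
    let threshold = NonZeroUsize::new(100_000).unwrap();
    assert_eq!(threshold.get(), 100_000);

    // The upside, and the point of the suggestion: a zero threshold is
    // rejected at construction time rather than silently accepted.
    assert!(NonZeroUsize::new(0).is_none());
}
```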

// Note that this is not the usual Release-Acquire ordering based on synchronization of
// store and load of an atomic variable, but more general application of Acquire and
// Release ordering properties.
let _ = self.queue_size.fetch_sub(1, Ordering::Release);
Contributor

I'd just use SeqCst instead to be on the safe side. I tried, but I don't think I understand all the implications of the other options for anything more complex than simple load or store operations.

@mrcnski mrcnski Feb 15, 2023

You usually can't go wrong with SeqCst for memory ordering. It's the default on most modern hardware, and pretty fast. Edit: correction based on the cppreference link: it is the default memory ordering used by modern compilers, but not all modern hardware is sequentially consistent (e.g. ARM).

However, I don't understand the larger issue here. Reading the PR and the linked comments just made me confused. @dmitry-markin can you please summarize what the issue is we are trying to solve, and what memory ordering has to do with it?

Contributor Author

@dmitry-markin dmitry-markin Feb 16, 2023

However, I don't understand the larger issue here. Reading the PR and the linked comments just made me confused. @dmitry-markin can you please summarize what the issue is we are trying to solve, and what memory ordering has to do with it?

Originally we added queue size monitoring to unbounded channels in order to detect message "leaks": when some receiver is not polled, messages keep piling up in the channel, "leaking" memory (we observed such bugs in the past). In the original implementation of this queue threshold warning I didn't bother about precise queue size estimation or memory ordering implications, considering that a +/-1 error in the queue size estimate is not a big deal with a warning threshold of 100_000 or even 100 messages.

It turned out, though, that at least some substrate users like @nazar-pc would like to set an exact warning threshold, specifically when dealing with channels with explicit acknowledgement (where the queue size threshold must be exactly 1), so I created this follow-up PR to make the queue size measurement exact.

Nevertheless, doing things right with atomics is not that trivial, especially once we leave typical C++/Rust memory-ordering territory and start to reason on the level of loads and stores and their reordering across the barrier (mentioned only in the C++ docs). As the PR comment notes, we do not use atomics here to order other memory operations across the memory barrier imposed by the atomic operation, but actually do quite the opposite: assuming the ordering of non-atomic (or other atomic) operations relative to our atomic operations, and the causality relation between pushing a message into the channel and popping it, we derive the ordering of our atomic operations relative to each other. The increment happens before the decrement, so we never underflow the usize counter.

It's worth noting that SeqCst would only help as long as the mpsc channel internally also uses SeqCst, because SeqCst only guarantees a single total order of events relative to other SeqCst operations, not to other atomic or non-atomic operations. So we can't really rely on this: while true right now, the use of SeqCst is an internal implementation detail of the mpsc channel that can change in the future (especially given that the code comments there contradict the SeqCst ordering actually used).
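The underflow that the ordering argument guards against is easy to reproduce in isolation. If a decrement were ever observed before its matching increment, the usize counter would silently wrap (atomic read-modify-write operations never panic on overflow, even in debug builds):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let counter = AtomicUsize::new(0);
    // Decrementing an atomic at zero does not panic; it wraps around,
    // so a "decrement before increment" yields a bogus huge queue size.
    let prev = counter.fetch_sub(1, Ordering::Release);
    assert_eq!(prev, 0);
    assert_eq!(counter.load(Ordering::Relaxed), usize::MAX);
}
```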

Contributor

Honestly, unless someone can point out some downsides, I'd probably prefer that we just switch to a channel that supports checking the length natively instead of jury-rigging it ourselves with atomics and potentially depending on implicit implementation details of the channel we're currently using.

e.g. the channel from async-channel natively supports .len(). (I also like this crate for other reasons: it is a proper async channel which also supports sync usage, and it can be switched between the bounded and unbounded variants while keeping the same type. One downside is that it's a linked list internally, so performance can be bad, but the futures channels we use now are also linked lists, so there shouldn't be a significant difference.)

Member

Just let's do what @koute is saying and continue living our happy lives without any synchronization problems :D

@dmitry-markin dmitry-markin requested a review from a team February 15, 2023 13:52
Comment on lines +200 to 201
let queue_size = sender.queue_size.fetch_add(1, Ordering::Acquire);
if queue_size == sender.queue_size_warning && !sender.warning_fired {

According to the docs fetch_add returns the previous value, is this what is intended here?

Contributor Author

Yes. Since fetch_add returns the previous value, the warning fires once the queue grows past the threshold; in other words, queue_size_warning is the maximum allowed queue size before the warning is emitted.
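The returned-value semantics in question, shown in isolation (values are illustrative):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let size = AtomicUsize::new(5);
    // fetch_add returns the value *before* the addition, so comparing the
    // result against the threshold tests the pre-increment queue size.
    let prev = size.fetch_add(1, Ordering::Acquire);
    assert_eq!(prev, 5);
    assert_eq!(size.load(Ordering::Relaxed), 6);
}
```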

@@ -81,12 +81,9 @@ pub struct Sender {
/// Name to identify the channel (e.g., in Prometheus and logs).
name: &'static str,
/// Number of events in the queue. Clone of [`Receiver::in_transit`].
@mrcnski mrcnski Feb 16, 2023

I don't understand the second half of this comment. Edit: also there's a cargo doc --document-private-items warning:

warning: unresolved link to `Receiver::in_transit`
  --> client/network/src/service/out_events.rs:83:48
   |
83 |     /// Number of events in the queue. Clone of [`Receiver::in_transit`].
   |                                                   ^^^^^^^^^^^^^^^^^^^^ the struct `Receiver` has no field or associated item named `in_transit`

Contributor Author

Yeah, there is an error in the comment. It should be Receiver::queue_size.

@dmitry-markin
Contributor Author

Closing in favor of #13490.
