feat: use broadcast channel for event listeners #8193
Conversation
Force-pushed from 80f3deb to 9d5d81d (compare)
check out these types we have:
reth/crates/metrics/src/common/mpsc.rs (lines 36 to 43 in ef01d50)
reth/crates/metrics/src/common/mpsc.rs (lines 75 to 82 in ef01d50)
so far we only use them for one channel, and we have a panel for observing it.
@emhane awesome, thx! Will check how to include something similar for the broadcast channels; the metrics can be very useful for assigning the proper size to each.
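For illustration, a rough sketch of what a metered wrapper around a broadcast sender could look like. The `MeteredBroadcastSender` name and the plain atomic counters are assumptions made for this sketch; reth's existing metered mpsc types referenced above use the metrics crate and a dashboard panel instead.

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

use tokio::sync::broadcast;

/// Hypothetical metered wrapper around a `broadcast::Sender`.
///
/// Counts delivered and failed sends so a dashboard panel can show channel
/// pressure, similar in spirit to the metered mpsc types referenced above.
#[derive(Clone)]
pub struct MeteredBroadcastSender<T> {
    sender: broadcast::Sender<T>,
    messages_sent: Arc<AtomicU64>,
    send_errors: Arc<AtomicU64>,
}

impl<T> MeteredBroadcastSender<T> {
    pub fn new(sender: broadcast::Sender<T>) -> Self {
        Self {
            sender,
            messages_sent: Arc::new(AtomicU64::new(0)),
            send_errors: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Sends a value, recording whether any receiver was reached.
    pub fn send(&self, value: T) -> Result<usize, broadcast::error::SendError<T>> {
        match self.sender.send(value) {
            Ok(receivers) => {
                self.messages_sent.fetch_add(1, Ordering::Relaxed);
                Ok(receivers)
            }
            Err(err) => {
                self.send_errors.fetch_add(1, Ordering::Relaxed);
                Err(err)
            }
        }
    }

    /// Creates a new receiver subscribed to the wrapped channel.
    pub fn subscribe(&self) -> broadcast::Receiver<T> {
        self.sender.subscribe()
    }
}
```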
Force-pushed from a6f2112 to 16b61b9 (compare)
Force-pushed from a55828b to 06ff479 (compare)
I see, you don't need a new BeaconEngineMessage to subscribe to the broadcast stream because you clone the sender and pass it to the handle, neat
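Roughly, the pattern looks like this (a minimal sketch; `EngineHandle` and `EngineEvent` are placeholders rather than the actual reth types): the task owns a broadcast sender, the handle carries a clone of it, and every `subscribe()` call yields an independent receiver without routing a request message through the task.

```rust
use tokio::sync::broadcast;

/// Placeholder event type standing in for the real engine events.
#[derive(Clone)]
pub enum EngineEvent {
    ForkchoiceUpdated,
}

/// Hypothetical handle that carries a clone of the task's broadcast sender.
#[derive(Clone)]
pub struct EngineHandle {
    event_sender: broadcast::Sender<EngineEvent>,
}

impl EngineHandle {
    /// Subscribing only needs the cloned sender; no extra request/response
    /// message has to be routed through the task itself.
    pub fn event_listener(&self) -> broadcast::Receiver<EngineEvent> {
        self.event_sender.subscribe()
    }
}

fn main() {
    // The task owns the sender; the handle gets a clone of it.
    let (event_sender, _keepalive) = broadcast::channel(1000);
    let handle = EngineHandle { event_sender: event_sender.clone() };

    let mut listener = handle.event_listener();
    event_sender.send(EngineEvent::ForkchoiceUpdated).unwrap();
    assert!(matches!(listener.try_recv(), Ok(EngineEvent::ForkchoiceUpdated)));
}
```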
crates/node/events/src/node.rs (outdated)
/// Transforms a stream of `Result<T, BroadcastStreamRecvError>` into a stream of `NodeEvent`,
/// applying a uniform error handling and conversion strategy.
pub fn handle_broadcast_stream<T>(
    stream: impl Stream<Item = Result<T, BroadcastStreamRecvError>> + Unpin,
) -> impl Stream<Item = NodeEvent> + Unpin
where
    T: Into<NodeEvent>,
{
    stream.map(|result_event| {
        result_event
            .map(Into::into)
            .unwrap_or_else(|err| NodeEvent::Other(format!("Stream error: {:?}", err)))
    })
}
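As a usage note (a hedged sketch assumed to live next to the helper above; `PipelineTick` and `drive_node_events` are made-up names): wrapping a raw `broadcast::Receiver` in tokio-stream's `BroadcastStream` yields exactly the `Result<T, BroadcastStreamRecvError>` items this helper expects.

```rust
use tokio::sync::broadcast;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};

/// Made-up event type for illustration; anything implementing `Into<NodeEvent>` works.
#[derive(Debug, Clone)]
struct PipelineTick;

impl From<PipelineTick> for NodeEvent {
    fn from(_: PipelineTick) -> Self {
        // `NodeEvent::Other` is the catch-all variant the helper also uses for lag errors.
        NodeEvent::Other("pipeline tick".to_string())
    }
}

async fn drive_node_events(receiver: broadcast::Receiver<PipelineTick>) {
    // `BroadcastStream` yields `Result<PipelineTick, BroadcastStreamRecvError>`,
    // which the helper folds into plain `NodeEvent`s.
    let mut events = handle_broadcast_stream(BroadcastStream::new(receiver));
    while let Some(event) = events.next().await {
        // hand `event` to whatever consumes `NodeEvent`s, e.g. the node's logging loop
        let _node_event: NodeEvent = event;
    }
}
```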
how about implementing FromIterator here, that will work I think
> how about implementing FromIterator here, that will work I think

the map is provided by StreamExt, and streams are not (sync) iterators, so I'm not sure FromIterator is the right fit here
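To illustrate the distinction (a toy example, not the real event types): synchronous collection is powered by `FromIterator`, while streams use `StreamExt::map` for per-item conversion and `FromStream`-based `collect().await` for gathering.

```rust
use tokio_stream::{self as stream, StreamExt};

// Iterator world: `FromIterator` is what backs `collect()` on a synchronous iterator.
fn collect_sync() -> Vec<u32> {
    (0u32..3).collect()
}

// Stream world: items arrive asynchronously, so per-item conversion goes through
// `StreamExt::map` (as `handle_broadcast_stream` does via `Into`), and collecting
// a whole stream goes through `FromStream`/`collect().await`, not `FromIterator`.
async fn collect_async() -> Vec<u32> {
    stream::iter(0u32..3).map(|x| x * 2).collect().await
}
```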
rip, would be nice but it's sealed, hopefully soon ™️ in stable
Force-pushed from 5da034a to 77df31b (compare)
overall I think this is great.
I don't think we'll have any issues with this for engine/pipeline events, because those are basically just for reporting and it's fine to drop some.
My main concern is the network transaction task, which is more likely to drop messages but relies on NetworkEvents for peer tracking, for example.
Although 1k messages should be fine, I'd feel more comfortable if we could bump the default capacity to 2k and add a metric for when we lag in the tx task. Maybe we should emit peer added/removed separately, but we should still proceed with this.
I'd also like a new function/stream variant that does not return Results but rather skips the lag error; this would make the API easier in some places, ref
reth/crates/storage/provider/src/traits/chain.rs
Lines 37 to 43 in cb658ca
/// A Stream of [CanonStateNotification].
#[derive(Debug)]
#[pin_project::pin_project]
pub struct CanonStateNotificationStream {
    #[pin]
    st: BroadcastStream<CanonStateNotification>,
}
we could move this stream type to our tokio util crate.
we also need this for the txpool channels, which is most likely the most critical part because it's exposed over RPC.
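A hedged sketch of the requested lag-skipping variant, generalizing the `CanonStateNotificationStream` pattern above. The `EventStream` name, the `tracing` log line, and the exact bounds are assumptions for this sketch, not the final API.

```rust
use std::{
    pin::Pin,
    task::{Context, Poll},
};

use futures::Stream;
use tokio::sync::broadcast;
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};

/// Illustrative generic stream over a broadcast channel that skips lag errors
/// instead of surfacing `Result`s to the caller.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct EventStream<T> {
    #[pin]
    inner: BroadcastStream<T>,
}

impl<T: Clone + Send + 'static> EventStream<T> {
    /// Wraps a raw broadcast receiver.
    pub fn new(receiver: broadcast::Receiver<T>) -> Self {
        Self { inner: BroadcastStream::new(receiver) }
    }
}

impl<T: Clone + Send + 'static> Stream for EventStream<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        loop {
            return match this.inner.as_mut().poll_next(cx) {
                Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(item)),
                Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(skipped)))) => {
                    // The receiver fell behind: note how many messages were dropped
                    // and keep polling instead of bubbling the error up.
                    tracing::warn!(skipped, "broadcast stream lagged");
                    continue;
                }
                Poll::Ready(None) => Poll::Ready(None),
                Poll::Pending => Poll::Pending,
            };
        }
    }
}
```

Such a wrapper could live in a shared tokio util crate and be reused for the engine, pipeline, and txpool channels alike.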
@@ -197,7 +199,7 @@ pub struct TransactionsManager<Pool> {
     /// Subscriptions to all network related events.
     ///
     /// From which we get all new incoming transaction related messages.
-    network_events: UnboundedReceiverStream<NetworkEvent>,
+    network_events: BroadcastStream<NetworkEvent>,
I'm slightly concerned about this, because now we're no longer guaranteed delivery of all network events, which can result in wrong peer tracking, for example session closed, although 1000 messages should be sufficient
I think dropping NetworkEvent::SessionEstablished and NetworkEvent::PeerAdded is recoverable, but not sure if dropping NetworkEvent::SessionClosed and NetworkEvent::PeerRemoved can lead to a memory leak. Depends on whether all data structures that are updated accordingly are bounded.
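To make that concern concrete, a toy sketch of lag handling in peer bookkeeping; the stripped-down `NetworkEvent`, the `u64` peer ids, and the `resync` hook are all stand-ins for illustration, not reth's actual types.

```rust
use std::collections::HashSet;

use tokio_stream::wrappers::errors::BroadcastStreamRecvError;

/// Placeholder for the real `NetworkEvent`; only the peer-tracking variants matter here.
#[derive(Debug, Clone)]
enum NetworkEvent {
    PeerAdded(u64),
    PeerRemoved(u64),
}

/// Placeholder peer bookkeeping kept by the transactions task.
#[derive(Default)]
struct PeerTracker {
    peers: HashSet<u64>,
    lagged_events: u64,
}

impl PeerTracker {
    /// Handles one item polled from a `BroadcastStream<NetworkEvent>`.
    fn on_item(&mut self, item: Result<NetworkEvent, BroadcastStreamRecvError>) {
        match item {
            Ok(NetworkEvent::PeerAdded(peer)) => {
                self.peers.insert(peer);
            }
            Ok(NetworkEvent::PeerRemoved(peer)) => {
                self.peers.remove(&peer);
            }
            Err(BroadcastStreamRecvError::Lagged(skipped)) => {
                // We may have missed `PeerRemoved`/`SessionClosed` events, so the
                // tracked set can only be trusted again after a resync; a counter
                // metric here would show how often the channel capacity is hit.
                self.lagged_events += skipped;
                self.resync();
            }
        }
    }

    /// Hypothetical hook that would re-fetch the connected peer set from the
    /// network handle so stale entries cannot accumulate (and leak memory).
    fn resync(&mut self) {
        self.peers.clear();
    }
}
```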
Co-authored-by: Emilia Hane <[email protected]>
Force-pushed from 4f7d39d to c400887 (compare)
makes total sense, done, ptal
…ndle event listener on constructor
this is great!
Co-authored-by: Emilia Hane <[email protected]>
EventListeners implements a multi-producer multi-consumer queue where each sent value is seen by all consumers. To achieve this, EventListeners allocates a std::Vec that is filled with a tokio::sync::UnboundedSender every time EventListeners::new_listener is called. As every value sent via EventListeners is cloned to each UnboundedReceiver and the channels are unbounded, this is prone to unlimited memory growth and eventual OOM attacks. To prevent this, this PR uses tokio's tokio::sync::broadcast multi-producer multi-consumer queue instead.

For now the size of all the broadcast channels is set to 1000; it would be good to measure how much is needed for each. Adding metrics as suggested in this comment #8193 (comment) will be done in a follow-up.
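As a before/after illustration of the description above (a simplified sketch: this `EventListeners` is a stand-in, not the real implementation, and `u64` stands in for an actual event type):

```rust
use tokio::sync::{broadcast, mpsc};

/// Simplified stand-in for the old pattern: one unbounded sender per listener,
/// every event cloned into every queue, with no upper bound on memory.
struct EventListeners<T> {
    listeners: Vec<mpsc::UnboundedSender<T>>,
}

impl<T: Clone> EventListeners<T> {
    fn new() -> Self {
        Self { listeners: Vec::new() }
    }

    /// Pushes a new unbounded sender; the matching receiver goes to the caller.
    fn new_listener(&mut self) -> mpsc::UnboundedReceiver<T> {
        let (tx, rx) = mpsc::unbounded_channel();
        self.listeners.push(tx);
        rx
    }

    /// Clones the event into every listener's queue; a slow listener lets its
    /// queue grow without limit.
    fn notify(&mut self, event: T) {
        self.listeners.retain(|tx| tx.send(event.clone()).is_ok());
    }
}

/// The replacement: a single bounded broadcast channel shared by all listeners.
/// A slow receiver lags (and is told how many events it missed) instead of
/// growing the queue indefinitely.
fn main() {
    let (tx, _rx) = broadcast::channel::<u64>(1000);
    let mut listener = tx.subscribe();
    tx.send(42).unwrap();
    assert_eq!(listener.try_recv().unwrap(), 42);
}
```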