-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cap mux simple #5577
Cap mux simple #5577
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this a lot.
This would allow us to detach a subprotocol connection via StreamClone.
compared to the "SatelliteStream" approach, this requires that the StreamClone
is spawned onto another task, right?
stream clone doesn't necessarily have to be spawned in another task than the one that the eth session is running in, but it can be. running another capability stream in the same session thread as eth works fine too. each extra protocol is an extra stream, and should be modelled after the one stream, usually |
62571da
to
013a448
Compare
013a448
to
7615d3d
Compare
7615d3d
to
d73be94
Compare
commit 703634b stores the extra conns in we may want to introduce a bound on how many extra protocol connections can be open in total. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the mux/demux sink+stream a lot.
I'm unsure about the integration in reth-network yet, imo the satellite stream would be easier to handle for this use case.
But still undecided. So to move forward I'd like to include the stream impl alone and think about reth-network integration separately.
crates/net/eth-wire/src/muxdemux.rs
Outdated
for _ in 0..self.demux.len() { | ||
if self.inner.poll_ready_unpin(cx).is_pending() { | ||
break | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should flip this:
as long as the underlying sink can accept messages, start sending buffered messages from demux
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so the main stream should not be prioritised over the secondary streams? e.g. eth protocol should not be prioritised over snap protocol?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commit c6c86d1 addresses this by sending at least as many messages as there are stream clones, then sending as long as their is no message received for the main stream
sgtm |
ad8bfd2
to
4bdb173
Compare
4bdb173
to
8bda850
Compare
opened a pr to address this #5640 |
cc6b446
to
48303b3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks great, I like the muxer a lot.
one nit re poll loop.
also perhaps we should prioritize sending outgoing messages and put the sink logic in its own loop? so we process as many outgoing messages as possible before reading from the wire. not sure though
/// More or less a weak clone of the stream wrapped in [`MuxDemuxer`] but the bytes belonging to | ||
/// other capabilities have been filtered out. | ||
#[derive(Debug)] | ||
pub struct StreamClone { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unsure about the naming yet,
I believe you had WeakStream
or something.
perhaps WeakCapabilityStream
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WeakStreamClone
initially, 'weak' since it is some kind of reference to the stream but doesn't own it. if not stream clone then the naming I like the most is ProtocolProxy
from multiplex.rs. MuxDemuxer
or MuxDemuxStream
is then the proxy server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then I added ManuallyDrop wrapper, then I removed it for simplicity since anyway method of trait CanDisconnet should be used before drop. Now it's kind of a weak clone (of owning ref MuxDemuxStream) because MuxDemuxStream can be dropped when there are still StreamClones using the pointed to MuxDemuxer but it will prevent disconnecting if method from CanDisconnect trait is used.
ea6657b
to
65aa041
Compare
Signed-off-by: Emilia Hane <[email protected]>
Signed-off-by: Emilia Hane <[email protected]>
Signed-off-by: Emilia Hane <[email protected]>
Signed-off-by: Emilia Hane <[email protected]>
Signed-off-by: Emilia Hane <[email protected]>
Signed-off-by: Emilia Hane <[email protected]>
Signed-off-by: Emilia Hane <[email protected]>
Signed-off-by: Emilia Hane <[email protected]>
Signed-off-by: Emilia Hane <[email protected]>
Signed-off-by: Emilia Hane <[email protected]>
751d0f9
to
884bfe0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great!
I'm very happy with how this turned out.
injected user defined sub protocol handles span up a new attack surface. to what extent do we want to account for that? for example, if a sub protocol has a DoS vulnerability so that there are always subprotocol messages to send then the main protocol won't receive any messages if we don't implement poll_next like on MuxDemuxStream in this pr.
I think it should be the protocol's impl to handle spam, punish peer for sending bad/spammy messages, like eth also does
an alternative approach to the
RlpxSatelliteStream
. a simplification of #2981.contrary to
RlpxSatelliteStream
,MuxDemuxStream
is wrapped byEthStream
. this means existing implementation of a session is unchanged.the equivalent to
ProtocolProxy
isStreamClone
, althoughStreamClone
is wrapped by a capability stream e.g.SnapStream<StreamClone>
.functionality that is removed from #2981
MuxDemuxStream
(the main stream).