-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
protocols/streaming-response #1947
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the pull request @wngr.
I am in favor of adding a stream-oriented protocol abstraction to rust-libp2p.
whether you would be willing to accept such a dedicated crate, or whether it would make sense to fold this into request-response.
Opposite to @romanb gut feeling, with the simple API interface (request()
, respond()
, finish_response()
) you are proposing in this pull request in mind, I would expect adding stream-oriented responses to libp2p-request-response
directly not to be too intrusive, put simply, replacing the oneshot
between behaviour and handler with an mpsc channel. That said, my gut feeling also tells me that Roamn's gut feeling is usually right.
I will need to try adding stream-oriented responses to libp2p-request-response
before giving a better review here. I might get around doing so in the next couple of weeks. In case someone else has time before that, I would appreciate a pull request with a proof-of-concept.
IMHO the biggest design mismatch is the double use of the Thinking about this, I tend to come back to the same point: The inherent abstraction of the current Not sure to which conclusion that leads, just leaving my thoughts here. Maybe it's better to have some duplication than try to come up with a silver bullet.. |
Sorry for the slow response time here.
I tried patching In case there are multiple projects that could make use of streaming-response, I am happy to include it in rust-libp2p (//CC @thomaseizinger). In case there is only a single project, I would prefer for it to be maintained outside of the rust-libp2p mono-repo. It might as well be worth considering making streaming-response more generic, in the sense of allowing streaming on both the requesting and the responding side. See @romanb's suggestion in #1942 (comment):
This would allow both the communication pattern suggested in this pull request, as well as various other streaming based patterns. |
Thanks for your response nonetheless! You have an idea of an API sketch for a generic duplex streaming protocol? Wouldn't that just be an opinionated behaviour that wraps some book-keeping into a common package:
Let me know if there is more interest in this direction. If not, I will be publishing the |
Pretty much, yes. I would deem this useful as (A) a building block to build e.g.
👍
I was thinking of exposing an impl StreamingBehaviour {
fn new_stream(&mut self, peer_id: PeerId, addr: Option<Multiaddr>) -> RequestId {
// ...
}
}
impl NetworkBehaviour for StreamingBehaviour {
type OutEvent = StreamingEvent;
// ...
}
enum StreamingEvent {
NewStream {
id: RequestId,
stream: // something that implements AsyncRead + AsyncWrite and allows to be tracked to close an idle connection
}
}
Off the top of my head yes, though there might be many caveats that I am missing right now. Take the above with a grain of salt. After all, I am not currently in need of a streaming based protocol abstraction myself, thus I am having difficulties coming up with a useful abstraction. In case a more generic abstraction like the one suggested above would be useful for multiple real-world protocols, I think this is worth pursuing. If not, I think your more specific What do you think @wngr? |
Yup, I think I'm gonna give this a try. |
55d4d70
to
585ec0c
Compare
@wngr let me know once you would like another review here. |
Thanks, will do! The force push was accidental, nothing to see here .. |
NetworkBehaviour
585ec0c
to
888f153
Compare
I finally came around spending some time on this. It's pretty barebones, but I think convenient enough to be of broader use. I borrowed some things from the |
stream.write_all(b"Hello").await.unwrap(); | ||
stream.flush().await.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Smoke test. Direct usage of Async{Read,Write}
.
|
||
let (peer2_id, trans) = mk_transport(); | ||
let mut swarm2 = Swarm::new(trans, Streaming::<PongCodec>::default(), peer2_id); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Example using a typed codec.
SwarmEvent::ConnectionClosed { peer_id, .. } => { | ||
assert_eq!(peer_id, peer1_id); | ||
assert_eq!(rx.next().await.unwrap(), 10); | ||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Example demonstrating connection keep-alive.
Thanks @wngr! Review is in progress. Hope to finish sometime later today or next week. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @wngr!
Couple of suggestions below. I still need to take a deeper look at the test code.
@@ -0,0 +1,306 @@ | |||
use futures::future; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the sake of consistency, would you mind including license headers? Feel free to mention yourself. In case a file is based on libp2p-request-response
, please include the reference to Parity Technologies.
/// connection to be established. | ||
pending_outbound_requests: HashMap<PeerId, SmallVec<[OutboundStreamId; 10]>>, | ||
config: StreamingConfig, | ||
_codec: PhantomData<T>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_codec: PhantomData<T>, |
Is this needed?
} | ||
|
||
fn protocol_name() -> Self::Protocol { | ||
b"/streaming/bytes/1.0.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would expect users to use custom protocol names, even when using IdentityCodec
. What do you think of making this an optional field on IdentityCodec
defaulting to /streaming/bytes/1.0.0
?
/// by [`NetworkBehaviour::addresses_of_peer`]. | ||
/// | ||
/// Addresses added in this way are only removed by `remove_address`. | ||
pub fn add_address(&mut self, peer: PeerId, address: Multiaddr) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related to this pull request: I find the user experience of first having to add an address for a peer and only then being able to open a stream and thus a connection to said peer not ideal. This is not specific to this pull request, e.g. libp2p-request-response
and libp2p-dcutr
(#2076) have the same indirection.
I am not opposed to the NetworkBehaviour::addresses_of_peer
, though I would suggest to extend NetworkBehaviourAction::Dial{Peer,Address}
, allowing one to provide a set of addresses right away, potentially later on extended by addresses retrieved through NetworkBehaviour::addresses_of_peer
from other NetworkBehaviour
implementations.
EMPTY_QUEUE_SHRINK_THRESHOLD, | ||
}; | ||
|
||
pub(crate) type RefCount = Arc<PhantomData<u8>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do I understand correctly that the task of the handler is not woken up, when a StreamHandle
with a RefCount
is dropped? If so, a handler, if not polled due to some other event, might stay in KeepAlive::Yes
forever, even though all its streams are closed, correct?
In libp2p-relay
I have solved this with the help of a oneshot
. Would that be an option here as well?
rust-libp2p/protocols/relay/src/handler.rs
Lines 110 to 119 in 98bc5e6
/// Tracks substreams lend out to other [`RelayHandler`]s or as | |
/// [`Connection`](protocol::Connection) to the | |
/// [`RelayTransport`](crate::RelayTransport). | |
/// | |
/// For each substream to the peer of this handler, there is a future in here that resolves once | |
/// the given substream is dropped. | |
/// | |
/// Once all substreams are dropped and this handler has no other work, [`KeepAlive::Until`] can | |
/// be set, allowing the connection to be closed eventually. | |
alive_lend_out_substreams: FuturesUnordered<oneshot::Receiver<()>>, |
} | ||
|
||
fn inject_disconnected(&mut self, peer: &PeerId) { | ||
self.connected.remove(peer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that inject_connection_closed
above removes a peer from connected
if no more connections exist and given that inject_disconnected
is only ever called after inject_connection_closed, this line should never be executed, correct? If so, how about at least adding a
debug_asserthere to make sure
inject_connection_closed` will properly clean up in the future.
Lines 579 to 583 in 98bc5e6
this.behaviour | |
.inject_connection_closed(&peer_id, &id, &endpoint); | |
if num_established == 0 { | |
this.behaviour.inject_disconnected(&peer_id); | |
} |
inbound_stream_id: Arc<AtomicU64>, | ||
/// A pending fatal error that results in the connection being closed. | ||
pending_error: Option<ProtocolsHandlerUpgrErr<std::convert::Infallible>>, | ||
_codec: PhantomData<T>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_codec: PhantomData<T>, |
Is this needed?
} | ||
|
||
#[derive(Debug, Default)] | ||
pub struct StreamingProtocol<T: StreamingCodec> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub struct StreamingProtocol<T: StreamingCodec> { | |
pub struct StreamingProtocol<T> { |
Is this needed?
pub struct StreamingProtocol<T: StreamingCodec> { | ||
_codec: PhantomData<T>, | ||
} | ||
impl<T: StreamingCodec> StreamingProtocol<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
impl<T: StreamingCodec> StreamingProtocol<T> { | |
impl<T> StreamingProtocol<T> { |
Same as above.
Thanks for your feedback @mxinden, appreciate it. My focus in the past months has shifted quite a bit, so it's unlikely I will make much progress on this. Might get warmed up in the future, but no gain in keeping the PR open until then. |
Just to update the thread, the #5027 has been merged into the master branch. You can find an easier method for establishing basic connections between peers in the stream protocol example. |
this adds a
streaming-response
crate, which provides aNetworkBehaviour
similarto the
request-response
protocol to have a generic messaging mechanismbetween two peers. The most notable difference to
request-response
andthe main motivation for its implementation is to provide an open-ended
streaming mechanism for the responses: a consumer's request can result
in any number of individual response frames, until said stream is
finalized. Conceptually, it is somewhat similar to grpc streaming.
Posting this here as a draft PR for discussion, whether you would be willing
to accept such a dedicated crate, or whether it would make sense to fold this into
request-response
.Related discussions #1727 #1942