-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prototype message patterns: RecvSend
, SendRecv
and Send
#5
base: master
Are you sure you want to change the base?
Conversation
a34b5c8
to
bd8ac90
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way I think about the pattern and what this seems to essentially provide is this (pseudo-code):
fn read_write_pair<R, W, C>(read_half: R, write_half: W, codec: C)
-> impl Stream<Item = (Request, Sender<Response>)>
where
R: Stream,
W: Sink,
C: Encode + Decode,
{
read_half.map(|request| {
let (tx, rx) = oneshot();
tokio::spawn(async {
let response = rx.recv().await;
write_half.send(codec.encode(response)).await;
});
(codec.decode(request), tx)
)
}
I might be completely wrong, but there doesn't seem to be much documentation explaining what all this is and what problem it tries to solve. It would also be great to see explanation of design choices and why one might want to use this library. This comment isn't just about the pull request, but rather about the whole asynchronous-codec
, I look at it and I just don't get why and when would I use it in my own project if I had one where it is applicable.
tests/reqres.rs
Outdated
let mut buffer = Vec::new(); | ||
buffer.extend_from_slice(b"hello\n"); | ||
|
||
let mut stream = RecvSend::new(Cursor::new(&mut buffer), LinesCodec).close_after_send(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is confusing to me. It seems to use buffer
for both reading a messages from and writing message to. It'll be a more complex data structure in practice though where data flow for reads and writes is separate, right?
If so, why not having two explicit arguments: source and sink?
UPD: I see it comes from Framed
, whose usefulness and reason for combining both source/stream and sink I don't get either.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The buffer is meant to simulate a network stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I just found example counter-intuitive because TCP stream (as an example) is duplex, but reads and writes are accumulated on different sides rather than being concatenated in the same vector. Composing duplex stream from separate read and write buffers would be more intuitive, but it would also be much more code.
I guess what I anticipated is some comment about why we read and write to the same slice of memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see what you are saying. Unfortunately, I am yet to find a good test utility that plugs neatly into AsyncRead
+ AsyncWrite
. Might need to write one ...
tests/reqres.rs
Outdated
|
||
assert_eq!(req, "hello\n"); | ||
|
||
let (rx, tx) = oneshot::channel(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tx
usually means transmitting, meaning sending. Names are flipped here.
Sorry for the lack of documentation! The What this PR adds are "message patterns" on top of a framed stream. So far only one is added but it is easy to envision more. I've started with Your pseudo-code is on point. My vision is that by implementing The item returned from The design goals for this were:
|
Okay, that is really helpful, I think I have a much better understanding of it now, thanks! |
Sent some tweaks for this branch in thomaseizinger#1 |
@mxinden I keep coming back to wanting this almost every time I touch any protocol code. Are you in favor of this? |
We would use it in combination with a |
I am hesitant: I don't think the usage of All that said, the abstraction might be worth the confusion that comes with it. It seems to me that you feel strongly about this new abstraction. Which of our rust-libp2p protocols would benefit the most of this pattern? Do you want to integrate this into one of them as a proof-of-concept? |
I think of it as a source of events of a protocol.
Every one that sends messages. The idea has the potential to replace every state machine for sending messages in |
RecvSend
implementationRecvSend
, SendRecv
and Send
Here you go: libp2p/rust-libp2p#3610 |
Code duplication in here is currently horrendous but that can be fixed later. |
I've written about this - to some extent - here: libp2p/rust-libp2p#3603 The usage of If we had a generator interface, I'd use that! |
I might have a go at re-writing these using |
It doesn't really work very well because Rust doesn't have native generators yet. |
Whilst thinking more about libp2p/rust-libp2p#3130 and the PoC I made in https://github.com/thomaseizinger/rust-async-response-stream, I realized that this could actually be part of
asynchronous-codec
.It could be implemented on top but it is also fairly intertwined with the idea of
Framed
. A good data point here is that we don't need any additional dependencies. All we are doing is defining aStream
utility on top ofFramed
that makes it easier to implement a request-response message pattern.The design has slightly evolved from https://github.com/thomaseizinger/rust-async-response-stream. Instead of providing two
Future
s, we now just implementStream
. For the easy usecase where we close the stream after sending the response, we only ever output oneItem
: The incoming request and the placeholder for the response. Because it is aStream
, aSelectAll
will continue to poll it until it returnsNone
. This allows us to completely hide the message exchange and makes using this really easy:SelectAll
.SelectAll
.I also removed the timeout, I think this should be implemented on top.