-
Notifications
You must be signed in to change notification settings - Fork 842
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement fallible streams for FlightClient::do_put
#3464
Conversation
arrow-flight/src/client.rs
Outdated
done: bool, | ||
} | ||
|
||
impl SplitStream { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of splitting - this stream can stop whenever the first error appears and keep that first error if any, so it can be retrieved.
arrow-flight/src/client.rs
Outdated
} | ||
|
||
/// returns only the OK responses from a stream of results | ||
struct SplitStreamErr { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
struct FallibleStream {
input_stream: BoxStream<'static, Result<FlightData>>,
err: Option<FlightError>,
}
impl Stream for FallibleStream {
type Item = FlightData;
fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
match self.input_stream.poll_next_unpin(cx) {
std::task::Poll::Ready(res) => match res {
Some(data) => match data {
Ok(ok) => std::task::Poll::Ready(Some(ok)),
Err(e) => {
self.err = Some(e);
std::task::Poll::Ready(None)
},
} ,
None => std::task::Poll::Ready(None),
},
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes sense @Dandandan 👍 thank you for the suggestion
|
||
// input stream to client sends good FlightData followed by an error | ||
let input_flight_data = test_flight_data().await; | ||
let input_stream = futures::stream::iter(input_flight_data.clone()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added these two tests to show what is going on -- aka that the error provided to the client do_put
call need to get to the returned stream even though the error did not come from the server
arrow-flight/src/client.rs
Outdated
/// The setup of copying errors to result stream looks like this: | ||
/// | ||
/// ```text | ||
/// input: ---> (Stream of Result<FlightData>) ---- (Stream of FlightData) ---- network ----> Server |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made some ascii art to explain what is happening here
I implemented the approach that @Dandandan suggested. I like it much better as I think it is clearer of what is going on. I also added some ASCII art and tests. @viirya and @tustvold are you ok with this solution? If so I will apply the same treatment to do_exchange as well |
Any thoughts @tustvold ? Is this approach ok with you? |
I intend to review this tomorrow, I think there might be a way to make it simpler, making use of a oneshot instead of a mutex, or something |
I took the liberty of pushing a simplified version of this in 914948b PTAL |
Well, it is less code, though also less comments 🤷 Seems good to me other than the possibly the Thank you |
Benchmark runs are scheduled for baseline = 47e4b61 and contender = 0373a9d. 0373a9d is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
NOTE
This is a WIP -- I want feedback on the approach before I spent more time polishing it upWhich issue does this PR close?
Part of #3462
Rationale for this change
Support sending a stream of
Results
to a server, aborting the connection if the input stream has an errorThis is very likely what would happen if you had a execution system producing RecordBatches
What changes are included in this PR?
FlightClient::do_put
to accept a stream ofResult
Planned follow on PR
do_exchange
Are there any user-facing changes?
signature is changed