Skip to content

Commit

Permalink
refactor: address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
crepererum committed Aug 25, 2023
1 parent 912861e commit 74ed1df
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 4 deletions.
4 changes: 2 additions & 2 deletions arrow-flight/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::task::Poll;

use crate::{
decode::FlightRecordBatchStream, flight_service_client::FlightServiceClient,
trailers::extract_trailers, Action, ActionType, Criteria, Empty, FlightData,
trailers::extract_lazy_trailers, Action, ActionType, Criteria, Empty, FlightData,
FlightDescriptor, FlightInfo, HandshakeRequest, PutResult, Ticket,
};
use arrow_schema::Schema;
Expand Down Expand Up @@ -205,7 +205,7 @@ impl FlightClient {
let request = self.make_request(ticket);

let (md, response_stream, _ext) = self.inner.do_get(request).await?.into_parts();
let (response_stream, trailers) = extract_trailers(response_stream);
let (response_stream, trailers) = extract_lazy_trailers(response_stream);

Ok(FlightRecordBatchStream::new_from_flight_data(
response_stream.map_err(FlightError::Tonic),
Expand Down
9 changes: 7 additions & 2 deletions arrow-flight/src/trailers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ use std::{
use futures::{ready, FutureExt, Stream, StreamExt};
use tonic::{metadata::MetadataMap, Status, Streaming};

/// Extract trailers from [`Streaming`] [tonic] response.
pub fn extract_trailers<T>(s: Streaming<T>) -> (ExtractTrailersStream<T>, LazyTrailers) {
/// Extract [`LazyTrailers`] from [`Streaming`] [tonic] response.
///
/// Note that [`LazyTrailers`] has inner mutability and will only hold actual data after [`ExtractTrailersStream`] is
/// fully consumed (dropping it is not required though).
pub fn extract_lazy_trailers<T>(
s: Streaming<T>,
) -> (ExtractTrailersStream<T>, LazyTrailers) {
let trailers: SharedTrailers = Default::default();
let stream = ExtractTrailersStream {
inner: s,
Expand Down
3 changes: 3 additions & 0 deletions arrow-flight/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ async fn test_do_get() {
"some_val",
);

// trailers are not available before stream exhaustion
assert!(response_stream.trailers().is_none());

let expected_response = vec![batch];
let response: Vec<_> = (&mut response_stream)
.try_collect()
Expand Down

0 comments on commit 74ed1df

Please sign in to comment.