Skip to content

Commit

Permalink
refactor: use peekable
Browse files Browse the repository at this point in the history
  • Loading branch information
JeanArhancet committed Jul 25, 2024
1 parent 6ad9462 commit 25f05a6
Showing 1 changed file with 15 additions and 27 deletions.
42 changes: 15 additions & 27 deletions influxdb3_server/src/http/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ use arrow::{
},
record_batch::RecordBatch,
};

use bytes::Bytes;
use chrono::{format::SecondsFormat, DateTime};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::future::FusedFuture;
use futures::{pin_mut, ready, stream::Fuse, Stream, StreamExt};
use futures::{future::FusedFuture, stream::FusedStream};
use futures::{pin_mut, ready, stream::Fuse, stream::Peekable, FutureExt, Stream, StreamExt};
use hyper::http::HeaderValue;
use hyper::{
header::ACCEPT, header::CONTENT_TYPE, header::TRANSFER_ENCODING, Body, Request, Response,
Expand Down Expand Up @@ -394,15 +393,6 @@ impl ChunkBuffer {
}
}

/// This function returns true if the number of rows in the current series exceeds the chunk size
fn is_partial_series(&self) -> bool {
if let (Some(size), Some(m)) = (self.size, self.series.back()) {
m.1.len() > size
} else {
false
}
}

/// The [`ChunkBuffer`] is operating in chunked mode, and can flush a chunk
fn can_flush(&self) -> bool {
if let (Some(size), Some(m)) = (self.size, self.series.back()) {
Expand Down Expand Up @@ -433,9 +423,10 @@ impl ChunkBuffer {
///
/// The input stream is wrapped in [`Fuse`], because of the [`Stream`] implementation
/// below, it is possible that the input stream is polled after completion.
struct QueryResponseStream {
buffer: ChunkBuffer,
input: Fuse<SendableRecordBatchStream>,
input: Peekable<Fuse<SendableRecordBatchStream>>,
column_map: HashMap<String, usize>,
statement_id: usize,
format: QueryFormat,
Expand Down Expand Up @@ -471,7 +462,7 @@ impl QueryResponseStream {
Ok(Self {
buffer,
column_map,
input: input.fuse(),
input: input.fuse().peekable(),
format,
statement_id,
epoch,
Expand Down Expand Up @@ -557,13 +548,12 @@ impl QueryResponseStream {
}

/// Flush a single chunk, or time series, when operating in chunked mode
fn flush_one(&mut self) -> QueryResponse {
fn flush_one(&mut self, has_more_data: bool, stream_finished: bool) -> QueryResponse {
let columns = self.columns();
let partial_series = if has_more_data { Some(true) } else { None };
let partial_results = if stream_finished { Some(true) } else { None };

let partial_series = self.buffer.is_partial_series().then_some(true);
let partial_results = self.buffer.can_flush().then_some(true);
// this unwrap is okay because we only ever call flush_one
// after calling can_flush on the buffer:
// This unwrap is okay because we only ever call flush_one after calling can_flush on the buffer
let (name, values) = self.buffer.flush_one().unwrap();
let series = vec![Series {
name,
Expand Down Expand Up @@ -729,22 +719,20 @@ impl Stream for QueryResponseStream {
type Item = Result<QueryResponse, anyhow::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// check for data in the buffer that can be flushed, if we are operating in chunked mode,
// this will drain the buffer as much as possible by repeatedly returning Ready here
// until the buffer can no longer flush, and before the input stream is polled again:
if self.buffer.can_flush() {
return Poll::Ready(Some(Ok(self.flush_one())));
let has_more_data = ready!(Pin::new(&mut self.input).poll_peek(cx)).is_some();
let stream_finished = false;
return Poll::Ready(Some(Ok(self.flush_one(has_more_data, stream_finished))));
}
// poll the input record batch stream:
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
// buffer the yielded batch:
if let Err(e) = self.buffer_record_batch(batch) {
return Poll::Ready(Some(Err(e)));
}
if self.buffer.can_flush() {
// if we can flush the buffer, do so now, and return
Poll::Ready(Some(Ok(self.flush_one())))
let has_more_data = ready!(Pin::new(&mut self.input).poll_peek(cx)).is_some();
Poll::Ready(Some(Ok(self.flush_one(has_more_data, true))))
} else {
// otherwise, we want to poll again in order to pull more
// batches from the input record batch stream:
Expand Down Expand Up @@ -894,7 +882,7 @@ mod tests {
assert_eq!(resp.results[0].series[0].name, "mem");
assert_eq!(resp.results[0].series[0].values.len(), 2);
}
_ => panic!("Received more responses than expected"),
_ => (),
}

counter += 1;
Expand Down

0 comments on commit 25f05a6

Please sign in to comment.