diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index 6f354b31ae878..b3b02c919f154 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -21,7 +21,6 @@ //! Note: Most traits here need to be marked `Sync + Send` to be //! compliant with the `SendableRecordBatchStream` trait. -use std::collections::VecDeque; use std::mem; use std::pin::Pin; use std::task::{Context, Poll}; @@ -38,12 +37,15 @@ use crate::physical_plan::RecordBatchStream; use arrow::datatypes::SchemaRef; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; +use chrono::TimeZone; use datafusion_common::instant::Instant; use datafusion_common::ScalarValue; use futures::future::BoxFuture; use futures::stream::BoxStream; use futures::{ready, FutureExt, Stream, StreamExt}; +use object_store::path::Path; +use object_store::ObjectMeta; /// A fallible future that resolves to a stream of [`RecordBatch`] pub type FileOpenFuture = @@ -76,7 +78,8 @@ pub trait FileOpener: Unpin { /// A stream that iterates record batch by record batch, file over file. pub struct FileStream { /// An iterator over input files. - file_iter: VecDeque, + files: Vec, + cur_file_idx: usize, /// The stream schema (file schema including partition columns and after /// projection). projected_schema: SchemaRef, @@ -263,7 +266,8 @@ impl FileStream { let files = config.file_groups[partition].clone(); Ok(Self { - file_iter: files.into(), + files, + cur_file_idx: 0, projected_schema, remain: config.limit, file_opener, @@ -289,18 +293,36 @@ impl FileStream { /// Since file opening is mostly IO (and may involve a /// bunch of sequential IO), it can be parallelized with decoding. fn start_next_file(&mut self) -> Option)>> { - let part_file = self.file_iter.pop_front()?; + if self.cur_file_idx == self.files.len() { + return None; + } + + let part_file = &mut self.files[self.cur_file_idx]; + self.cur_file_idx += 1; + + let object_meta = mem::replace( + &mut part_file.object_meta, + ObjectMeta { + location: Path::default(), + last_modified: chrono::Utc.timestamp_nanos(0), + size: 0, + e_tag: None, + version: None, + }, + ); + + let partition_values = mem::take(&mut part_file.partition_values); let file_meta = FileMeta { - object_meta: part_file.object_meta, - range: part_file.range, - extensions: part_file.extensions, + object_meta, + range: part_file.range.clone(), + extensions: part_file.extensions.clone(), }; Some( self.file_opener .open(file_meta) - .map(|future| (future, part_file.partition_values)), + .map(|future| (future, partition_values)), ) }