diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 2bab2b302844..5e8db70d4512 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -71,7 +71,7 @@ num-traits = { version = "0.2", optional = true } num_cpus = "1.13.0" ordered-float = "2.10" parking_lot = "0.12" -parquet = { version = "11", features = ["arrow"] } +parquet = { version = "11", features = ["arrow", "async"] } paste = "^1.0" pin-project-lite= "^0.2.7" pyo3 = { version = "0.16", optional = true } diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index edfe5e2cecd6..2e3fcfbff9ac 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -22,10 +22,10 @@ use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; use async_trait::async_trait; -use futures::{stream, AsyncRead, StreamExt}; +use futures::{stream, StreamExt}; use crate::datasource::object_store::{ - FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, + ChunkReader, FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, }; use crate::datasource::PartitionedFile; use crate::error::{DataFusionError, Result}; @@ -72,14 +72,10 @@ impl LocalFileReader { #[async_trait] impl ObjectReader for LocalFileReader { - async fn chunk_reader( - &self, - _start: u64, - _length: usize, - ) -> Result> { - todo!( - "implement once async file readers are available (arrow-rs#78, arrow-rs#111)" - ) + async fn chunk_reader(&self) -> Result> { + let file = tokio::fs::File::open(&self.file.path).await?; + let file = tokio::io::BufReader::new(file); + Ok(Box::new(file)) } fn sync_chunk_reader( diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index 3a9da6701700..ae9f8ca65ad3 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -28,21 +28,42 @@ use std::sync::Arc; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use futures::{AsyncRead, Stream, StreamExt}; +use futures::{Stream, StreamExt}; +use tokio::io::{AsyncBufRead, AsyncSeek}; use local::LocalFileSystem; use crate::error::{DataFusionError, Result}; +/// Provides async access to read a file, combing [`AsyncSeek`] +/// and [`AsyncBufRead`] so they can be used as a trait object +/// +/// [`AsyncSeek`] is necessary because readers may need to seek around whilst +/// reading, either because the format itself is structured (e.g. parquet) +/// or because it needs to read metadata or infer schema as an initial step +/// +/// [`AsyncBufRead`] is necessary because readers may wish to read data +/// up until some delimiter (e.g. csv or newline-delimited JSON) +/// +/// Note: the same block of data may be read multiple times +/// +/// Implementations that fetch from object storage may wish to maintain an internal +/// buffer of fetched data blocks, potentially discarding them or spilling them to disk +/// based on memory pressure +/// +/// TODO(#1614): Remove Sync +pub trait ChunkReader: AsyncBufRead + AsyncSeek + Send + Sync + Unpin {} +impl ChunkReader for T {} + /// Object Reader for one file in an object store. /// /// Note that the dynamic dispatch on the reader might /// have some performance impacts. 
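// Aside (illustrative sketch, not part of the upstream patch): the blanket impl on the
// `ChunkReader` trait above means any `AsyncBufRead + AsyncSeek + Send + Sync + Unpin`
// type qualifies. For example, a hypothetical in-memory object store could back the
// `ObjectReader::chunk_reader` method declared below with a `std::io::Cursor`, for which
// tokio provides the async trait impls (`T: AsRef<[u8]> + Unpin`):
//
//     async fn chunk_reader(&self) -> Result<Box<dyn ChunkReader>> {
//         // `self.bytes` is an assumed `Vec<u8>` field holding the file body;
//         // each call returns a cursor with its own independent seek position,
//         // satisfying the contract documented on `ObjectReader::chunk_reader`.
//         Ok(Box::new(std::io::Cursor::new(self.bytes.clone())))
//     }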
#[async_trait] pub trait ObjectReader: Send + Sync { - /// Get reader for a part [start, start + length] in the file asynchronously - async fn chunk_reader(&self, start: u64, length: usize) - -> Result>; + /// Get a [`ChunkReader`] for the file, successive calls to this MUST + /// return readers with independent seek positions + async fn chunk_reader(&self) -> Result>; /// Get reader for a part [start, start + length] in the file fn sync_chunk_reader( diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 1a58928f23a4..ec73d1a52209 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -17,25 +17,26 @@ //! Execution plan for reading Parquet files -use futures::{StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use std::fmt; use std::fs; use std::path::Path; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use std::{any::Any, convert::TryInto}; -use crate::datasource::file_format::parquet::ChunkObjectReader; -use crate::datasource::object_store::ObjectStore; +use crate::datasource::object_store::{ChunkReader, ObjectStore, SizedFile}; use crate::datasource::PartitionedFile; use crate::execution::context::{SessionState, TaskContext}; use crate::physical_plan::expressions::PhysicalSortExpr; +use crate::physical_plan::file_format::{PartitionColumnProjector, SchemaAdapter}; use crate::{ error::{DataFusionError, Result}, physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, physical_plan::{ file_format::FileScanConfig, metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, - stream::RecordBatchReceiverStream, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }, @@ -50,27 +51,22 @@ use arrow::{ error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; -use log::{debug, warn}; -use parquet::arrow::ArrowWriter; +use log::debug; +use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder}; use parquet::file::{ - metadata::RowGroupMetaData, reader::SerializedFileReader, - serialized_reader::ReadOptionsBuilder, statistics::Statistics as ParquetStatistics, + metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics, }; use fmt::Debug; -use parquet::arrow::{ArrowReader, ParquetFileArrowReader}; use parquet::file::properties::WriterProperties; +use tokio::task; use tokio::task::JoinHandle; -use tokio::{ - sync::mpsc::{channel, Receiver, Sender}, - task, -}; -use crate::physical_plan::file_format::SchemaAdapter; +use crate::physical_plan::RecordBatchStream; use async_trait::async_trait; - -use super::PartitionColumnProjector; +use futures::{future::BoxFuture, FutureExt}; +use parquet::arrow::async_reader::ParquetRecordBatchStream; /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] @@ -210,62 +206,40 @@ impl ExecutionPlan for ParquetExec { partition_index: usize, context: Arc, ) -> Result { - // because the parquet implementation is not thread-safe, it is necessary to execute - // on a thread and communicate with channels - let (response_tx, response_rx): ( - Sender>, - Receiver>, - ) = channel(2); - - let partition = self.base_config.file_groups[partition_index].clone(); - let metrics = self.metrics.clone(); + let files = self.base_config.file_groups[partition_index].to_vec(); + let file_schema = &self.base_config.file_schema; + let projection = match 
self.base_config.file_column_projection_indices() { Some(proj) => proj, - None => (0..self.base_config.file_schema.fields().len()).collect(), + None => (0..file_schema.fields().len()).collect(), }; - let pruning_predicate = self.pruning_predicate.clone(); - let batch_size = context.session_config().batch_size; - let limit = self.base_config.limit; - let object_store = Arc::clone(&self.base_config.object_store); - let partition_col_proj = PartitionColumnProjector::new( + + let partition_column_projector = PartitionColumnProjector::new( Arc::clone(&self.projected_schema), &self.base_config.table_partition_cols, ); - let adapter = SchemaAdapter::new(self.base_config.file_schema.clone()); - - let join_handle = task::spawn_blocking(move || { - if let Err(e) = read_partition( - object_store.as_ref(), - adapter, - partition_index, - &partition, - metrics, - &projection, - &pruning_predicate, - batch_size, - response_tx.clone(), - limit, - partition_col_proj, - ) { - warn!( - "Parquet reader thread terminated due to error: {:?} for files: {:?}", - e, partition - ); - // Send the error back to the main thread. - // - // Ignore error sending (via `.ok()`) because that - // means the receiver has been torn down (and nothing - // cares about the errors anymore) - send_result(&response_tx, Err(e.into())).ok(); - } + let schema_adapter = SchemaAdapter::new(file_schema.clone()); + + let config = Arc::new(PartitionConfig { + projection, + schema_adapter, + projected_schema: Arc::clone(&self.projected_schema), + object_store: Arc::clone(&self.base_config.object_store), + partition_idx: partition_index, + batch_size: context.session_config().batch_size, + pruning_predicate: self.pruning_predicate.clone(), + metrics: self.metrics.clone(), }); - Ok(RecordBatchReceiverStream::create( - &self.projected_schema, - response_rx, - join_handle, - )) + Ok(Box::pin(ParquetExecStream { + partition_column_projector, + config, + files, + state: StreamState::Init, + file_idx: 0, + remaining_rows: self.base_config.limit.unwrap_or(usize::MAX), + })) } fn fmt_as( @@ -294,21 +268,127 @@ impl ExecutionPlan for ParquetExec { } } -fn send_result( - response_tx: &Sender>, - result: ArrowResult, -) -> Result<()> { - // Note this function is running on its own blockng tokio thread so blocking here is ok. 
- response_tx - .blocking_send(result) - .map_err(|e| DataFusionError::Execution(e.to_string()))?; - Ok(()) +struct PartitionConfig { + projected_schema: SchemaRef, + + object_store: Arc, + + partition_idx: usize, + + batch_size: usize, + + projection: Vec, + + pruning_predicate: Option, + + schema_adapter: SchemaAdapter, + + metrics: ExecutionPlanMetricsSet, +} + +enum StreamState { + Init, + Create(BoxFuture<'static, Result>>>), + Stream(ParquetRecordBatchStream>), + Error, +} + +struct ParquetExecStream { + config: Arc, + + remaining_rows: usize, + + file_idx: usize, + + partition_column_projector: PartitionColumnProjector, + + files: Vec, + + state: StreamState, +} + +impl ParquetExecStream { + fn adapt_batch(&mut self, batch: RecordBatch) -> ArrowResult { + let adapted_batch = self + .config + .schema_adapter + .adapt_batch(batch, &self.config.projection)?; + + self.partition_column_projector + .project(adapted_batch, &self.files[self.file_idx].partition_values) + } +} + +impl RecordBatchStream for ParquetExecStream { + fn schema(&self) -> SchemaRef { + self.config.projected_schema.clone() + } +} + +impl Stream for ParquetExecStream { + type Item = ArrowResult; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = &mut *self; + + if this.remaining_rows == 0 { + return Poll::Ready(None); + } + + loop { + match &mut this.state { + StreamState::Init => { + let file = match this.files.get(this.file_idx) { + Some(file) => file.file_meta.sized_file.clone(), + None => return Poll::Ready(None), + }; + + this.state = StreamState::Create( + create_stream(this.config.clone(), file).boxed(), + ); + } + StreamState::Create(f) => match futures::ready!(f.poll_unpin(cx)) { + Ok(s) => { + this.state = StreamState::Stream(s); + } + Err(e) => { + this.state = StreamState::Error; + return Poll::Ready(Some(Err(ArrowError::ExternalError( + Box::new(e), + )))); + } + }, + StreamState::Stream(s) => match futures::ready!(s.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + this.remaining_rows = + this.remaining_rows.saturating_sub(batch.num_rows()); + + return Poll::Ready(Some(self.adapt_batch(batch))); + } + Some(Err(e)) => { + this.state = StreamState::Error; + return Poll::Ready(Some(Err(ArrowError::ExternalError( + Box::new(e), + )))); + } + None => { + this.file_idx += 1; + this.state = StreamState::Init + } + }, + StreamState::Error => return Poll::Ready(None), + } + } + } } /// Wraps parquet statistics in a way /// that implements [`PruningStatistics`] struct RowGroupPruningStatistics<'a> { - row_group_metadata: &'a RowGroupMetaData, + row_group_metadata: &'a [RowGroupMetaData], parquet_schema: &'a Schema, } @@ -341,26 +421,33 @@ macro_rules! get_statistic { // Extract the min or max value calling `func` or `bytes_func` on the ParquetStatistics as appropriate macro_rules! 
get_min_max_values { ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{ - let (column_index, field) = - if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) { - (v, f) - } else { - // Named column was not present - return None; - }; + let (column_index, field) = if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) { + (v, f) + } else { + // Named column was not present + return None + }; let data_type = field.data_type(); // The result may be None, because DataFusion doesn't have support for ScalarValues of the column type let null_scalar: ScalarValue = data_type.try_into().ok()?; - $self.row_group_metadata - .column(column_index) - .statistics() - .map(|stats| get_statistic!(stats, $func, $bytes_func)) - .flatten() - // column either didn't have statistics at all or didn't have min/max values - .or_else(|| Some(null_scalar.clone())) - .map(|s| s.to_array()) + let scalar_values : Vec = $self.row_group_metadata + .iter() + .flat_map(|meta| { + meta.column(column_index).statistics() + }) + .map(|stats| { + get_statistic!(stats, $func, $bytes_func) + }) + .map(|maybe_scalar| { + // column either did't have statistics at all or didn't have min/max values + maybe_scalar.unwrap_or_else(|| null_scalar.clone()) + }) + .collect(); + + // ignore errors converting to arrays (e.g. different types) + ScalarValue::iter_to_array(scalar_values).ok() }} } @@ -375,14 +462,17 @@ macro_rules! get_null_count_values { return None; }; - let value = ScalarValue::UInt64( - $self - .row_group_metadata - .column(column_index) - .statistics() - .map(|s| s.null_count()), - ); - Some(value.to_array()) + let scalar_values: Vec = $self + .row_group_metadata + .iter() + .flat_map(|meta| meta.column(column_index).statistics()) + .map(|stats| { + ScalarValue::UInt64(Some(stats.null_count().try_into().unwrap())) + }) + .collect(); + + // ignore errors converting to arrays (e.g. 
different types) + ScalarValue::iter_to_array(scalar_values).ok() }}; } @@ -396,7 +486,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { } fn num_containers(&self) -> usize { - 1 + self.row_group_metadata.len() } fn null_counts(&self, column: &Column) -> Option { @@ -404,119 +494,56 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { } } -fn build_row_group_predicate( - pruning_predicate: &PruningPredicate, - metrics: ParquetFileMetrics, -) -> Box bool> { - let pruning_predicate = pruning_predicate.clone(); - Box::new( - move |row_group_metadata: &RowGroupMetaData, _i: usize| -> bool { - let parquet_schema = pruning_predicate.schema().as_ref(); - let pruning_stats = RowGroupPruningStatistics { - row_group_metadata, - parquet_schema, - }; - let predicate_values = pruning_predicate.prune(&pruning_stats); - match predicate_values { - Ok(values) => { - // NB: false means don't scan row group - let num_pruned = values.iter().filter(|&v| !*v).count(); - metrics.row_groups_pruned.add(num_pruned); - values[0] - } - // stats filter array could not be built - // return a closure which will not filter out any row groups - Err(e) => { - debug!("Error evaluating row group predicate values {}", e); - metrics.predicate_evaluation_errors.add(1); - true - } - } - }, - ) -} +async fn create_stream( + config: Arc, + file: SizedFile, +) -> Result>> { + let file_metrics = + ParquetFileMetrics::new(config.partition_idx, &file.path, &config.metrics); -#[allow(clippy::too_many_arguments)] -fn read_partition( - object_store: &dyn ObjectStore, - schema_adapter: SchemaAdapter, - partition_index: usize, - partition: &[PartitionedFile], - metrics: ExecutionPlanMetricsSet, - projection: &[usize], - pruning_predicate: &Option, - batch_size: usize, - response_tx: Sender>, - limit: Option, - mut partition_column_projector: PartitionColumnProjector, -) -> Result<()> { - let mut total_rows = 0; - 'outer: for partitioned_file in partition { - debug!("Reading file {}", &partitioned_file.file_meta.path()); - - let file_metrics = ParquetFileMetrics::new( - partition_index, - &*partitioned_file.file_meta.path(), - &metrics, - ); - let object_reader = - object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?; - - let mut opt = ReadOptionsBuilder::new(); - if let Some(pruning_predicate) = pruning_predicate { - opt = opt.with_predicate(build_row_group_predicate( - pruning_predicate, - file_metrics, - )); - } - - let file_reader = SerializedFileReader::new_with_options( - ChunkObjectReader(object_reader), - opt.build(), - )?; - - let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); - let adapted_projections = - schema_adapter.map_projections(&arrow_reader.get_schema()?, projection)?; + let object_reader = config.object_store.file_reader(file)?; + let reader = object_reader.chunk_reader().await?; - let mut batch_reader = - arrow_reader.get_record_reader_by_columns(adapted_projections, batch_size)?; - loop { - match batch_reader.next() { - Some(Ok(batch)) => { - total_rows += batch.num_rows(); - - let adapted_batch = schema_adapter.adapt_batch(batch, projection)?; + let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await?; - let proj_batch = partition_column_projector - .project(adapted_batch, &partitioned_file.partition_values); + let pruning_stats = RowGroupPruningStatistics { + row_group_metadata: builder.metadata().row_groups(), + parquet_schema: builder.schema().as_ref(), + }; - send_result(&response_tx, proj_batch)?; - if limit.map(|l| total_rows 
>= l).unwrap_or(false) { - break 'outer; - } - } - None => { - break; - } - Some(Err(e)) => { - let err_msg = - format!("Error reading batch from {}: {}", partitioned_file, e); - // send error to operator - send_result( - &response_tx, - Err(ArrowError::ParquetError(err_msg.clone())), - )?; - // terminate thread with error - return Err(DataFusionError::Execution(err_msg)); - } + if let Some(predicate) = &config.pruning_predicate { + match predicate.prune(&pruning_stats) { + Ok(predicate_values) => { + let num_pruned = predicate_values.iter().filter(|&v| !*v).count(); + file_metrics.row_groups_pruned.add(num_pruned); + + let row_groups = predicate_values + .into_iter() + .enumerate() + .filter_map(|(idx, v)| match v { + true => Some(idx), + false => None, + }) + .collect(); + + builder = builder.with_row_groups(row_groups) + } + Err(e) => { + debug!("Error evaluating row group predicate values {}", e); + file_metrics.predicate_evaluation_errors.add(1); } } - } + }; + + let projection = config + .schema_adapter + .map_projections(builder.schema(), &config.projection)?; - // finished reading files (dropping response_tx will close - // channel) - Ok(()) + builder + .with_batch_size(config.batch_size) + .with_projection(projection) + .build() + .map_err(DataFusionError::ParquetError) } /// Executes a query and writes the results to a partitioned Parquet file. @@ -579,10 +606,12 @@ mod tests { }, physical_plan::collect, }; + use std::fs::File; + use std::io::Write; use super::*; - use crate::execution::options::CsvReadOptions; - use crate::prelude::{SessionConfig, SessionContext}; + use crate::physical_plan::execute_stream; + use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext}; use arrow::array::Float32Array; use arrow::{ array::{Int64Array, Int8Array, StringArray}, @@ -598,19 +627,12 @@ mod tests { }, schema::types::SchemaDescPtr, }; - use std::fs::File; - use std::io::Write; - use tempfile::TempDir; + use tempfile::{NamedTempFile, TempDir}; - /// writes each RecordBatch as an individual parquet file and then - /// reads it back in to the named location. - async fn round_trip_to_parquet( - batches: Vec, - projection: Option>, - schema: Option, - ) -> Result> { + // Writes the `batches` to temporary files + fn write_files(batches: Vec) -> Vec { // When vec is dropped, temp files are deleted - let files: Vec<_> = batches + batches .into_iter() .map(|batch| { let output = tempfile::NamedTempFile::new().expect("creating temp file"); @@ -626,8 +648,14 @@ mod tests { writer.close().unwrap(); output }) - .collect(); + .collect() + } + async fn make_exec( + files: &[NamedTempFile], + projection: Option>, + schema: Option, + ) -> ParquetExec { let file_names: Vec<_> = files .iter() .map(|t| t.path().to_string_lossy().to_string()) @@ -649,7 +677,7 @@ mod tests { }; // prepare the scan - let parquet_exec = ParquetExec::new( + ParquetExec::new( FileScanConfig { object_store: Arc::new(LocalFileSystem {}), file_groups: vec![file_groups], @@ -660,8 +688,18 @@ mod tests { table_partition_cols: vec![], }, None, - ); + ) + } + /// writes each RecordBatch as an individual parquet file and then + /// reads it back in to the named location. 
+ async fn round_trip_to_parquet( + batches: Vec, + projection: Option>, + schema: Option, + ) -> Result> { + let files = write_files(batches); + let parquet_exec = make_exec(&files, projection, schema).await; let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); collect(Arc::new(parquet_exec), task_ctx).await @@ -871,12 +909,28 @@ mod tests { Field::new("c3", DataType::Int8, true), ]); - // read/write them files: - let read = - round_trip_to_parquet(vec![batch1, batch2], None, Some(Arc::new(schema))) - .await; - assert_contains!(read.unwrap_err().to_string(), - "Execution error: Failed to map column projection for field c3. Incompatible data types Float32 and Int8"); + let files = write_files(vec![batch1, batch2]); + let plan = make_exec(&files, None, Some(Arc::new(schema))).await; + let session_ctx = SessionContext::new(); + let context = session_ctx.task_ctx(); + let mut stream = execute_stream(Arc::new(plan), context).await.unwrap(); + let batch = stream.next().await.unwrap().unwrap(); + + // First batch should read successfully + let expected = vec![ + "+-----+----+----+", + "| c1 | c2 | c3 |", + "+-----+----+----+", + "| Foo | 1 | 10 |", + "| | 2 | 20 |", + "| bar | | |", + "+-----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &[batch]); + + // Second batch should error + let err = stream.next().await.unwrap().unwrap_err().to_string(); + assert!(err.contains("Failed to map column projection for field c3. Incompatible data types Float32 and Int8"), "{}", err); } #[tokio::test] @@ -1018,18 +1072,13 @@ mod tests { // invalid file should produce an error to that effect assert_contains!( batch.unwrap_err().to_string(), - "External error: Parquet error: Arrow: IO error" + "External error: IO error: No such file or directory" ); assert!(results.next().await.is_none()); Ok(()) } - fn parquet_file_metrics() -> ParquetFileMetrics { - let metrics = Arc::new(ExecutionPlanMetricsSet::new()); - ParquetFileMetrics::new(0, "file.parquet", &metrics) - } - #[test] fn row_group_pruning_predicate_simple_expr() -> Result<()> { use datafusion_expr::{col, lit}; @@ -1048,13 +1097,12 @@ mod tests { vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], ); let row_group_metadata = vec![rgm1, rgm2]; - let mut row_group_predicate = - build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + let row_group_stats = RowGroupPruningStatistics { + row_group_metadata: &row_group_metadata, + parquet_schema: pruning_predicate.schema().as_ref(), + }; + + let row_group_filter = pruning_predicate.prune(&row_group_stats).unwrap(); assert_eq!(row_group_filter, vec![false, true]); Ok(()) @@ -1078,13 +1126,12 @@ mod tests { vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], ); let row_group_metadata = vec![rgm1, rgm2]; - let mut row_group_predicate = - build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + let row_group_stats = RowGroupPruningStatistics { + row_group_metadata: &row_group_metadata, + parquet_schema: pruning_predicate.schema().as_ref(), + }; + + let row_group_filter = pruning_predicate.prune(&row_group_stats).unwrap(); // missing statistics for first row group mean that the result from the predicate expression // is null / undefined so the first row group can't 
be filtered out assert_eq!(row_group_filter, vec![true, true]); @@ -1123,13 +1170,12 @@ mod tests { ], ); let row_group_metadata = vec![rgm1, rgm2]; - let mut row_group_predicate = - build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + let row_group_stats = RowGroupPruningStatistics { + row_group_metadata: &row_group_metadata, + parquet_schema: pruning_predicate.schema().as_ref(), + }; + + let row_group_filter = pruning_predicate.prune(&row_group_stats).unwrap(); // the first row group is still filtered out because the predicate expression can be partially evaluated // when conditions are joined using AND assert_eq!(row_group_filter, vec![false, true]); @@ -1138,13 +1184,12 @@ mod tests { // this bypasses the entire predicate expression and no row groups are filtered out let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2))); let pruning_predicate = PruningPredicate::try_new(expr, schema)?; - let mut row_group_predicate = - build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + let row_group_stats = RowGroupPruningStatistics { + row_group_metadata: &row_group_metadata, + parquet_schema: pruning_predicate.schema().as_ref(), + }; + + let row_group_filter = pruning_predicate.prune(&row_group_stats).unwrap(); assert_eq!(row_group_filter, vec![true, true]); Ok(()) @@ -1184,45 +1229,70 @@ mod tests { let pruning_predicate = PruningPredicate::try_new(expr, schema)?; let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate(); - let mut row_group_predicate = - build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); + let row_group_stats = RowGroupPruningStatistics { + row_group_metadata: &row_group_metadata, + parquet_schema: pruning_predicate.schema().as_ref(), + }; + + let row_group_filter = pruning_predicate.prune(&row_group_stats).unwrap(); // First row group was filtered out because it contains no null value on "c2". 
assert_eq!(row_group_filter, vec![false, true]); Ok(()) } - #[test] - fn row_group_pruning_predicate_eq_null_expr() -> Result<()> { + #[tokio::test] + async fn row_group_pruning_predicate_eq_null_expr() -> Result<()> { use datafusion_expr::{col, lit}; // test row group predicate with an unknown (Null) expr // // int > 1 and bool = NULL => c1_max > 1 and null - let expr = col("c1") + let expr = col("tinyint_col") .gt(lit(15)) - .and(col("c2").eq(lit(ScalarValue::Boolean(None)))); - let schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Int32, false), - Field::new("c2", DataType::Boolean, false), - ])); - let pruning_predicate = PruningPredicate::try_new(expr, schema)?; - let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate(); + .and(col("bool_col").eq(lit(ScalarValue::Boolean(None)))); + + let session_ctx = SessionContext::new(); + let context = session_ctx.task_ctx(); + let testdata = crate::test_util::parquet_test_data(); + let filename = format!("{}/alltypes_plain.parquet", testdata); + + let parquet_exec = ParquetExec::new( + FileScanConfig { + object_store: Arc::new(LocalFileSystem {}), + file_groups: vec![vec![local_unpartitioned_file(filename.clone())]], + file_schema: ParquetFormat::default() + .infer_schema(local_object_reader_stream(vec![filename])) + .await?, + statistics: Statistics::default(), + projection: Some(vec![1, 2, 3]), + limit: None, + table_partition_cols: vec![], + }, + Some(expr), + ); + assert_eq!(parquet_exec.output_partitioning().partition_count(), 1); - let mut row_group_predicate = - build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); - let row_group_filter = row_group_metadata - .iter() - .enumerate() - .map(|(i, g)| row_group_predicate(g, i)) - .collect::>(); // no row group is filtered out because the predicate expression can't be evaluated - // when a null array is generated for a statistics column, - assert_eq!(row_group_filter, vec![true, true]); + let mut results = parquet_exec.execute(0, context).await?; + let batch = results.next().await.unwrap()?; + + assert_eq!(8, batch.num_rows()); + assert_eq!(3, batch.num_columns()); + + let schema = batch.schema(); + let field_names: Vec<&str> = + schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!(vec!["bool_col", "tinyint_col", "smallint_col"], field_names); + + let batch = results.next().await; + assert!(batch.is_none()); + + let metrics = parquet_exec.metrics().unwrap(); + let metric = metrics + .iter() + .find(|metric| metric.value().name() == "predicate_evaluation_errors") + .unwrap(); + assert_eq!(metric.value().as_usize(), 1); Ok(()) } diff --git a/datafusion/src/test/object_store.rs b/datafusion/src/test/object_store.rs index e93b4cd2d410..b9bcf4939b9a 100644 --- a/datafusion/src/test/object_store.rs +++ b/datafusion/src/test/object_store.rs @@ -24,12 +24,13 @@ use std::{ use crate::{ datasource::object_store::{ - FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile, + ChunkReader, FileMeta, FileMetaStream, ListEntryStream, ObjectReader, + ObjectStore, SizedFile, }, error::{DataFusionError, Result}, }; use async_trait::async_trait; -use futures::{stream, AsyncRead, StreamExt}; +use futures::{stream, StreamExt}; #[derive(Debug)] /// An object store implem that is useful for testing. 
@@ -99,11 +100,7 @@ struct EmptyObjectReader(u64); #[async_trait] impl ObjectReader for EmptyObjectReader { - async fn chunk_reader( - &self, - _start: u64, - _length: usize, - ) -> Result> { + async fn chunk_reader(&self) -> Result> { unimplemented!() } diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs new file mode 100644 index 000000000000..f1cd4967bb0a --- /dev/null +++ b/datafusion/tests/sql.rs @@ -0,0 +1,48 @@ +use datafusion::prelude::SessionContext; +use futures::StreamExt; +use std::fs::File; +use std::io::Read; +use std::path::Path; + +#[test] +fn foo() { + let file_path = "/tmp/parquet_query_sqlfeBpC5.parquet"; + assert!(Path::new(file_path).exists(), "path not found"); + println!("Using parquet file {}", file_path); + + let mut context = SessionContext::new(); + + let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + rt.block_on(context.register_parquet("t", file_path)) + .unwrap(); + + // We read the queries from a file so they can be changed without recompiling the benchmark + let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap(); + let mut queries = String::new(); + queries_file.read_to_string(&mut queries).unwrap(); + + for query in queries.split(';') { + let query = query.trim(); + + // Remove comment lines + let query: Vec<_> = query.split('\n').filter(|x| !x.starts_with("--")).collect(); + let query = query.join(" "); + + // Ignore blank lines + if query.is_empty() { + continue; + } + + let query = query.as_str(); + + let mut context = context.clone(); + rt.block_on(async move { + let query = context.sql(query).await.unwrap(); + let mut stream = query.execute_stream().await.unwrap(); + while let Some(batch) = stream.next().await { + let batch = batch.unwrap(); + assert!(batch.num_rows() != 0); + } + }) + } +}
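End-to-end, the async read path introduced by this diff can be exercised without the old spawn_blocking/channel plumbing. Below is a minimal sketch, not part of the patch, assuming the parquet "11" crate with the "async" feature enabled (as this patch does) and tokio with its rt/macros features; "example.parquet" and the batch size are placeholders. It mirrors what LocalFileReader::chunk_reader and ParquetExec's create_stream do above.

use futures::StreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open the file asynchronously and buffer it, as LocalFileReader::chunk_reader does
    let file = tokio::fs::File::open("example.parquet").await?;
    let reader = tokio::io::BufReader::new(file);

    // Build an async stream of RecordBatches, as ParquetExec's create_stream does
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_batch_size(1024)
        .build()?;

    // Batches are produced by polling the stream; no dedicated reader thread is needed
    while let Some(batch) = stream.next().await {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}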