diff --git a/datafusion/core/src/datasource/data_source.rs b/datafusion/core/src/datasource/data_source.rs
index be7570a20527..602c98eddb1a 100644
--- a/datafusion/core/src/datasource/data_source.rs
+++ b/datafusion/core/src/datasource/data_source.rs
@@ -31,13 +31,12 @@ use crate::datasource::physical_plan::{
 };
 
 use arrow_schema::SchemaRef;
-use datafusion_common::config::ConfigOptions;
 use datafusion_common::{Constraints, Statistics};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
-use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::source::{DataSource, DataSourceExec};
 use datafusion_physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
@@ -63,6 +62,12 @@ pub trait FileSource: Send + Sync {
     fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>;
     /// Initialize new instance with projection information
     fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
+    /// Initialize new instance with projected statistics
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;
+    /// Return execution plan metrics
+    fn metrics(&self) -> &ExecutionPlanMetricsSet;
+    /// Return projected statistics
+    fn statistics(&self) -> datafusion_common::Result<Statistics>;
 }
 
 /// Holds generic file configuration, and common behaviors for file sources.
@@ -72,9 +77,6 @@ pub struct FileSourceConfig {
     source: Arc<dyn FileSource>,
     base_config: FileScanConfig,
-    metrics: ExecutionPlanMetricsSet,
-    projected_statistics: Statistics,
-    cache: PlanProperties,
 }
 
 impl FileSourceConfig {
@@ -87,36 +89,19 @@ impl FileSourceConfig {
         Arc::new(DataSourceExec::new(source))
     }
 
-    /// Initialize a new `FileSourceConfig` instance with metrics, cache, and statistics.
+    /// Initialize a new `FileSourceConfig` instance.
     pub fn new(base_config: FileScanConfig, file_source: Arc<dyn FileSource>) -> Self {
         let (
-            projected_schema,
-            constraints,
+            _projected_schema,
+            _constraints,
             projected_statistics,
-            projected_output_ordering,
+            _projected_output_ordering,
         ) = base_config.project();
-        let cache = Self::compute_properties(
-            Arc::clone(&projected_schema),
-            &projected_output_ordering,
-            constraints,
-            &base_config,
-        );
-        let mut metrics = ExecutionPlanMetricsSet::new();
-
-        #[cfg(feature = "parquet")]
-        if let Some(parquet_config) = file_source.as_any().downcast_ref::<ParquetConfig>()
-        {
-            metrics = parquet_config.metrics();
-            let _predicate_creation_errors = MetricBuilder::new(&metrics)
-                .global_counter("num_predicate_creation_errors");
-        };
+        let file_source = file_source.with_statistics(projected_statistics);
 
         Self {
             source: file_source,
             base_config,
-            metrics,
-            projected_statistics,
-            cache,
         }
     }
 
@@ -152,11 +137,6 @@ impl FileSourceConfig {
         &self.source
     }
 
-    /// Returns the `PlanProperties` of the plan
-    pub(crate) fn cache(&self) -> PlanProperties {
-        self.cache.clone()
-    }
-
     fn compute_properties(
         schema: SchemaRef,
         orderings: &[LexOrdering],
@@ -181,9 +161,6 @@ impl FileSourceConfig {
     fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
         self.base_config.file_groups = file_groups;
-        // Changing file groups may invalidate output partitioning. Update it also
-        let output_partitioning = Self::output_partitioning_helper(&self.base_config);
-        self.cache = self.cache.with_partitioning(output_partitioning);
         self
     }
 
@@ -214,7 +191,7 @@ impl DataSource for FileSourceConfig {
             source.create_file_opener(object_store, &self.base_config, partition)?;
 
         let stream =
-            FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
+            FileStream::new(&self.base_config, partition, opener, source.metrics())?;
 
         Ok(Box::pin(stream))
     }
 
@@ -268,54 +245,36 @@ impl DataSource for FileSourceConfig {
     fn repartitioned(
         &self,
         target_partitions: usize,
-        config: &ConfigOptions,
+        repartition_file_min_size: usize,
        exec: DataSourceExec,
     ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
         if !self.supports_repartition() {
             return Ok(None);
         }
-        let repartition_file_min_size = config.optimizer.repartition_file_min_size;
         let repartitioned_file_groups_option = FileGroupPartitioner::new()
             .with_target_partitions(target_partitions)
             .with_repartition_file_min_size(repartition_file_min_size)
-            .with_preserve_order_within_groups(self.cache().output_ordering().is_some())
+            .with_preserve_order_within_groups(
+                exec.properties().output_ordering().is_some(),
+            )
             .repartition_file_groups(&self.base_config.file_groups);
 
         if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
-            let plan = Arc::new(exec.with_source(Arc::new(
-                self.clone().with_file_groups(repartitioned_file_groups),
-            )));
-            return Ok(Some(plan));
+            let source = self.clone().with_file_groups(repartitioned_file_groups);
+            let output_partitioning =
+                Self::output_partitioning_helper(&source.base_config);
+            let plan = exec
+                .with_source(Arc::new(source))
+                // Changing file groups may invalidate output partitioning. Update it also
+                .with_partitioning(output_partitioning);
+            return Ok(Some(Arc::new(plan)));
         }
         Ok(None)
     }
 
     fn statistics(&self) -> datafusion_common::Result<Statistics> {
-        #[cfg(not(feature = "parquet"))]
-        let stats = self.projected_statistics.clone();
-
-        #[cfg(feature = "parquet")]
-        let stats = if let Some(parquet_config) =
-            self.source.as_any().downcast_ref::<ParquetConfig>()
-        {
-            // When filters are pushed down, we have no way of knowing the exact statistics.
-            // Note that pruning predicate is also a kind of filter pushdown.
-            // (bloom filters use `pruning_predicate` too)
-            if parquet_config.pruning_predicate().is_some()
-                || parquet_config.page_pruning_predicate().is_some()
-                || (parquet_config.predicate().is_some()
-                    && parquet_config.pushdown_filters())
-            {
-                self.projected_statistics.clone().to_inexact()
-            } else {
-                self.projected_statistics.clone()
-            }
-        } else {
-            self.projected_statistics.clone()
-        };
-
-        Ok(stats)
+        self.source.statistics()
     }
 
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
@@ -323,9 +282,6 @@
         Some(Arc::new(Self {
             source: Arc::clone(&self.source),
             base_config: config,
-            metrics: self.metrics.clone(),
-            projected_statistics: self.projected_statistics.clone(),
-            cache: self.cache(),
         }))
     }
 
@@ -334,10 +290,17 @@
     }
 
     fn metrics(&self) -> ExecutionPlanMetricsSet {
-        self.metrics.clone()
+        self.source.metrics().clone()
     }
 
     fn properties(&self) -> PlanProperties {
-        self.cache()
+        let (projected_schema, constraints, _, projected_output_ordering) =
+            self.base_config.project();
+        Self::compute_properties(
+            Arc::clone(&projected_schema),
+            &projected_output_ordering,
+            constraints,
+            &self.base_config,
+        )
     }
 }
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 6fb086ee2611..0d034e8a0d1f 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -173,7 +173,10 @@ impl FileFormat for ArrowFormat {
         conf: FileScanConfig,
         _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(FileSourceConfig::new_exec(conf, Arc::new(ArrowConfig {})))
+        Ok(FileSourceConfig::new_exec(
+            conf,
+            Arc::new(ArrowConfig::default()),
+        ))
     }
 
     async fn create_writer_physical_plan(
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 2ea65a0ce7ff..785b3f8a5703 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -29,6 +29,9 @@ use crate::error::Result;
 use arrow::buffer::Buffer;
 use arrow_ipc::reader::FileDecoder;
 use arrow_schema::SchemaRef;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+
+use datafusion_common::Statistics;
 use futures::StreamExt;
 use itertools::Itertools;
 use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
@@ -36,7 +39,10 @@
 /// Arrow configuration struct that is given to DataSourceExec
 /// Does not hold anything special, since [`FileScanConfig`] is sufficient for arrow
 #[derive(Clone, Default)]
-pub struct ArrowConfig {}
+pub struct ArrowConfig {
+    metrics: ExecutionPlanMetricsSet,
+    projected_statistics: Option<Statistics>,
+}
 
 impl FileSource for ArrowConfig {
     fn create_file_opener(
@@ -56,15 +62,31 @@
     }
 
     fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
-        Arc::new(ArrowConfig::default())
+        Arc::new(Self { ..self.clone() })
     }
 
     fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
-        Arc::new(ArrowConfig::default())
+        Arc::new(Self { ..self.clone() })
+    }
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        let mut conf = self.clone();
+        conf.projected_statistics = Some(statistics);
+        Arc::new(conf)
     }
 
     fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
-        Arc::new(ArrowConfig::default())
+        Arc::new(Self { ..self.clone() })
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        &self.metrics
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        let statistics = &self.projected_statistics;
+        Ok(statistics
+            .clone()
+            .expect("projected_statistics must be set"))
     }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index a053296ce296..e071e10583b2 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -28,15 +28,18 @@ use crate::error::Result;
 
 use arrow::datatypes::SchemaRef;
+use datafusion_common::Statistics;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use object_store::ObjectStore;
 
-// TODO projected_constraints
 /// AvroConfig holds the extra configuration that is necessary for opening avro files
 #[derive(Clone, Default)]
 pub struct AvroConfig {
     schema: Option<SchemaRef>,
     batch_size: Option<usize>,
     projection: Option<Vec<String>>,
+    metrics: ExecutionPlanMetricsSet,
+    projected_statistics: Option<Statistics>,
 }
 
 impl AvroConfig {
@@ -95,12 +98,28 @@ impl FileSource for AvroConfig {
         conf.schema = Some(schema);
         Arc::new(conf)
     }
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        let mut conf = self.clone();
+        conf.projected_statistics = Some(statistics);
+        Arc::new(conf)
+    }
 
     fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
         let mut conf = self.clone();
         conf.projection = config.projected_file_column_names();
         Arc::new(conf)
     }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        &self.metrics
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        let statistics = &self.projected_statistics;
+        Ok(statistics
+            .clone()
+            .expect("projected_statistics must be set"))
+    }
 }
 
 #[cfg(feature = "avro")]
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 545504f34800..d02b7e9a3516 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -34,7 +34,9 @@ use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
 use arrow::csv;
 use arrow::datatypes::SchemaRef;
+use datafusion_common::Statistics;
 use datafusion_execution::TaskContext;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use futures::{StreamExt, TryStreamExt};
 use object_store::buffered::BufWriter;
@@ -73,7 +75,7 @@ use tokio::task::JoinSet;
 /// ));
 /// let exec = FileSourceConfig::new_exec(file_scan_config, source_config);
 /// ```
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 pub struct CsvConfig {
     batch_size: Option<usize>,
     file_schema: Option<SchemaRef>,
@@ -84,21 +86,18 @@ pub struct CsvConfig {
     terminator: Option<u8>,
     escape: Option<u8>,
     comment: Option<u8>,
+    metrics: ExecutionPlanMetricsSet,
+    projected_statistics: Option<Statistics>,
 }
 
 impl CsvConfig {
     /// Returns a [`CsvConfig`]
     pub fn new(has_header: bool, delimiter: u8, quote: u8) -> Self {
         Self {
-            batch_size: None,
-            file_schema: None,
-            file_projection: None,
             has_header,
             delimiter,
             quote,
-            terminator: None,
-            escape: None,
-            comment: None,
+            ..Self::default()
         }
     }
 
@@ -240,11 +239,27 @@ impl FileSource for CsvConfig {
         Arc::new(conf)
     }
 
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        let mut conf = self.clone();
+        conf.projected_statistics = Some(statistics);
+        Arc::new(conf)
+    }
+
     fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
         let mut conf = self.clone();
         conf.file_projection = config.file_column_projection_indices();
         Arc::new(conf)
     }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        &self.metrics
+    }
+    fn statistics(&self) -> Result<Statistics> {
+        let statistics = &self.projected_statistics;
+        Ok(statistics
+            .clone()
+            .expect("projected_statistics must be set"))
+    }
 }
 
 impl FileOpener for CsvOpener {
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index a12aa76c437b..3aaa01b182cd 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -35,6 +35,8 @@ use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 use arrow::json::ReaderBuilder;
 use arrow::{datatypes::SchemaRef, json};
 use datafusion_execution::TaskContext;
+use datafusion_common::Statistics;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use futures::{StreamExt, TryStreamExt};
 use object_store::buffered::BufWriter;
@@ -71,6 +73,8 @@ impl JsonOpener {
 #[derive(Clone, Default)]
 pub struct JsonConfig {
     batch_size: Option<usize>,
+    metrics: ExecutionPlanMetricsSet,
+    projected_statistics: Option<Statistics>,
 }
 
 impl JsonConfig {
@@ -108,11 +112,27 @@ impl FileSource for JsonConfig {
     }
 
     fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
-        Arc::new(Self { ..*self })
+        Arc::new(Self { ..self.clone() })
+    }
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        let mut conf = self.clone();
+        conf.projected_statistics = Some(statistics);
+        Arc::new(conf)
     }
 
     fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
-        Arc::new(Self { ..*self })
+        Arc::new(Self { ..self.clone() })
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        &self.metrics
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        let statistics = &self.projected_statistics;
+        Ok(statistics
+            .clone()
+            .expect("projected_statistics must be set to call"))
     }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index f0bff9f0ca1b..b45585d09e9d 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -51,6 +51,7 @@ use crate::datasource::schema_adapter::{
     DefaultSchemaAdapterFactory, SchemaAdapterFactory,
 };
 pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
+use datafusion_common::Statistics;
 pub use metrics::ParquetFileMetrics;
 use opener::ParquetOpener;
 pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
@@ -292,6 +293,7 @@ pub struct ParquetConfig {
     batch_size: Option<usize>,
     /// Optional hint for the size of the parquet metadata
     metadata_size_hint: Option<usize>,
+    projected_statistics: Option<Statistics>,
 }
 
 impl ParquetConfig {
@@ -467,11 +469,6 @@ impl ParquetConfig {
         self.table_parquet_options.global.pushdown_filters
     }
 
-    /// Return metrics
-    pub(crate) fn metrics(&self) -> ExecutionPlanMetricsSet {
-        self.metrics.clone()
-    }
-
     /// If true, the `RowFilter` made by `pushdown_filters` may try to
     /// minimize the cost of filter evaluation by reordering the
     /// predicate [`Expr`]s. If false, the predicates are applied in
@@ -561,7 +558,7 @@ impl FileSource for ParquetConfig {
             page_pruning_predicate: self.page_pruning_predicate.clone(),
             table_schema: Arc::clone(&base_config.file_schema),
             metadata_size_hint: self.metadata_size_hint,
-            metrics: self.metrics(),
+            metrics: self.metrics().clone(),
             parquet_file_reader_factory,
             pushdown_filters: self.pushdown_filters(),
             reorder_filters: self.reorder_filters(),
@@ -585,9 +582,37 @@
         Arc::new(Self { ..self.clone() })
     }
 
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        let mut conf = self.clone();
+        conf.projected_statistics = Some(statistics);
+        Arc::new(conf)
+    }
+
     fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
         Arc::new(Self { ..self.clone() })
     }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        &self.metrics
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        let statistics = &self.projected_statistics;
+        let statistics = statistics
+            .clone()
+            .expect("projected_statistics must be set");
+        // When filters are pushed down, we have no way of knowing the exact statistics.
+        // Note that pruning predicate is also a kind of filter pushdown.
+        // (bloom filters use `pruning_predicate` too)
+        if self.pruning_predicate().is_some()
+            || self.page_pruning_predicate().is_some()
+            || (self.predicate().is_some() && self.pushdown_filters())
+        {
+            Ok(statistics.to_inexact())
+        } else {
+            Ok(statistics)
+        }
+    }
 }
 
 fn should_enable_page_index(
diff --git a/datafusion/physical-plan/src/source.rs b/datafusion/physical-plan/src/source.rs
index 8a3ed089778f..5ddc9d66b808 100644
--- a/datafusion/physical-plan/src/source.rs
+++ b/datafusion/physical-plan/src/source.rs
@@ -26,6 +26,7 @@ use crate::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{Constraints, Statistics};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_physical_expr::Partitioning;
 
 /// Common behaviors in Data Sources for both from Files and Memory.
 /// See `DataSourceExec` for physical plan implementation
@@ -40,7 +41,7 @@ pub trait DataSource: Send + Sync {
     fn repartitioned(
         &self,
         _target_partitions: usize,
-        _config: &ConfigOptions,
+        _repartition_file_min_size: usize,
         _exec: DataSourceExec,
     ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
         Ok(None)
     }
@@ -107,8 +108,11 @@ impl ExecutionPlan for DataSourceExec {
         target_partitions: usize,
         config: &ConfigOptions,
     ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
-        self.source
-            .repartitioned(target_partitions, config, self.clone())
+        self.source.repartitioned(
+            target_partitions,
+            config.optimizer.repartition_file_min_size,
+            self.clone(),
+        )
     }
 
     fn execute(
@@ -130,7 +134,7 @@
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
         let mut source = Arc::clone(&self.source);
         source = source.with_fetch(limit)?;
-        let cache = source.properties().clone();
+        let cache = self.cache.clone();
         Some(Arc::new(Self { source, cache }))
     }
 
@@ -163,4 +167,10 @@ impl DataSourceExec {
         self.cache = self.cache.with_constraints(constraints);
         self
     }
+
+    /// Assign output partitioning
+    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
+        self.cache = self.cache.with_partitioning(partitioning);
+        self
+    }
 }