Skip to content

Commit

Permalink
make metrics and statistics a part of file-type-specific configurations
Browse files Browse the repository at this point in the history
make cache a part of DataSourceExec
  • Loading branch information
mertak-synnada committed Jan 20, 2025
1 parent 91110cc commit 6c76b3f
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 97 deletions.
107 changes: 35 additions & 72 deletions datafusion/core/src/datasource/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ use crate::datasource::physical_plan::{
};

use arrow_schema::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::source::{DataSource, DataSourceExec};
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
Expand All @@ -63,6 +62,12 @@ pub trait FileSource: Send + Sync {
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>;
/// Initialize new instance with projection information
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
/// Initialize new instance with projected statistics
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;
/// Return execution plan metrics
fn metrics(&self) -> &ExecutionPlanMetricsSet;
/// Return projected statistics
fn statistics(&self) -> datafusion_common::Result<Statistics>;
}

/// Holds generic file configuration, and common behaviors for file sources.
Expand All @@ -72,9 +77,6 @@ pub trait FileSource: Send + Sync {
pub struct FileSourceConfig {
source: Arc<dyn FileSource>,
base_config: FileScanConfig,
metrics: ExecutionPlanMetricsSet,
projected_statistics: Statistics,
cache: PlanProperties,
}

impl FileSourceConfig {
Expand All @@ -87,36 +89,19 @@ impl FileSourceConfig {
Arc::new(DataSourceExec::new(source))
}

/// Initialize a new `FileSourceConfig` instance with metrics, cache, and statistics.
/// Initialize a new `FileSourceConfig` instance.
pub fn new(base_config: FileScanConfig, file_source: Arc<dyn FileSource>) -> Self {
let (
projected_schema,
constraints,
_projected_schema,
_constraints,
projected_statistics,
projected_output_ordering,
_projected_output_ordering,
) = base_config.project();
let cache = Self::compute_properties(
Arc::clone(&projected_schema),
&projected_output_ordering,
constraints,
&base_config,
);
let mut metrics = ExecutionPlanMetricsSet::new();

#[cfg(feature = "parquet")]
if let Some(parquet_config) = file_source.as_any().downcast_ref::<ParquetConfig>()
{
metrics = parquet_config.metrics();
let _predicate_creation_errors = MetricBuilder::new(&metrics)
.global_counter("num_predicate_creation_errors");
};
let file_source = file_source.with_statistics(projected_statistics);

Self {
source: file_source,
base_config,
metrics,
projected_statistics,
cache,
}
}

Expand Down Expand Up @@ -152,11 +137,6 @@ impl FileSourceConfig {
&self.source
}

/// Returns the `PlanProperties` of the plan
pub(crate) fn cache(&self) -> PlanProperties {
self.cache.clone()
}

fn compute_properties(
schema: SchemaRef,
orderings: &[LexOrdering],
Expand All @@ -181,9 +161,6 @@ impl FileSourceConfig {

fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
self.base_config.file_groups = file_groups;
// Changing file groups may invalidate output partitioning. Update it also
let output_partitioning = Self::output_partitioning_helper(&self.base_config);
self.cache = self.cache.with_partitioning(output_partitioning);
self
}

Expand Down Expand Up @@ -214,7 +191,7 @@ impl DataSource for FileSourceConfig {
source.create_file_opener(object_store, &self.base_config, partition)?;

let stream =
FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
FileStream::new(&self.base_config, partition, opener, source.metrics())?;
Ok(Box::pin(stream))
}

Expand Down Expand Up @@ -268,64 +245,43 @@ impl DataSource for FileSourceConfig {
fn repartitioned(
&self,
target_partitions: usize,
config: &ConfigOptions,
repartition_file_min_size: usize,
exec: DataSourceExec,
) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
if !self.supports_repartition() {
return Ok(None);
}
let repartition_file_min_size = config.optimizer.repartition_file_min_size;

let repartitioned_file_groups_option = FileGroupPartitioner::new()
.with_target_partitions(target_partitions)
.with_repartition_file_min_size(repartition_file_min_size)
.with_preserve_order_within_groups(self.cache().output_ordering().is_some())
.with_preserve_order_within_groups(
exec.properties().output_ordering().is_some(),
)
.repartition_file_groups(&self.base_config.file_groups);

if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
let plan = Arc::new(exec.with_source(Arc::new(
self.clone().with_file_groups(repartitioned_file_groups),
)));
return Ok(Some(plan));
let source = self.clone().with_file_groups(repartitioned_file_groups);
let output_partitioning =
Self::output_partitioning_helper(&source.base_config);
let plan = exec
.with_source(Arc::new(source))
// Changing file groups may invalidate output partitioning. Update it also
.with_partitioning(output_partitioning);
return Ok(Some(Arc::new(plan)));
}
Ok(None)
}

fn statistics(&self) -> datafusion_common::Result<Statistics> {
    // Statistics now live on the file-type specific source (set via
    // `FileSource::with_statistics` during construction), so simply delegate.
    // Any inexactness introduced by filter/predicate pushdown (e.g. for
    // parquet) is the source's responsibility to report.
    self.source.statistics()
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
let config = self.base_config.clone().with_limit(limit);
Some(Arc::new(Self {
source: Arc::clone(&self.source),
base_config: config,
metrics: self.metrics.clone(),
projected_statistics: self.projected_statistics.clone(),
cache: self.cache(),
}))
}

Expand All @@ -334,10 +290,17 @@ impl DataSource for FileSourceConfig {
}

fn metrics(&self) -> ExecutionPlanMetricsSet {
    // Metrics are owned by the file-type specific source; hand back a clone
    // of its set (`ExecutionPlanMetricsSet` clones are cheap, shared handles).
    // NOTE(review): "shared handle" assumed from typical metrics-set usage —
    // confirm against `ExecutionPlanMetricsSet`'s definition.
    self.source.metrics().clone()
}

fn properties(&self) -> PlanProperties {
    // Plan properties are recomputed from `base_config` on every call instead
    // of being cached on the struct. `project()` rebuilds the projected
    // schema/ordering each time; if this shows up in profiles, consider
    // re-introducing a cached `PlanProperties`.
    let (projected_schema, constraints, _, projected_output_ordering) =
        self.base_config.project();
    Self::compute_properties(
        Arc::clone(&projected_schema),
        &projected_output_ordering,
        constraints,
        &self.base_config,
    )
}
}
5 changes: 4 additions & 1 deletion datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ impl FileFormat for ArrowFormat {
conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(FileSourceConfig::new_exec(conf, Arc::new(ArrowConfig {})))
Ok(FileSourceConfig::new_exec(
conf,
Arc::new(ArrowConfig::default()),
))
}

async fn create_writer_physical_plan(
Expand Down
30 changes: 26 additions & 4 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,20 @@ use crate::error::Result;
use arrow::buffer::Buffer;
use arrow_ipc::reader::FileDecoder;
use arrow_schema::SchemaRef;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

use datafusion_common::Statistics;
use futures::StreamExt;
use itertools::Itertools;
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};

/// Arrow configuration struct that is given to DataSourceExec
/// Does not hold anything special, since [`FileScanConfig`] is sufficient for arrow
#[derive(Clone, Default)]
pub struct ArrowConfig {}
pub struct ArrowConfig {
    // Execution-plan metrics for this scan; exposed via `metrics()` and
    // handed to the `FileStream` that performs the read.
    metrics: ExecutionPlanMetricsSet,
    // Statistics projected onto the scanned columns; `None` until
    // `with_statistics` is called (and `statistics()` panics while unset).
    projected_statistics: Option<Statistics>,
}

impl FileSource for ArrowConfig {
fn create_file_opener(
Expand All @@ -56,15 +62,31 @@ impl FileSource for ArrowConfig {
}

fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
Arc::new(ArrowConfig::default())
Arc::new(Self { ..self.clone() })
}

fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
Arc::new(ArrowConfig::default())
Arc::new(Self { ..self.clone() })
}
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
let mut conf = self.clone();
conf.projected_statistics = Some(statistics);
Arc::new(conf)
}

fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
Arc::new(ArrowConfig::default())
Arc::new(Self { ..self.clone() })
}

/// Returns a reference to the metrics set recorded by this source.
fn metrics(&self) -> &ExecutionPlanMetricsSet {
    &self.metrics
}

/// Returns the projected statistics.
///
/// # Panics
/// Panics if `with_statistics` was never called on this config.
fn statistics(&self) -> Result<Statistics> {
    let stats = self
        .projected_statistics
        .as_ref()
        .expect("projected_statistics must be set");
    Ok(stats.clone())
}
}

Expand Down
21 changes: 20 additions & 1 deletion datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@ use crate::error::Result;

use arrow::datatypes::SchemaRef;

use datafusion_common::Statistics;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use object_store::ObjectStore;

// TODO projected_constraints
/// AvroConfig holds the extra configuration that is necessary for opening avro files
#[derive(Clone, Default)]
pub struct AvroConfig {
    // Schema used when decoding avro records; set via `with_schema`.
    schema: Option<SchemaRef>,
    // Batch size for produced record batches; presumably set via a
    // `with_batch_size` builder — setter not visible here, confirm.
    batch_size: Option<usize>,
    // Names of the projected file columns; set via `with_projection`.
    projection: Option<Vec<String>>,
    // Execution-plan metrics for this scan; exposed via `metrics()`.
    metrics: ExecutionPlanMetricsSet,
    // Statistics projected onto the scanned columns; `None` until
    // `with_statistics` is called (and `statistics()` panics while unset).
    projected_statistics: Option<Statistics>,
}

impl AvroConfig {
Expand Down Expand Up @@ -95,12 +98,28 @@ impl FileSource for AvroConfig {
conf.schema = Some(schema);
Arc::new(conf)
}
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
let mut conf = self.clone();
conf.projected_statistics = Some(statistics);
Arc::new(conf)
}

fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
let mut conf = self.clone();
conf.projection = config.projected_file_column_names();
Arc::new(conf)
}

/// Returns a reference to the metrics set recorded by this source.
fn metrics(&self) -> &ExecutionPlanMetricsSet {
    &self.metrics
}

/// Returns the projected statistics.
///
/// # Panics
/// Panics if `with_statistics` was never called on this config.
fn statistics(&self) -> Result<Statistics> {
    let stats = self
        .projected_statistics
        .as_ref()
        .expect("projected_statistics must be set");
    Ok(stats.clone())
}
}

#[cfg(feature = "avro")]
Expand Down
29 changes: 22 additions & 7 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use arrow::csv;
use arrow::datatypes::SchemaRef;
use datafusion_common::Statistics;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
Expand Down Expand Up @@ -73,7 +75,7 @@ use tokio::task::JoinSet;
/// ));
/// let exec = FileSourceConfig::new_exec(file_scan_config, source_config);
/// ```
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct CsvConfig {
batch_size: Option<usize>,
file_schema: Option<SchemaRef>,
Expand All @@ -84,21 +86,18 @@ pub struct CsvConfig {
terminator: Option<u8>,
escape: Option<u8>,
comment: Option<u8>,
metrics: ExecutionPlanMetricsSet,
projected_statistics: Option<Statistics>,
}

impl CsvConfig {
/// Returns a [`CsvConfig`]
pub fn new(has_header: bool, delimiter: u8, quote: u8) -> Self {
Self {
batch_size: None,
file_schema: None,
file_projection: None,
has_header,
delimiter,
quote,
terminator: None,
escape: None,
comment: None,
..Self::default()
}
}

Expand Down Expand Up @@ -240,11 +239,27 @@ impl FileSource for CsvConfig {
Arc::new(conf)
}

fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
let mut conf = self.clone();
conf.projected_statistics = Some(statistics);
Arc::new(conf)
}

fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
let mut conf = self.clone();
conf.file_projection = config.file_column_projection_indices();
Arc::new(conf)
}

/// Returns a reference to the metrics set recorded by this source.
fn metrics(&self) -> &ExecutionPlanMetricsSet {
    &self.metrics
}
/// Returns the projected statistics.
///
/// # Panics
/// Panics if `with_statistics` was never called on this config.
fn statistics(&self) -> Result<Statistics> {
    let stats = self
        .projected_statistics
        .as_ref()
        .expect("projected_statistics must be set");
    Ok(stats.clone())
}
}

impl FileOpener for CsvOpener {
Expand Down
Loading

0 comments on commit 6c76b3f

Please sign in to comment.