Skip to content

Commit

Permalink
feat: Determine ordering of file groups (#9593)
Browse files Browse the repository at this point in the history
* add statistics to PartitionedFile

* just dump work for now

* working test case

* fix jumbled rebase

* forgot to annotate #[test]

* more refactoring

* add a link

* refactor again

* whitespace

* format debug log

* remove useless itertools

* refactor test

* fix bug

* use sort_file_groups in ListingTable

* move check into a better place

* refactor test a bit

* more testing

* more testing

* better error message

* fix log msg

* fix again

* add sqllogictest and fixes

* fix test

* Update datafusion/core/src/datasource/listing/mod.rs

Co-authored-by: Andrew Lamb <[email protected]>

* Update datafusion/core/src/datasource/physical_plan/file_scan_config.rs

Co-authored-by: Andrew Lamb <[email protected]>

* more unit tests

* rename to split_groups_by_statistics

* only use groups if there's <= target_partitions

* refactor a bit, no need for projected_schema

* fix reverse order

* save work for now

* lots of test cases in new slt

* remove output check

* fix

* fix last test

* comment on params

* clippy

* revert parquet.slt

* no need to pass projection separately

* Update datafusion/core/src/datasource/listing/mod.rs

Co-authored-by: Nga Tran <[email protected]>

* update comment on  in

* fix test?

* un-fix?

* add fix back in?

* move indices_sorted_by_min to MinMaxStatistics

* move MinMaxStatistics to its own module

* fix license

* add feature flag

* update config

---------

Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Nga Tran <[email protected]>
  • Loading branch information
3 people authored May 1, 2024
1 parent 1e0c760 commit 7c1c794
Show file tree
Hide file tree
Showing 21 changed files with 1,018 additions and 19 deletions.
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ config_namespace! {

/// Should DataFusion support recursive CTEs
pub enable_recursive_ctes: bool, default = true

/// Attempt to eliminate sorts by packing & sorting files with non-overlapping
/// statistics into the same file groups.
/// Currently experimental
pub split_file_groups_by_statistics: bool, default = false
}
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ pub(crate) mod test_util {
object_meta: meta,
partition_values: vec![],
range: None,
statistics: None,
extensions: None,
}]];

Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ pub async fn pruned_partition_list<'a>(
object_meta,
partition_values: partition_values.clone(),
range: None,
statistics: None,
extensions: None,
})
}));
Expand Down
12 changes: 10 additions & 2 deletions datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod url;

use crate::error::Result;
use chrono::TimeZone;
use datafusion_common::ScalarValue;
use datafusion_common::{ScalarValue, Statistics};
use futures::Stream;
use object_store::{path::Path, ObjectMeta};
use std::pin::Pin;
Expand Down Expand Up @@ -67,6 +67,11 @@ pub struct PartitionedFile {
pub partition_values: Vec<ScalarValue>,
/// An optional file range for a more fine-grained parallel execution
pub range: Option<FileRange>,
/// Optional statistics that describe the data in this file if known.
///
/// DataFusion relies on these statistics for planning (in particular to sort file groups),
/// so if they are incorrect, incorrect answers may result.
pub statistics: Option<Statistics>,
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}
Expand All @@ -83,6 +88,7 @@ impl PartitionedFile {
},
partition_values: vec![],
range: None,
statistics: None,
extensions: None,
}
}
Expand All @@ -98,7 +104,8 @@ impl PartitionedFile {
version: None,
},
partition_values: vec![],
range: None,
range: Some(FileRange { start, end }),
statistics: None,
extensions: None,
}
.with_range(start, end)
Expand Down Expand Up @@ -128,6 +135,7 @@ impl From<ObjectMeta> for PartitionedFile {
object_meta,
partition_values: vec![],
range: None,
statistics: None,
extensions: None,
}
}
Expand Down
39 changes: 34 additions & 5 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,16 +739,43 @@ impl TableProvider for ListingTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let (partitioned_file_lists, statistics) =
let (mut partitioned_file_lists, statistics) =
self.list_files_for_scan(state, filters, limit).await?;

let projected_schema = project_schema(&self.schema(), projection)?;

// if no files need to be read, return an `EmptyExec`
if partitioned_file_lists.is_empty() {
let schema = self.schema();
let projected_schema = project_schema(&schema, projection)?;
return Ok(Arc::new(EmptyExec::new(projected_schema)));
}

let output_ordering = self.try_create_output_ordering()?;
match state
.config_options()
.execution
.split_file_groups_by_statistics
.then(|| {
output_ordering.first().map(|output_ordering| {
FileScanConfig::split_groups_by_statistics(
&self.table_schema,
&partitioned_file_lists,
output_ordering,
)
})
})
.flatten()
{
Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
Some(Ok(new_groups)) => {
if new_groups.len() <= self.options.target_partitions {
partitioned_file_lists = new_groups;
} else {
log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
}
}
None => {} // no ordering required
};

// extract types of partition columns
let table_partition_cols = self
.options
Expand All @@ -772,6 +799,7 @@ impl TableProvider for ListingTable {
} else {
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
};

// create the execution plan
self.options
.format
Expand All @@ -784,7 +812,7 @@ impl TableProvider for ListingTable {
statistics,
projection: projection.cloned(),
limit,
output_ordering: self.try_create_output_ordering()?,
output_ordering,
table_partition_cols,
},
filters.as_ref(),
Expand Down Expand Up @@ -937,10 +965,11 @@ impl ListingTable {
// collect the statistics if required by the config
let files = file_list
.map(|part_file| async {
let part_file = part_file?;
let mut part_file = part_file?;
if self.options.collect_stat {
let statistics =
self.do_collect_statistics(ctx, &store, &part_file).await?;
part_file.statistics = Some(statistics.clone());
Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
} else {
Ok((part_file, Statistics::new_unknown(&self.file_schema)))
Expand Down
Loading

0 comments on commit 7c1c794

Please sign in to comment.