
File partitioning for ListingTable #1141

Merged · 18 commits · Nov 1, 2021

Conversation

@rdettai (Contributor) commented Oct 18, 2021

Which issue does this PR close?

Closes #1139.

Rationale for this change

Adds the capability to parse file partitioning and to prune unnecessary files.

What changes are included in this PR?

  • add partition_values: Vec<ScalarValue> to PartitionedFile (see the sketch after this list)
  • implement pruned_partition_list
  • add extra column for the partition dimensions to execute() record batch result in file format execution plans
    • avro/csv/json
    • parquet
  • add the proper TableProviderFilterPushDown value to supports_filter_pushdown() to avoid re-evaluation of the partition pruning [1]
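
As a rough sketch of the first item in the list above (a paraphrase for illustration, not the verbatim code from the PR; the `path` field and doc comments are assumptions, only `partition_values: Vec<ScalarValue>` is named by the PR):

```rust
use datafusion::scalar::ScalarValue;

/// Illustrative shape of the extended file descriptor: each file listed for
/// a scan now carries the partition values parsed from its path, e.g.
/// `mytable/date=2021-01-01/file.csv` contributes one `ScalarValue` for the
/// `date` partition column.
pub struct PartitionedFile {
    /// Path of the file (simplified; the real struct holds richer metadata).
    pub path: String,
    /// Values of the partition columns, in the same order as
    /// `ListingOptions.table_partition_cols`.
    pub partition_values: Vec<ScalarValue>,
}
```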

What changes are planned in further issues/PRs?

Are there any user-facing changes?

Renames ListingOptions.partitions to ListingOptions.table_partition_cols to make it a bit more explicit.


[1] Re-evaluating the filters on the partition column would be expensive:

  • it requires the column to be pushed down, and thus materialized in the source execution plan. This is acceptable if we use DictionaryArray<uint8>, which is pretty cheap (see the sketch below).
  • when applying the filtering expression, the dictionary needs to be expanded, because many kernel ops are not supported on Dictionaries for now (see Add better and faster support for dictionary types #87). This could be very expensive!
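
To make the first bullet of [1] concrete, here is a minimal arrow-rs sketch (the value and row count are made up) of why a dictionary-encoded partition column is cheap to materialize: the partition value is stored once, and each row carries only a 1-byte key.

```rust
use arrow::array::DictionaryArray;
use arrow::datatypes::UInt8Type;

fn main() {
    // A partition column materialized for a file with 1024 rows: the value
    // "2021-01-01" is stored once in the dictionary; every row just holds a
    // 1-byte key pointing at it.
    let partition_col: DictionaryArray<UInt8Type> =
        vec!["2021-01-01"; 1024].into_iter().collect();

    assert_eq!(partition_col.len(), 1024);
    // Only one distinct value is stored, regardless of the row count.
    assert_eq!(partition_col.values().len(), 1);
}
```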

@github-actions bot added the ballista and datafusion (Changes in the datafusion crate) labels Oct 18, 2021
@rdettai changed the title from "[feat] adding partition_values to PartitionedFile" to "File partitioning for ListingTable" on Oct 18, 2021
@alamb (Contributor) commented Oct 18, 2021

I plan to check this out carefully tomorrow

@rdettai (Contributor, Author) commented Oct 19, 2021

Thanks! You can take a quick look, but this will not be ready before 1 or 2 days... If you want, I can ping you once it's in a more reviewable state 😃

@alamb (Contributor) commented Oct 19, 2021

> Thanks! You can take a quick look, but this will not be ready before 1 or 2 days... If you want, I can ping you once it's in a more reviewable state 😃

Sounds good. Thank you 🙏

@rdettai force-pushed the file-partitioning branch 2 times, most recently from b1a9db2 to 34df752 on October 22, 2021 at 16:49
@rdettai (Contributor, Author) commented Oct 22, 2021

This is a bit harder than I thought it would be 😅. I will have to keep working on this next week. Feel free to give some feedback already, most of the important elements are already there.

datafusion/src/datasource/listing/table.rs (resolved)
/// The minimum number of records required from this source plan
pub limit: Option<usize>,
/// The partitioning column names
pub table_partition_dims: Vec<String>,
Member:

Curious why not name it table_partition_cols to better align with the comment? The type should make it clear that it's storing column names.

Contributor (Author):

Haha, I have to admit that I'm hesitating a lot about the naming 😅. I am wondering if it's not the comment that should be changed. These partitions are originally encoded in the file path, which we then parse and project into a column if necessary. So they end up as columns, but they are not columns per se.

Member:

Conceptually they are handled as "virtual columns" during compute, right? For example, when a user writes a SQL query that filters on a partition, the filter is applied to that partition just like to other regular columns. I am suggesting partition column here because it's the term used in Hive and Spark, so readers will be more familiar with it. Are there systems that use partition dimensions as the naming convention?

Contributor (Author):

not that I am aware of, I'll change this to cols 😉

datafusion/src/datasource/listing/helpers.rs (outdated, resolved)
datafusion/src/datasource/listing/helpers.rs (outdated, resolved)
datafusion/src/datasource/listing/helpers.rs (outdated, resolved)
datafusion/src/datasource/listing/helpers.rs (resolved)
datafusion/src/datasource/listing/helpers.rs (outdated, resolved)
datafusion/src/datasource/listing/helpers.rs (outdated, resolved)
} else {
    let applicable_filters = filters
        .iter()
        .filter(|f| expr_applicable_for_cols(table_partition_dims, f));
Member:

To avoid the complexity of expr_applicable_for_cols, perhaps we could just throw a runtime error to the user if invalid filters are provided? This also makes sure that if there is a typo in the filter, we won't silently swallow the error.

Contributor (Author):

I don't think we are silencing any error here. Typos in the query were already caught (or should already be caught) upstream in the SQL parser with checks like https://github.com/apache/arrow-datafusion/blob/fe1a9e2c55392b934c85098430f78a26ef71380e/datafusion/src/sql/planner.rs#L769-L775

Here expr_applicable_for_cols is different: it only checks whether a given expression can be resolved using the partitioning dimensions alone. For instance, suppose the table partitioning is of the form mytable/date=2021-01-01/file.csv with a file schema of the form Schema([Field("a",int), Field("b",string)]). A filter such as WHERE b='hello' or WHERE b=date is perfectly valid, but it should not be kept in the list of applicable_filters because it cannot be resolved from the partitioning columns alone (see the sketch below).
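
To illustrate, here is a toy, standalone sketch of that check (the Expr enum below is a simplified stand-in for DataFusion's real expression type, not the actual implementation):

```rust
// An expression can be evaluated against the partition columns alone iff
// every column it references is a partition column; literals always pass.
enum Expr {
    Column(String),
    Literal(String),
    BinaryOp { left: Box<Expr>, right: Box<Expr> },
}

fn expr_applicable_for_cols(partition_cols: &[String], expr: &Expr) -> bool {
    match expr {
        Expr::Column(name) => partition_cols.iter().any(|c| c == name),
        Expr::Literal(_) => true,
        Expr::BinaryOp { left, right } => {
            expr_applicable_for_cols(partition_cols, left)
                && expr_applicable_for_cols(partition_cols, right)
        }
    }
}

fn main() {
    let partition_cols = vec!["date".to_string()];
    // WHERE date = '2021-01-01': resolvable from the path alone -> kept.
    let on_partition = Expr::BinaryOp {
        left: Box::new(Expr::Column("date".into())),
        right: Box::new(Expr::Literal("2021-01-01".into())),
    };
    // WHERE b = 'hello': valid, but needs file contents -> not kept.
    let on_data = Expr::BinaryOp {
        left: Box::new(Expr::Column("b".into())),
        right: Box::new(Expr::Literal("hello".into())),
    };
    assert!(expr_applicable_for_cols(&partition_cols, &on_partition));
    assert!(!expr_applicable_for_cols(&partition_cols, &on_data));
}
```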

@houqp (Member) commented Oct 25, 2021

Epic work @rdettai !

> when applying the filtering expression the dictionary needs to be expanded because many kernel ops are not supported on Dictionaries for now

I think we won't need to worry too much about this for this particular use case, because it won't be a problem anymore after we move to using scalar columnar values to store the partition values.

@alamb (Contributor) commented Oct 25, 2021

I plan to review this PR tomorrow (when I am fresher and can give it the look it deserves)

@alamb (Contributor) commented Oct 26, 2021

> I plan to review this PR tomorrow (when I am fresher and can give it the look it deserves)

@rdettai mentioned he has some more work planned for this PR so I will postpone additional review until that is complete

@rdettai (Contributor, Author) commented Oct 28, 2021

I just noticed that there seems to be a small irregularity in the length of the column statistics vector. I'm adding one last test for that, with the appropriate patch if required 😉

@alamb (Contributor) left a comment

Epic work. I got through almost all of this PR, but I still need to finish up helpers.rs -- I ran out of time today; will finish tomorrow.

I did leave some comments / feedback, but I don't think any of it is absolutely required to merge.

ballista/rust/core/proto/ballista.proto (resolved)
@@ -613,33 +614,28 @@ message ScanLimit {
   uint32 limit = 1;
 }

-message ParquetScanExecNode {
+message FileScanExecConf {
Contributor:

makes sense to me

.collect(),
batch_size: exec.batch_size() as u32,
base_conf: Some(exec.base_config().try_into()?),
// TODO serialize predicates
Contributor:

is this a TODO you plan for this PR? Or a follow-on one?

Contributor (Author):

This was already there.

projection: conf
    .projection
    .as_ref()
    .unwrap_or(&vec![])
Contributor:

it looks like an improvement to me, but previously the code would error if projection: None was passed and this code will simply convert that to an empty list.

Is that intentional?

Contributor (Author):

None is encoded as an empty vec. Not the cleanest, but it works here, as a projection with no columns could not have another meaning.

@@ -61,16 +62,9 @@ impl FileFormat for AvroFormat {
     async fn create_physical_plan(
         &self,
         conf: PhysicalPlanConfig,
+        _filters: &[Expr],
Contributor:

this is neat that the creation of the physical plan gets the filters

)
.await;

let result = ctx
Contributor:

this is very cool

datafusion/src/datasource/listing/helpers.rs (resolved)
Comment on lines +55 to +56
/// Note that only `DEFAULT_PARTITION_COLUMN_DATATYPE` is currently
/// supported for the column type.
Contributor:

As mentioned elsewhere, I think it would be fine to say "these are always dictionary-coded string columns" rather than "currently", which hints at changing it in the future.

Contributor (Author):

Let's discuss this in the other #1141 (comment)

 ) -> Result<TableProviderFilterPushDown> {
-    Ok(TableProviderFilterPushDown::Inexact)
+    if expr_applicable_for_cols(&self.options.table_partition_cols, filter) {
Contributor:

❤️
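
For context, here is a self-contained mock of the pushdown decision shown in the diff above (the enum mirrors DataFusion's TableProviderFilterPushDown; expr_applicable_for_cols is stubbed with a naive substring check, whereas the real code walks the Expr tree):

```rust
#[derive(Debug, PartialEq)]
enum TableProviderFilterPushDown {
    Exact,
    Inexact,
}

// Stub: treat a filter as partition-only if it mentions a partition column.
// The real check recurses over the expression tree (see the sketch above).
fn expr_applicable_for_cols(partition_cols: &[String], filter: &str) -> bool {
    partition_cols.iter().any(|c| filter.contains(c.as_str()))
}

fn supports_filter_pushdown(
    partition_cols: &[String],
    filter: &str,
) -> TableProviderFilterPushDown {
    if expr_applicable_for_cols(partition_cols, filter) {
        // Fully applied while pruning the file list, so the optimizer does
        // not need to re-evaluate it against the data.
        TableProviderFilterPushDown::Exact
    } else {
        // May still help prune (e.g. via statistics) but must be re-checked.
        TableProviderFilterPushDown::Inexact
    }
}

fn main() {
    let cols = vec!["date".to_string()];
    assert_eq!(
        supports_filter_pushdown(&cols, "date = '2021-01-01'"),
        TableProviderFilterPushDown::Exact
    );
    assert_eq!(
        supports_filter_pushdown(&cols, "b = 'hello'"),
        TableProviderFilterPushDown::Inexact
    );
}
```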

datafusion/src/datasource/listing/helpers.rs (outdated, resolved)
rdettai added a commit to rdettai/arrow-datafusion that referenced this pull request Oct 29, 2021
@alamb (Contributor) left a comment

I think this is great and ready to go. Any other thoughts @houqp or @Dandandan ?


lazy_static! {
/// The datatype used for all partitioning columns for now
pub static ref DEFAULT_PARTITION_COLUMN_DATATYPE: DataType = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
Contributor:

I see -- so we are envisioning somehow allowing users of this code to specify the type used in the partitioning (and to provide their own code to determine the partition values). That makes sense.

}

impl ExpressionVisitor for ApplicabilityVisitor<'_> {
    fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> {
Contributor:

👍

datafusion/src/datasource/listing/helpers.rs (resolved)
}

/// convert the paths of the files to a record batch with the following columns:
/// - one column for the file size named `_df_part_file_size_`
Contributor:

This is a clever way to apply filtering -- convert the paths to batches, run the evaluation on the batches, and then turn the result back into paths 👍
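
A minimal sketch of that idea (illustrative only: the `_df_part_file_size_` column name comes from the doc comment above, while the paths and the single `date=` partition key are made up):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> arrow::error::Result<()> {
    // Hypothetical listing of (path, size) pairs for a partitioned table.
    let files = [
        ("mytable/date=2021-01-01/file1.csv", 100u64),
        ("mytable/date=2021-01-02/file2.csv", 200u64),
    ];

    // Parse the partition value out of each path (naive `key=value` split).
    let dates: Vec<&str> = files
        .iter()
        .filter_map(|(path, _)| {
            path.split('/').find_map(|seg| seg.strip_prefix("date="))
        })
        .collect();
    let sizes: Vec<u64> = files.iter().map(|(_, size)| *size).collect();

    // One row per file: the file size plus one column per partition key.
    let schema = Arc::new(Schema::new(vec![
        Field::new("_df_part_file_size_", DataType::UInt64, false),
        Field::new("date", DataType::Utf8, false),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(UInt64Array::from(sizes)),
        Arc::new(StringArray::from(dates)),
    ];
    let batch = RecordBatch::try_new(schema, columns)?;

    // Partition filters such as `date = '2021-01-01'` can now be evaluated
    // against `batch` with the regular expression machinery; the surviving
    // row indices map back to the file paths to keep.
    assert_eq!(batch.num_rows(), 2);
    Ok(())
}
```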

assert_eq!(&parsed_files[0].partition_values, &[]);
assert_eq!(&parsed_files[1].partition_values, &[]);

let parsed_metas = parsed_files
Contributor:

👍

@alamb mentioned this pull request Nov 1, 2021
@alamb (Contributor) commented Nov 1, 2021

I plan to merge from master and if all the tests pass put this one in. FYI @houqp / @Dandandan @jimexist

Please let me know if you want more time to review

@alamb (Contributor) commented Nov 1, 2021

FYI fixed a logical conflict in 5d34be6

repeated uint32 projection = 6;
ScanLimit limit = 7;
Statistics statistics = 8;
uint32 batch_size = 3;
Member:

would this be back-compatible?

Contributor (Author):

As long as you don't have DataFusion nodes with different versions, it should be ok!

@@ -70,7 +70,7 @@ pub enum ListEntry {
 }

 /// The path and size of the file.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq)]
Member:

Suggested change:
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Eq)]

Contributor (Author):

Not sure if it is really useful to add a trait that is not used 😊


use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

pub fn aggr_test_schema() -> SchemaRef {
Member:

Suggested change:
-pub fn aggr_test_schema() -> SchemaRef {
+pub(crate) fn aggr_test_schema() -> SchemaRef {

@rdettai (Contributor, Author) commented Nov 1, 2021

The tests module is enabled for tests only anyway, so the (crate) modifier does not have much effect here (you would need to build DataFusion in test mode to use it).

@alamb (Contributor) commented Nov 1, 2021

As this has been outstanding for a long time and is a fairly large change, the potential for conflict is large -- I am going to merge it in now and hopefully we can keep iterating in future PRs. Thanks again @rdettai @jimexist @houqp and @Dandandan -- 🚀

@alamb merged commit b2a5028 into apache:master Nov 1, 2021
@Dandandan (Contributor) commented

Thank you @rdettai ! Really nice work

Labels
api change (Changes the API exposed to users of the crate) · datafusion (Changes in the datafusion crate) · enhancement (New feature or request)
Development

Successfully merging this pull request may close these issues.

Implement partitioned read in listing table provider
5 participants