
fix: Ignore empty files in ListingTable when listing files with or without partition filters, as well as when inferring schema #13750

Merged · 7 commits merged into apache:main on Dec 18, 2024

Conversation

@Blizzara (Contributor) commented on Dec 12, 2024:

Which issue does this PR close?

Ignore empty files in ListingTable. Input datasets can sometimes contain empty files (i.e. 0 bytes), and trying to treat them like normal files fails when e.g. reading parquet metadata.

Closes #13737.

Rationale for this change

Empty files cannot contribute anything to the table other than breakage, so ignoring them is pretty much strictly better. This also aligns with the behavior of e.g. Spark.

What changes are included in this PR?

Ignore empty files in ListingTable when listing files with or without partition filters, as well as when inferring schema

One thing I'm not sure about: it seems pruned_partition_list is also used when writing to a table, in `let file_list_stream = pruned_partition_list(`. Is it a problem to ignore empty files there?

Are these changes tested?

Added an empty file to the existing tests for pruned_partition_list, and added new tests for list_partitions. table.rs didn't seem to have any tests for schema inference, so I didn't add any for it.

Also tested in our production (well, testing) system.

Are there any user-facing changes?

Reading input ListingTables containing empty files now succeeds.

@github-actions bot added the `core` (Core DataFusion crate) label on Dec 12, 2024
#[tokio::test]
async fn test_csv_parallel_empty_file(n_partitions: usize) -> Result<()> {
@Blizzara (author) commented:

I assume there was no special reason to test parallelism for empty files? Now that we just skip empty files altogether there is no parallelism; the test does pass with the repartition settings as well, but they seemed irrelevant so I cleaned them away. I can add them back if there's a reason to keep them!

A contributor replied:

I think this is fine.

Another contributor commented:

I did some research and it seems to have been added in #6801 by @2010YOUY01. As long as the code works with empty files (i.e. doesn't throw an error or go into an infinite loop), I think we are good.

Thus I suggest leaving at least one test where we set the repartition file sizes/min file size to 0 and make sure nothing bad happens.

@Blizzara (author) replied:

done! 3f58b02

#[tokio::test]
async fn it_can_read_empty_ndjson_in_parallel(n_partitions: usize) -> Result<()> {
@Blizzara (author) commented:

Same here.

let query = "select * from empty where random() > 0.5;";
let query_result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
@Blizzara (author) commented on Dec 13, 2024:

This checks that the plan has a CsvExec node, which we no longer have (the plan is now a TableScan: empty / EmptyExec).

result
.objects
.into_iter()
.filter(|object_meta| object_meta.size > 0)
@Blizzara (author) commented:

Needed for (hive-style) partitioned reads.

@@ -418,6 +424,7 @@ pub async fn pruned_partition_list<'a>(
table_path
.list_all_files(ctx, store, file_extension)
.await?
.try_filter(|object_meta| futures::future::ready(object_meta.size > 0))
@Blizzara (author) commented:

Needed for non-(hive-style) partitioned reads.

@@ -470,6 +470,8 @@ impl ListingOptions {
let files: Vec<_> = table_path
.list_all_files(state, store.as_ref(), &self.file_extension)
.await?
// Empty files cannot affect schema but may throw when trying to read for it
.try_filter(|object_meta| future::ready(object_meta.size > 0))
@Blizzara (author) commented on Dec 13, 2024:

Needed for all (parquet) reads that don't provide a schema.

@@ -2251,6 +2251,59 @@ mod tests {
scan_format(state, &*format, &testdata, file_name, projection, limit).await
}

/// Test that 0-byte files don't break while reading
#[tokio::test]
async fn test_read_empty_parquet() -> Result<()> {
@Blizzara (author) commented:

These validate the two reading modes for parquet, plus the schema inference change. Reverting any of those three changes will cause one or both of these tests to fail.

@alamb (Contributor) left a review:

Thank you @Blizzara -- I think this PR makes sense to me 🙏

I think it would be good to leave at least one "repartition a bunch of zero-sized files" test in to make sure that doesn't trigger some corner case, but otherwise this looks good to me.


@@ -671,6 +680,106 @@ mod tests {
);
}

fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
A reviewer commented:

A comment here explaining what the &str / usize / Vec<&str> mean would be nice for future readers.

@Blizzara (author) replied:

done! 3f58b02

@goldmedal (Contributor) left a review:

Thanks @Blizzara. It looks good to me 👍

@goldmedal goldmedal merged commit 1fc7769 into apache:main Dec 18, 2024
25 checks passed
@goldmedal (Contributor) commented:

Thanks @Blizzara and @alamb for reviewing

@Blizzara Blizzara deleted the avo/listing-table-ignore-empty-files branch December 18, 2024 16:17
Closes: Ignore empty (parquet) files when using ListingTable (#13737)