Skip to content

Commit

Permalink
fix: Ignore empty files in ListingTable when listing files with or without partition filters, as well as when inferring schema
Browse files Browse the repository at this point in the history
  • Loading branch information
Blizzara committed Dec 12, 2024
1 parent 98c483e commit 789897b
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 1 deletion.
111 changes: 110 additions & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,13 @@ impl Partition {
trace!("Listing partition {}", self.path);
let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
let result = store.list_with_delimiter(prefix).await?;
self.files = Some(result.objects);
self.files = Some(
result
.objects
.into_iter()
.filter(|object_meta| object_meta.size > 0)
.collect(),
);
Ok((self, result.common_prefixes))
}
}
Expand Down Expand Up @@ -418,6 +424,7 @@ pub async fn pruned_partition_list<'a>(
table_path
.list_all_files(ctx, store, file_extension)
.await?
.try_filter(|object_meta| futures::future::ready(object_meta.size > 0))
.map_ok(|object_meta| object_meta.into()),
));
}
Expand Down Expand Up @@ -566,6 +573,7 @@ mod tests {
async fn test_pruned_partition_list_empty() {
let (store, state) = make_test_store_and_state(&[
("tablepath/mypartition=val1/notparquetfile", 100),
("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
("tablepath/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
Expand All @@ -590,6 +598,7 @@ mod tests {
let (store, state) = make_test_store_and_state(&[
("tablepath/mypartition=val1/file.parquet", 100),
("tablepath/mypartition=val2/file.parquet", 100),
("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
("tablepath/mypartition=val1/other=val3/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
Expand Down Expand Up @@ -671,6 +680,106 @@ mod tests {
);
}

/// Summarizes a [`Partition`] as `(path, depth, file names)` so that test
/// assertions can compare listings against plain literals.
///
/// The file-name list is empty when `partition.files` is `None`.
///
/// # Panics
///
/// Panics if `filename()` yields no value for one of the file locations.
fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
    // Distinct closure parameter names (`files` / `meta`) avoid the shadowing
    // of the outer binding by the inner one.
    let file_names: Vec<&str> = partition
        .files
        .as_ref()
        .map(|files| {
            files
                .iter()
                .map(|meta| meta.location.filename().unwrap())
                .collect()
        })
        .unwrap_or_default();

    (partition.path.as_ref(), partition.depth, file_names)
}

// Lists the same two-level partitioned table with the third argument of
// `list_partitions` set to 0, 1 and 2 (presumably the maximum listing depth;
// confirm against `list_partitions`) and checks the reported
// `(path, depth, file names)` triples for each run.
//
// The fixture contains a zero-byte `empty.parquet`, which never appears in the
// expected file lists.
#[tokio::test]
async fn test_list_partition() {
    let (store, _) = make_test_store_and_state(&[
        ("tablepath/part1=p1v1/file.parquet", 100),
        ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
        ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
        ("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100),
        ("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100),
        ("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0),
    ]);

    // Each case pairs the depth argument with the partitions expected back,
    // in the order `list_partitions` returns them.
    let cases: [(_, Vec<(&str, usize, Vec<&str>)>); 3] = [
        (
            0,
            vec![
                ("tablepath", 0, vec![]),
                ("tablepath/part1=p1v1", 1, vec![]),
                ("tablepath/part1=p1v2", 1, vec![]),
                ("tablepath/part1=p1v3", 1, vec![]),
            ],
        ),
        (
            1,
            vec![
                ("tablepath", 0, vec![]),
                ("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
                ("tablepath/part1=p1v2", 1, vec![]),
                ("tablepath/part1=p1v2/part2=p2v1", 2, vec![]),
                ("tablepath/part1=p1v2/part2=p2v2", 2, vec![]),
                ("tablepath/part1=p1v3", 1, vec![]),
                ("tablepath/part1=p1v3/part2=p2v1", 2, vec![]),
            ],
        ),
        (
            2,
            vec![
                ("tablepath", 0, vec![]),
                ("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
                ("tablepath/part1=p1v2", 1, vec![]),
                ("tablepath/part1=p1v3", 1, vec![]),
                (
                    "tablepath/part1=p1v2/part2=p2v1",
                    2,
                    vec!["file1.parquet", "file2.parquet"],
                ),
                ("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]),
                ("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]),
            ],
        ),
    ];

    let table_url = ListingTableUrl::parse("file:///tablepath/").unwrap();

    for (depth_limit, expected) in cases {
        let listed = list_partitions(store.as_ref(), &table_url, depth_limit, None)
            .await
            .expect("listing partitions failed");

        let described: Vec<_> = listed.iter().map(describe_partition).collect();
        assert_eq!(described, expected);
    }
}

#[test]
fn test_parse_partitions_for_path() {
assert_eq!(
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,8 @@ impl ListingOptions {
let files: Vec<_> = table_path
.list_all_files(state, store.as_ref(), &self.file_extension)
.await?
// Empty files cannot affect schema but may throw when trying to read for it
.try_filter(|object_meta| future::ready(object_meta.size > 0))
.try_collect()
.await?;

Expand Down

0 comments on commit 789897b

Please sign in to comment.