-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Ignore empty files in ListingTable when listing files with or without partition filters, as well as when inferring schema #13750
Changes from 4 commits
789897b
c5d46f2
61c5e0d
af25b26
53d96be
38cdbf2
3f58b02
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1259,73 +1259,57 @@ mod tests { | |
Ok(()) | ||
} | ||
|
||
/// Read a single empty csv file in parallel | ||
/// Read a single empty csv file | ||
/// | ||
/// empty_0_byte.csv: | ||
/// (file is empty) | ||
#[rstest(n_partitions, case(1), case(2), case(3), case(4))] | ||
#[tokio::test] | ||
async fn test_csv_parallel_empty_file(n_partitions: usize) -> Result<()> { | ||
let config = SessionConfig::new() | ||
.with_repartition_file_scans(true) | ||
.with_repartition_file_min_size(0) | ||
.with_target_partitions(n_partitions); | ||
let ctx = SessionContext::new_with_config(config); | ||
async fn test_csv_empty_file() -> Result<()> { | ||
let ctx = SessionContext::new(); | ||
ctx.register_csv( | ||
"empty", | ||
"tests/data/empty_0_byte.csv", | ||
CsvReadOptions::new().has_header(false), | ||
) | ||
.await?; | ||
|
||
// Require a predicate to enable repartition for the optimizer | ||
let query = "select * from empty where random() > 0.5;"; | ||
let query_result = ctx.sql(query).await?.collect().await?; | ||
let actual_partitions = count_query_csv_partitions(&ctx, query).await?; | ||
|
||
#[rustfmt::skip] | ||
let expected = ["++", | ||
"++"]; | ||
assert_batches_eq!(expected, &query_result); | ||
assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Read a single empty csv file with header in parallel | ||
/// Read a single empty csv file with header | ||
/// | ||
/// empty.csv: | ||
/// c1,c2,c3 | ||
#[rstest(n_partitions, case(1), case(2), case(3))] | ||
#[tokio::test] | ||
async fn test_csv_parallel_empty_with_header(n_partitions: usize) -> Result<()> { | ||
let config = SessionConfig::new() | ||
.with_repartition_file_scans(true) | ||
.with_repartition_file_min_size(0) | ||
.with_target_partitions(n_partitions); | ||
let ctx = SessionContext::new_with_config(config); | ||
async fn test_csv_empty_with_header() -> Result<()> { | ||
let ctx = SessionContext::new(); | ||
ctx.register_csv( | ||
"empty", | ||
"tests/data/empty.csv", | ||
CsvReadOptions::new().has_header(true), | ||
) | ||
.await?; | ||
|
||
// Require a predicate to enable repartition for the optimizer | ||
let query = "select * from empty where random() > 0.5;"; | ||
let query_result = ctx.sql(query).await?.collect().await?; | ||
let actual_partitions = count_query_csv_partitions(&ctx, query).await?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this checks that the plan has a |
||
|
||
#[rustfmt::skip] | ||
let expected = ["++", | ||
"++"]; | ||
assert_batches_eq!(expected, &query_result); | ||
assert_eq!(n_partitions, actual_partitions); | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Read multiple empty csv files in parallel | ||
/// Read multiple empty csv files | ||
/// | ||
/// all_empty | ||
/// ├── empty0.csv | ||
|
@@ -1334,14 +1318,9 @@ mod tests { | |
/// | ||
/// empty0.csv/empty1.csv/empty2.csv: | ||
/// (file is empty) | ||
#[rstest(n_partitions, case(1), case(2), case(3), case(4))] | ||
#[tokio::test] | ||
async fn test_csv_parallel_multiple_empty_files(n_partitions: usize) -> Result<()> { | ||
let config = SessionConfig::new() | ||
.with_repartition_file_scans(true) | ||
.with_repartition_file_min_size(0) | ||
.with_target_partitions(n_partitions); | ||
let ctx = SessionContext::new_with_config(config); | ||
async fn test_csv_multiple_empty_files() -> Result<()> { | ||
let ctx = SessionContext::new(); | ||
let file_format = Arc::new(CsvFormat::default().with_has_header(false)); | ||
let listing_options = ListingOptions::new(file_format.clone()) | ||
.with_file_extension(file_format.get_ext()); | ||
|
@@ -1358,13 +1337,11 @@ mod tests { | |
// Require a predicate to enable repartition for the optimizer | ||
let query = "select * from empty where random() > 0.5;"; | ||
let query_result = ctx.sql(query).await?.collect().await?; | ||
let actual_partitions = count_query_csv_partitions(&ctx, query).await?; | ||
|
||
#[rustfmt::skip] | ||
let expected = ["++", | ||
"++"]; | ||
assert_batches_eq!(expected, &query_result); | ||
assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty | ||
|
||
Ok(()) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -619,13 +619,11 @@ mod tests { | |
Ok(()) | ||
} | ||
|
||
#[rstest(n_partitions, case(1), case(2), case(3), case(4))] | ||
#[tokio::test] | ||
async fn it_can_read_empty_ndjson_in_parallel(n_partitions: usize) -> Result<()> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here |
||
async fn it_can_read_empty_ndjson() -> Result<()> { | ||
let config = SessionConfig::new() | ||
.with_repartition_file_scans(true) | ||
.with_repartition_file_min_size(0) | ||
.with_target_partitions(n_partitions); | ||
.with_repartition_file_min_size(0); | ||
|
||
let ctx = SessionContext::new_with_config(config); | ||
|
||
|
@@ -638,7 +636,6 @@ mod tests { | |
let query = "SELECT * FROM json_parallel_empty WHERE random() > 0.5;"; | ||
|
||
let result = ctx.sql(query).await?.collect().await?; | ||
let actual_partitions = count_num_partitions(&ctx, query).await?; | ||
|
||
#[rustfmt::skip] | ||
let expected = [ | ||
|
@@ -647,7 +644,6 @@ mod tests { | |
]; | ||
|
||
assert_batches_eq!(expected, &result); | ||
assert_eq!(1, actual_partitions); | ||
|
||
Ok(()) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1312,7 +1312,7 @@ mod tests { | |
|
||
use crate::datasource::file_format::parquet::test_util::store_parquet; | ||
use crate::physical_plan::metrics::MetricValue; | ||
use crate::prelude::{SessionConfig, SessionContext}; | ||
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; | ||
use arrow::array::{Array, ArrayRef, StringArray}; | ||
use arrow_array::types::Int32Type; | ||
use arrow_array::{DictionaryArray, Int32Array, Int64Array}; | ||
|
@@ -1323,8 +1323,8 @@ mod tests { | |
as_float64_array, as_int32_array, as_timestamp_nanosecond_array, | ||
}; | ||
use datafusion_common::config::ParquetOptions; | ||
use datafusion_common::ScalarValue; | ||
use datafusion_common::ScalarValue::Utf8; | ||
use datafusion_common::{assert_batches_eq, ScalarValue}; | ||
use datafusion_execution::object_store::ObjectStoreUrl; | ||
use datafusion_execution::runtime_env::RuntimeEnv; | ||
use datafusion_physical_plan::stream::RecordBatchStreamAdapter; | ||
|
@@ -2251,6 +2251,79 @@ mod tests { | |
scan_format(state, &*format, &testdata, file_name, projection, limit).await | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_read_parquet() -> Result<()> { | ||
let testdata = crate::test_util::parquet_test_data(); | ||
let path = format!("{testdata}/alltypes_tiny_pages.parquet"); | ||
let file = File::open(path).await.unwrap(); | ||
let options = ArrowReaderOptions::new().with_page_index(true); | ||
let builder = | ||
ParquetRecordBatchStreamBuilder::new_with_options(file, options.clone()) | ||
.await | ||
.unwrap() | ||
.metadata() | ||
.clone(); | ||
check_page_index_validation(builder.column_index(), builder.offset_index()); | ||
|
||
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet"); | ||
let file = File::open(path).await.unwrap(); | ||
|
||
let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options) | ||
.await | ||
.unwrap() | ||
.metadata() | ||
.clone(); | ||
check_page_index_validation(builder.column_index(), builder.offset_index()); | ||
|
||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_read_empty_parquet() -> Result<()> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These validate the two reading modes for parquet, + also the schema inference change. Reverting any of those three will cause one or both of these to fail. |
||
let testdata = crate::test_util::parquet_test_data(); | ||
let path = format!("{testdata}/empty.parquet"); | ||
|
||
let ctx = SessionContext::new(); | ||
|
||
let df = ctx | ||
.read_parquet(&path, ParquetReadOptions::default()) | ||
.await | ||
.expect("read_parquet should succeed"); | ||
|
||
let result = df.collect().await?; | ||
#[rustfmt::skip] | ||
let expected = ["++", | ||
"++"]; | ||
assert_batches_eq!(expected, &result); | ||
|
||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_read_partitioned_empty_parquet() -> Result<()> { | ||
let testdata = crate::test_util::parquet_test_data(); | ||
let path = format!("{testdata}/partitioned/"); | ||
|
||
let ctx = SessionContext::new(); | ||
|
||
let df = ctx | ||
.read_parquet( | ||
&path, | ||
ParquetReadOptions::new() | ||
.table_partition_cols(vec![("col1".to_string(), DataType::Utf8)]), | ||
) | ||
.await | ||
.expect("read_parquet should succeed"); | ||
|
||
let result = df.collect().await?; | ||
#[rustfmt::skip] | ||
let expected = ["++", | ||
"++"]; | ||
assert_batches_eq!(expected, &result); | ||
|
||
Ok(()) | ||
} | ||
|
||
fn build_ctx(store_url: &url::Url) -> Arc<TaskContext> { | ||
let tmp_dir = tempfile::TempDir::new().unwrap(); | ||
let local = Arc::new( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -171,7 +171,13 @@ impl Partition { | |
trace!("Listing partition {}", self.path); | ||
let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty()); | ||
let result = store.list_with_delimiter(prefix).await?; | ||
self.files = Some(result.objects); | ||
self.files = Some( | ||
result | ||
.objects | ||
.into_iter() | ||
.filter(|object_meta| object_meta.size > 0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. needed for (hive-style) partitioned reads |
||
.collect(), | ||
); | ||
Ok((self, result.common_prefixes)) | ||
} | ||
} | ||
|
@@ -418,6 +424,7 @@ pub async fn pruned_partition_list<'a>( | |
table_path | ||
.list_all_files(ctx, store, file_extension) | ||
.await? | ||
.try_filter(|object_meta| futures::future::ready(object_meta.size > 0)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. needed for non-(hive-style)-partitioned reads |
||
.map_ok(|object_meta| object_meta.into()), | ||
)); | ||
} | ||
|
@@ -566,6 +573,7 @@ mod tests { | |
async fn test_pruned_partition_list_empty() { | ||
let (store, state) = make_test_store_and_state(&[ | ||
("tablepath/mypartition=val1/notparquetfile", 100), | ||
("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), | ||
("tablepath/file.parquet", 100), | ||
]); | ||
let filter = Expr::eq(col("mypartition"), lit("val1")); | ||
|
@@ -590,6 +598,7 @@ mod tests { | |
let (store, state) = make_test_store_and_state(&[ | ||
("tablepath/mypartition=val1/file.parquet", 100), | ||
("tablepath/mypartition=val2/file.parquet", 100), | ||
("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), | ||
("tablepath/mypartition=val1/other=val3/file.parquet", 100), | ||
]); | ||
let filter = Expr::eq(col("mypartition"), lit("val1")); | ||
|
@@ -671,6 +680,106 @@ mod tests { | |
); | ||
} | ||
|
||
fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. a comment here might be nice explaining what the str/usize/Vec<&str> means for future readers There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done! 3f58b02 |
||
( | ||
partition.path.as_ref(), | ||
partition.depth, | ||
partition | ||
.files | ||
.as_ref() | ||
.map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect()) | ||
.unwrap_or_default(), | ||
) | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_list_partition() { | ||
let (store, _) = make_test_store_and_state(&[ | ||
("tablepath/part1=p1v1/file.parquet", 100), | ||
("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100), | ||
("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100), | ||
("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100), | ||
("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100), | ||
("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0), | ||
]); | ||
|
||
let partitions = list_partitions( | ||
store.as_ref(), | ||
&ListingTableUrl::parse("file:///tablepath/").unwrap(), | ||
0, | ||
None, | ||
) | ||
.await | ||
.expect("listing partitions failed"); | ||
|
||
assert_eq!( | ||
&partitions | ||
.iter() | ||
.map(describe_partition) | ||
.collect::<Vec<_>>(), | ||
&vec![ | ||
("tablepath", 0, vec![]), | ||
("tablepath/part1=p1v1", 1, vec![]), | ||
("tablepath/part1=p1v2", 1, vec![]), | ||
("tablepath/part1=p1v3", 1, vec![]), | ||
] | ||
); | ||
|
||
let partitions = list_partitions( | ||
store.as_ref(), | ||
&ListingTableUrl::parse("file:///tablepath/").unwrap(), | ||
1, | ||
None, | ||
) | ||
.await | ||
.expect("listing partitions failed"); | ||
|
||
assert_eq!( | ||
&partitions | ||
.iter() | ||
.map(describe_partition) | ||
.collect::<Vec<_>>(), | ||
&vec![ | ||
("tablepath", 0, vec![]), | ||
("tablepath/part1=p1v1", 1, vec!["file.parquet"]), | ||
("tablepath/part1=p1v2", 1, vec![]), | ||
("tablepath/part1=p1v2/part2=p2v1", 2, vec![]), | ||
("tablepath/part1=p1v2/part2=p2v2", 2, vec![]), | ||
("tablepath/part1=p1v3", 1, vec![]), | ||
("tablepath/part1=p1v3/part2=p2v1", 2, vec![]), | ||
] | ||
); | ||
|
||
let partitions = list_partitions( | ||
store.as_ref(), | ||
&ListingTableUrl::parse("file:///tablepath/").unwrap(), | ||
2, | ||
None, | ||
) | ||
.await | ||
.expect("listing partitions failed"); | ||
|
||
assert_eq!( | ||
&partitions | ||
.iter() | ||
.map(describe_partition) | ||
.collect::<Vec<_>>(), | ||
&vec![ | ||
("tablepath", 0, vec![]), | ||
("tablepath/part1=p1v1", 1, vec!["file.parquet"]), | ||
("tablepath/part1=p1v2", 1, vec![]), | ||
("tablepath/part1=p1v3", 1, vec![]), | ||
( | ||
"tablepath/part1=p1v2/part2=p2v1", | ||
2, | ||
vec!["file1.parquet", "file2.parquet"] | ||
), | ||
("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]), | ||
("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]), | ||
] | ||
); | ||
} | ||
|
||
#[test] | ||
fn test_parse_partitions_for_path() { | ||
assert_eq!( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -470,6 +470,8 @@ impl ListingOptions { | |
let files: Vec<_> = table_path | ||
.list_all_files(state, store.as_ref(), &self.file_extension) | ||
.await? | ||
// Empty files cannot affect schema but may throw when trying to read for it | ||
.try_filter(|object_meta| future::ready(object_meta.size > 0)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. needed for all (parquet) reads that don't provide schema |
||
.try_collect() | ||
.await?; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume there was no special reason to test parallelism for empty files? Now that we just skip empty files altogether there is no parallelism, the test does pass with the repartition settings as well but they seemed unrelevant so I cleaned them away. Can add back if there's a reason to keep them!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did some research and found it seems to have been added in #6801 by @2010YOUY01 . As long as the code works with empty files (aka doesn't throw an error / go into a infinite loop) I think we are good
Thus I suggest leaving at least one test where we set the repartition file sizes/min file size to 0 and make sure nothing bad happens
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done! 3f58b02