fix: Ignore empty files in ListingTable when listing files with or without partition filters, as well as when inferring schema #13750

Merged: 7 commits merged on Dec 18, 2024
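In short, the fix filters zero-byte objects out of every ListingTable file listing (with or without hive-style partition filters) and out of schema inference. Below is a minimal sketch of the filtering pattern, simplified from the diff that follows; the standalone helper and its name are illustrative only, not part of the actual change:

    use futures::stream::TryStreamExt;
    use object_store::ObjectMeta;

    // Keep only objects that actually contain bytes: zero-byte files cannot
    // contribute rows or a schema, and some readers error when opening them.
    async fn non_empty_files(
        listed: impl futures::Stream<Item = object_store::Result<ObjectMeta>>,
    ) -> object_store::Result<Vec<ObjectMeta>> {
        listed
            .try_filter(|object_meta| futures::future::ready(object_meta.size > 0))
            .try_collect()
            .await
    }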
38 changes: 10 additions & 28 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -1259,73 +1259,57 @@ mod tests {
Ok(())
}

/// Read a single empty csv file in parallel
/// Read a single empty csv file
///
/// empty_0_byte.csv:
/// (file is empty)
#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
async fn test_csv_parallel_empty_file(n_partitions: usize) -> Result<()> {
Contributor (Author):
I assume there was no special reason to test parallelism for empty files? Now that we just skip empty files altogether there is no parallelism. The test does pass with the repartition settings as well, but they seemed irrelevant so I cleaned them away. I can add them back if there's a reason to keep them!

Contributor:
I think this is fine.

Contributor:
I did some research and found it seems to have been added in #6801 by @2010YOUY01. As long as the code works with empty files (i.e. doesn't throw an error / go into an infinite loop) I think we are good.

Thus I suggest leaving at least one test where we set the repartition file sizes / min file size to 0 and make sure nothing bad happens.

Contributor (Author):
done! 3f58b02

let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
let ctx = SessionContext::new_with_config(config);
async fn test_csv_empty_file() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv(
"empty",
"tests/data/empty_0_byte.csv",
CsvReadOptions::new().has_header(false),
)
.await?;

// Require a predicate to enable repartition for the optimizer
let query = "select * from empty where random() > 0.5;";
let query_result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_query_csv_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &query_result);
assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty

Ok(())
}

/// Read a single empty csv file with header in parallel
/// Read a single empty csv file with header
///
/// empty.csv:
/// c1,c2,c3
#[rstest(n_partitions, case(1), case(2), case(3))]
#[tokio::test]
async fn test_csv_parallel_empty_with_header(n_partitions: usize) -> Result<()> {
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
let ctx = SessionContext::new_with_config(config);
async fn test_csv_empty_with_header() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv(
"empty",
"tests/data/empty.csv",
CsvReadOptions::new().has_header(true),
)
.await?;

// Require a predicate to enable repartition for the optimizer
let query = "select * from empty where random() > 0.5;";
let query_result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
@Blizzara (Contributor, Author) on Dec 13, 2024:
This checks that the plan has a CsvExec node, which we no longer have (now it's a TableScan: empty / EmptyExec).
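For context, a rough sketch of what that kind of plan check can look like (hypothetical code, not the actual count_query_csv_partitions helper), using the test's ctx and query: render the physical plan to text and look for the scan node.

    // Hypothetical check: after this change an empty table plans to an
    // EmptyExec rather than a CsvExec, so the CsvExec node is simply absent.
    let plan = ctx.sql(query).await?.create_physical_plan().await?;
    let plan_text = datafusion::physical_plan::displayable(plan.as_ref())
        .indent(true)
        .to_string();
    assert!(!plan_text.contains("CsvExec"));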


#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &query_result);
assert_eq!(n_partitions, actual_partitions);

Ok(())
}

/// Read multiple empty csv files in parallel
/// Read multiple empty csv files
///
/// all_empty
/// ├── empty0.csv
@@ -1334,13 +1318,13 @@
///
/// empty0.csv/empty1.csv/empty2.csv:
/// (file is empty)
#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
async fn test_csv_parallel_multiple_empty_files(n_partitions: usize) -> Result<()> {
async fn test_csv_multiple_empty_files() -> Result<()> {
// Testing that partitioning doesn't break with empty files
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
.with_target_partitions(4);
let ctx = SessionContext::new_with_config(config);
let file_format = Arc::new(CsvFormat::default().with_has_header(false));
let listing_options = ListingOptions::new(file_format.clone())
Expand All @@ -1358,13 +1342,11 @@ mod tests {
// Require a predicate to enable repartition for the optimizer
let query = "select * from empty where random() > 0.5;";
let query_result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_query_csv_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &query_result);
assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty

Ok(())
}
8 changes: 2 additions & 6 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -619,13 +619,11 @@ mod tests {
Ok(())
}

#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
async fn it_can_read_empty_ndjson_in_parallel(n_partitions: usize) -> Result<()> {
Contributor (Author):
same here

async fn it_can_read_empty_ndjson() -> Result<()> {
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
.with_repartition_file_min_size(0);

let ctx = SessionContext::new_with_config(config);

@@ -638,7 +636,6 @@
let query = "SELECT * FROM json_parallel_empty WHERE random() > 0.5;";

let result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_num_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = [
@@ -647,7 +644,6 @@
];

assert_batches_eq!(expected, &result);
assert_eq!(1, actual_partitions);

Ok(())
}
57 changes: 55 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -1312,7 +1312,7 @@ mod tests {

use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
use arrow_array::types::Int32Type;
use arrow_array::{DictionaryArray, Int32Array, Int64Array};
@@ -1323,8 +1323,8 @@
as_float64_array, as_int32_array, as_timestamp_nanosecond_array,
};
use datafusion_common::config::ParquetOptions;
use datafusion_common::ScalarValue;
use datafusion_common::ScalarValue::Utf8;
use datafusion_common::{assert_batches_eq, ScalarValue};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
@@ -2251,6 +2251,59 @@ mod tests {
scan_format(state, &*format, &testdata, file_name, projection, limit).await
}

/// Test that 0-byte files don't break while reading
#[tokio::test]
async fn test_read_empty_parquet() -> Result<()> {
Contributor (Author):
These validate the two reading modes for parquet, plus the schema inference change. Reverting any of those three will cause one or both of these tests to fail.

let tmp_dir = tempfile::TempDir::new().unwrap();
let path = format!("{}/empty.parquet", tmp_dir.path().to_string_lossy());
File::create(&path).await?;

let ctx = SessionContext::new();

let df = ctx
.read_parquet(&path, ParquetReadOptions::default())
.await
.expect("read_parquet should succeed");

let result = df.collect().await?;
#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &result);

Ok(())
}

/// Test that 0-byte files don't break while reading
#[tokio::test]
async fn test_read_partitioned_empty_parquet() -> Result<()> {
let tmp_dir = tempfile::TempDir::new().unwrap();
let partition_dir = tmp_dir.path().join("col1=a");
std::fs::create_dir(&partition_dir).unwrap();
File::create(partition_dir.join("empty.parquet"))
.await
.unwrap();

let ctx = SessionContext::new();

let df = ctx
.read_parquet(
tmp_dir.path().to_str().unwrap(),
ParquetReadOptions::new()
.table_partition_cols(vec![("col1".to_string(), DataType::Utf8)]),
)
.await
.expect("read_parquet should succeed");

let result = df.collect().await?;
#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &result);

Ok(())
}

fn build_ctx(store_url: &url::Url) -> Arc<TaskContext> {
let tmp_dir = tempfile::TempDir::new().unwrap();
let local = Arc::new(
112 changes: 111 additions & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
@@ -171,7 +171,13 @@ impl Partition {
trace!("Listing partition {}", self.path);
let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
let result = store.list_with_delimiter(prefix).await?;
self.files = Some(result.objects);
self.files = Some(
result
.objects
.into_iter()
.filter(|object_meta| object_meta.size > 0)
Contributor (Author):
needed for (hive-style) partitioned reads

.collect(),
);
Ok((self, result.common_prefixes))
}
}
@@ -418,6 +424,7 @@ pub async fn pruned_partition_list<'a>(
table_path
.list_all_files(ctx, store, file_extension)
.await?
.try_filter(|object_meta| futures::future::ready(object_meta.size > 0))
Contributor (Author):
needed for non-(hive-style)-partitioned reads

.map_ok(|object_meta| object_meta.into()),
));
}
@@ -566,6 +573,7 @@ mod tests {
async fn test_pruned_partition_list_empty() {
let (store, state) = make_test_store_and_state(&[
("tablepath/mypartition=val1/notparquetfile", 100),
("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
("tablepath/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
@@ -590,6 +598,7 @@
let (store, state) = make_test_store_and_state(&[
("tablepath/mypartition=val1/file.parquet", 100),
("tablepath/mypartition=val2/file.parquet", 100),
("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
("tablepath/mypartition=val1/other=val3/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
@@ -671,6 +680,107 @@
);
}

/// Describe a partition as a (path, depth, files) tuple for easier assertions
fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
Contributor:
a comment here might be nice explaining what the str/usize/Vec<&str> means for future readers

Contributor (Author):
done! 3f58b02

(
partition.path.as_ref(),
partition.depth,
partition
.files
.as_ref()
.map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect())
.unwrap_or_default(),
)
}

#[tokio::test]
async fn test_list_partition() {
let (store, _) = make_test_store_and_state(&[
("tablepath/part1=p1v1/file.parquet", 100),
("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100),
("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100),
("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0),
]);

let partitions = list_partitions(
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
0,
None,
)
.await
.expect("listing partitions failed");

assert_eq!(
&partitions
.iter()
.map(describe_partition)
.collect::<Vec<_>>(),
&vec![
("tablepath", 0, vec![]),
("tablepath/part1=p1v1", 1, vec![]),
("tablepath/part1=p1v2", 1, vec![]),
("tablepath/part1=p1v3", 1, vec![]),
]
);

let partitions = list_partitions(
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
1,
None,
)
.await
.expect("listing partitions failed");

assert_eq!(
&partitions
.iter()
.map(describe_partition)
.collect::<Vec<_>>(),
&vec![
("tablepath", 0, vec![]),
("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
("tablepath/part1=p1v2", 1, vec![]),
("tablepath/part1=p1v2/part2=p2v1", 2, vec![]),
("tablepath/part1=p1v2/part2=p2v2", 2, vec![]),
("tablepath/part1=p1v3", 1, vec![]),
("tablepath/part1=p1v3/part2=p2v1", 2, vec![]),
]
);

let partitions = list_partitions(
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
2,
None,
)
.await
.expect("listing partitions failed");

assert_eq!(
&partitions
.iter()
.map(describe_partition)
.collect::<Vec<_>>(),
&vec![
("tablepath", 0, vec![]),
("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
("tablepath/part1=p1v2", 1, vec![]),
("tablepath/part1=p1v3", 1, vec![]),
(
"tablepath/part1=p1v2/part2=p2v1",
2,
vec!["file1.parquet", "file2.parquet"]
),
("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]),
("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]),
]
);
}

#[test]
fn test_parse_partitions_for_path() {
assert_eq!(
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
@@ -470,6 +470,8 @@ impl ListingOptions {
let files: Vec<_> = table_path
.list_all_files(state, store.as_ref(), &self.file_extension)
.await?
// Empty files cannot affect schema but may throw when trying to read them
.try_filter(|object_meta| future::ready(object_meta.size > 0))
@Blizzara (Contributor, Author) on Dec 13, 2024:
needed for all (parquet) reads that don't provide schema

.try_collect()
.await?;
