-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prune pages are all null in ParquetExec by row_counts and fix NOT NULL prune #10051
Changes from 1 commit
dee9265
efcf6c3
88e62fd
641540c
8d173c6
65dc157
75a6d0a
7b014a2
b2d3641
f66cce3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -28,7 +28,7 @@ use arrow::{ | |||||||||||
record_batch::RecordBatch, | ||||||||||||
util::pretty::pretty_format_batches, | ||||||||||||
}; | ||||||||||||
use arrow_array::new_null_array; | ||||||||||||
use arrow_array::make_array; | ||||||||||||
use chrono::{Datelike, Duration, TimeDelta}; | ||||||||||||
use datafusion::{ | ||||||||||||
datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider}, | ||||||||||||
|
@@ -77,6 +77,7 @@ enum Scenario { | |||||||||||
ByteArray, | ||||||||||||
PeriodsInColumnNames, | ||||||||||||
WithNullValues, | ||||||||||||
WithNullValuesPageLevel, | ||||||||||||
} | ||||||||||||
|
||||||||||||
enum Unit { | ||||||||||||
|
@@ -632,22 +633,60 @@ fn make_names_batch(name: &str, service_name_values: Vec<&str>) -> RecordBatch { | |||||||||||
RecordBatch::try_new(schema, vec![Arc::new(name), Arc::new(service_name)]).unwrap() | ||||||||||||
} | ||||||||||||
|
||||||||||||
/// Return record batch with i8, i16, i32, and i64 sequences with all Null values | ||||||||||||
fn make_all_null_values() -> RecordBatch { | ||||||||||||
/// Return record batch with i8, i16, i32, and i64 sequences with Null values | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think it would help to explain the shape of this data a bit more to help readers. Something like the following (though note it would change if you change the arguments as I suggest below)
Suggested change
|
||||||||||||
/// here 5 rows in page when using Unit::Page | ||||||||||||
fn make_int_batches_with_null( | ||||||||||||
null_values: usize, | ||||||||||||
no_null_values_start: usize, | ||||||||||||
no_null_values_end: usize, | ||||||||||||
Comment on lines
+640
to
+641
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you might be able to simplify this method by sending in 2 parameters. Right now it looks like it interleaves nulls arbitrarily but really the nulls are always at the start and non nulls are at the end num_nulls: usize,
non_nulls: usize |
||||||||||||
) -> RecordBatch { | ||||||||||||
let schema = Arc::new(Schema::new(vec![ | ||||||||||||
Field::new("i8", DataType::Int8, true), | ||||||||||||
Field::new("i16", DataType::Int16, true), | ||||||||||||
Field::new("i32", DataType::Int32, true), | ||||||||||||
Field::new("i64", DataType::Int64, true), | ||||||||||||
])); | ||||||||||||
|
||||||||||||
let v8: Vec<i8> = (no_null_values_start as _..no_null_values_end as _).collect(); | ||||||||||||
let v16: Vec<i16> = (no_null_values_start as _..no_null_values_end as _).collect(); | ||||||||||||
let v32: Vec<i32> = (no_null_values_start as _..no_null_values_end as _).collect(); | ||||||||||||
let v64: Vec<i64> = (no_null_values_start as _..no_null_values_end as _).collect(); | ||||||||||||
|
||||||||||||
RecordBatch::try_new( | ||||||||||||
schema, | ||||||||||||
vec![ | ||||||||||||
new_null_array(&DataType::Int8, 5), | ||||||||||||
new_null_array(&DataType::Int16, 5), | ||||||||||||
new_null_array(&DataType::Int32, 5), | ||||||||||||
new_null_array(&DataType::Int64, 5), | ||||||||||||
make_array( | ||||||||||||
Int8Array::from_iter( | ||||||||||||
v8.into_iter() | ||||||||||||
.map(Some) | ||||||||||||
.chain(std::iter::repeat(None).take(null_values)), | ||||||||||||
) | ||||||||||||
.to_data(), | ||||||||||||
), | ||||||||||||
make_array( | ||||||||||||
Int16Array::from_iter( | ||||||||||||
v16.into_iter() | ||||||||||||
.map(Some) | ||||||||||||
.chain(std::iter::repeat(None).take(null_values)), | ||||||||||||
) | ||||||||||||
.to_data(), | ||||||||||||
), | ||||||||||||
make_array( | ||||||||||||
Int32Array::from_iter( | ||||||||||||
v32.into_iter() | ||||||||||||
.map(Some) | ||||||||||||
.chain(std::iter::repeat(None).take(null_values)), | ||||||||||||
) | ||||||||||||
.to_data(), | ||||||||||||
), | ||||||||||||
make_array( | ||||||||||||
Int64Array::from_iter( | ||||||||||||
v64.into_iter() | ||||||||||||
.map(Some) | ||||||||||||
.chain(std::iter::repeat(None).take(null_values)), | ||||||||||||
) | ||||||||||||
.to_data(), | ||||||||||||
), | ||||||||||||
], | ||||||||||||
) | ||||||||||||
.unwrap() | ||||||||||||
|
@@ -824,9 +863,17 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> { | |||||||||||
} | ||||||||||||
Scenario::WithNullValues => { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you please also add the "some null/some not null" null values case to this scenario as well?
|
||||||||||||
vec![ | ||||||||||||
make_all_null_values(), | ||||||||||||
make_int_batches_with_null(5, 0, 0), | ||||||||||||
make_int_batches(1, 6), | ||||||||||||
make_all_null_values(), | ||||||||||||
make_int_batches_with_null(5, 0, 0), | ||||||||||||
] | ||||||||||||
} | ||||||||||||
Scenario::WithNullValuesPageLevel => { | ||||||||||||
vec![ | ||||||||||||
make_int_batches_with_null(5, 1, 6), | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this scenario should have at least one batch of all nulls (e.g. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think this will be prune by row group -> column level |
||||||||||||
make_int_batches(1, 11), | ||||||||||||
make_int_batches_with_null(1, 1, 10), | ||||||||||||
make_int_batches_with_null(5, 1, 6), | ||||||||||||
] | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -871,6 +871,55 @@ async fn without_pushdown_filter() { | |
assert!(bytes_scanned_with_filter > bytes_scanned_without_filter); | ||
} | ||
|
||
#[tokio::test] | ||
// Data layout like this: | ||
// row_group1: page1(1~5), page2(All Null) | ||
// row_group2: page1(1~5), page2(6~10) | ||
// row_group3: page1(1~5), page2(6~9 + Null) | ||
// row_group4: page1(1~5), page2(All Null) | ||
// total 40 rows | ||
async fn test_pages_with_null_values() { | ||
test_prune( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It took me a while to convince myself that this was actually setting up the scenario as described. I eventually found it here: I wonder if it would be possible to add some better comments to There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I will make another PR to improve these tests |
||
Scenario::WithNullValuesPageLevel, | ||
"SELECT * FROM t where i8 <= 6", | ||
Some(0), | ||
// expect prune two pages which 10 rows | ||
Ted-Jiang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Some(10), | ||
22, | ||
) | ||
.await; | ||
|
||
test_prune( | ||
Scenario::WithNullValuesPageLevel, | ||
"SELECT * FROM t where \"i16\" is not null", | ||
Some(0), | ||
// expect prune two pages which 10 rows | ||
Ted-Jiang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Some(10), | ||
29, | ||
) | ||
.await; | ||
|
||
test_prune( | ||
Scenario::WithNullValuesPageLevel, | ||
"SELECT * FROM t where \"i32\" is null", | ||
Some(0), | ||
// expect prune 5 pages which 25 rows | ||
Ted-Jiang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Some(25), | ||
11, | ||
) | ||
.await; | ||
|
||
test_prune( | ||
Scenario::WithNullValuesPageLevel, | ||
"SELECT * FROM t where \"i64\" > 6", | ||
Some(0), | ||
// 6 pages will be pruned which 30 rows | ||
Ted-Jiang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Some(30), | ||
7, | ||
) | ||
.await; | ||
} | ||
|
||
fn cast_count_metric(metric: MetricValue) -> Option<usize> { | ||
match metric { | ||
MetricValue::Count { count, .. } => Some(count.value()), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/apache/arrow-rs/blob/91f0b1771308609ca27db0fb1d2d49571b3980d8/parquet/src/file/metadata.rs#L979-L982
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can do this same calculation without allocating intermediate `Vec`s -- something like this:
🤔 the name
col_offset_indexes
is somewhat confusing to me as they are `PageLocation`s --
maybe we could rename that field to
page_locations
🤔There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb thanks for your always conscientious review and kind suggestions. 👍
Avoided the allocation in b2d3641
In my mind
page_locations
stores the offsets in the column chunk, so I think they have the same meaning 🤣