Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parquet: Add tests for pruning on Int8/Int16/Int64 columns #9778

Merged
merged 3 commits into from
Mar 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ ctor = { workspace = true }
doc-comment = { workspace = true }
env_logger = { workspace = true }
half = { workspace = true, default-features = true }
paste = "^1.0"
postgres-protocol = "0.6.4"
postgres-types = { version = "0.2.4", features = ["derive", "with-chrono-0_4"] }
rand = { workspace = true, features = ["small_rng"] }
Expand Down
48 changes: 34 additions & 14 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use arrow::array::Decimal128Array;
use arrow::{
array::{
Array, ArrayRef, BinaryArray, Date32Array, Date64Array, FixedSizeBinaryArray,
Float64Array, Int32Array, StringArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, StringArray,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
Expand Down Expand Up @@ -62,7 +63,7 @@ fn init() {
enum Scenario {
Timestamps,
Dates,
Int32,
Int,
Int32Range,
Float64,
Decimal,
Expand Down Expand Up @@ -389,12 +390,31 @@ fn make_timestamp_batch(offset: Duration) -> RecordBatch {
/// Return record batch with i8, i16, i32, and i64 sequences
///
/// Columns are named
/// "i" -> Int32Array
fn make_int32_batch(start: i32, end: i32) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
let v: Vec<i32> = (start..end).collect();
let array = Arc::new(Int32Array::from(v)) as ArrayRef;
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
/// "i8" -> Int8Array
/// "i16" -> Int16Array
/// "i32" -> Int32Array
/// "i64" -> Int64Array
fn make_int_batches(start: i8, end: i8) -> RecordBatch {
    // Every column carries the same half-open range `start..end`; the bounds
    // are widened losslessly from `i8` before each wider range is built.
    let i8_values: Vec<i8> = (start..end).collect();
    let i16_values: Vec<i16> = (i16::from(start)..i16::from(end)).collect();
    let i32_values: Vec<i32> = (i32::from(start)..i32::from(end)).collect();
    let i64_values: Vec<i64> = (i64::from(start)..i64::from(end)).collect();

    // One nullable field per integer width, named after its type.
    let fields = vec![
        Field::new("i8", DataType::Int8, true),
        Field::new("i16", DataType::Int16, true),
        Field::new("i32", DataType::Int32, true),
        Field::new("i64", DataType::Int64, true),
    ];

    // Column order must line up with the field order declared above.
    let columns = vec![
        Arc::new(Int8Array::from(i8_values)) as ArrayRef,
        Arc::new(Int16Array::from(i16_values)) as ArrayRef,
        Arc::new(Int32Array::from(i32_values)) as ArrayRef,
        Arc::new(Int64Array::from(i64_values)) as ArrayRef,
    ];

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap()
}

fn make_int32_range(start: i32, end: i32) -> RecordBatch {
Expand Down Expand Up @@ -589,12 +609,12 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
make_date_batch(TimeDelta::try_days(3600).unwrap()),
]
}
Scenario::Int32 => {
Scenario::Int => {
vec![
make_int32_batch(-5, 0),
make_int32_batch(-4, 1),
make_int32_batch(0, 5),
make_int32_batch(5, 10),
make_int_batches(-5, 0),
make_int_batches(-4, 1),
make_int_batches(0, 5),
make_int_batches(5, 10),
]
}
Scenario::Int32Range => {
Expand Down
276 changes: 141 additions & 135 deletions datafusion/core/tests/parquet/page_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,112 +371,149 @@ async fn prune_date64() {
assert_eq!(output.result_rows, 1, "{}", output.description());
}

#[tokio::test]
// null count min max
// page-0 0 -5 -1
// page-1 0 -4 0
// page-2 0 0 4
// page-3 0 5 9
async fn prune_int32_lt() {
test_prune(
Scenario::Int32,
"SELECT * FROM t where i < 1",
Some(0),
Some(5),
11,
)
.await;
// result of sql "SELECT * FROM t where i < 1" is same as
// "SELECT * FROM t where -i > -1"
test_prune(
Scenario::Int32,
"SELECT * FROM t where -i > -1",
Some(0),
Some(5),
11,
)
.await;
}

#[tokio::test]
async fn prune_int32_gt() {
test_prune(
Scenario::Int32,
"SELECT * FROM t where i > 8",
Some(0),
Some(15),
1,
)
.await;

test_prune(
Scenario::Int32,
"SELECT * FROM t where -i < -8",
Some(0),
Some(15),
1,
)
.await;
}

#[tokio::test]
async fn prune_int32_eq() {
test_prune(
Scenario::Int32,
"SELECT * FROM t where i = 1",
Some(0),
Some(15),
1,
)
.await;
}
#[tokio::test]
async fn prune_int32_scalar_fun_and_eq() {
test_prune(
Scenario::Int32,
"SELECT * FROM t where abs(i) = 1 and i = 1",
Some(0),
Some(15),
1,
)
.await;
/// Generates the page-pruning test suite for one integer column width.
///
/// `$bits` is spliced into every generated test name (`prune_int<bits>_lt`,
/// `prune_int<bits>_gt`, ...) via `paste`, and into every SQL string as the
/// column name `i<bits>`, so e.g. `int_tests!(8)` queries column `i8`.
/// Each generated test calls `test_prune` with `Scenario::Int`.
///
/// NOTE(review): `test_prune` is defined outside this macro; the two `Some(..)`
/// arguments and the trailing integer are presumably expected pruning metrics
/// and the expected number of result rows — confirm against its signature.
macro_rules! int_tests {
($bits:expr) => {
paste::item! {
#[tokio::test]
// null count min max
// page-0 0 -5 -1
// page-1 0 -4 0
// page-2 0 0 4
// page-3 0 5 9
async fn [<prune_int $bits _lt>]() {
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where i{} < 1", $bits),
Some(0),
Some(5),
11,
)
.await;
// `-i<bits> > -1` is the negated form of `i<bits> < 1`, so it selects
// the same rows and is checked against the same expected values.
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where -i{} > -1", $bits),
Some(0),
Some(5),
11,
)
.await;
}

#[tokio::test]
async fn [<prune_int $bits _gt >]() {
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where i{} > 8", $bits),
Some(0),
Some(15),
1,
)
.await;

// Negated form of the predicate above; same expected values.
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where -i{} < -8", $bits),
Some(0),
Some(15),
1,
)
.await;
}

#[tokio::test]
async fn [<prune_int $bits _eq >]() {
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where i{} = 1", $bits),
Some(0),
Some(15),
1,
)
.await;
}
#[tokio::test]
async fn [<prune_int $bits _scalar_fun_and_eq >]() {
// Combines a scalar-function predicate with a plain equality on the
// same column; expected values match the plain `= 1` test above.
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where abs(i{}) = 1 and i{} = 1", $bits, $bits),
Some(0),
Some(15),
1,
)
.await;
}

#[tokio::test]
async fn [<prune_int $bits _scalar_fun >]() {
// Predicate wraps the column in `abs(..)` only.
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where abs(i{}) = 1", $bits),
Some(0),
Some(0),
3,
)
.await;
}

#[tokio::test]
async fn [<prune_int $bits _complex_expr>]() {
// Predicate applies arithmetic (`+ 1`) to the column before comparing.
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where i{}+1 = 1", $bits),
Some(0),
Some(0),
2,
)
.await;
}

#[tokio::test]
async fn [<prune_int $bits _complex_expr_subtract >]() {
// Predicate subtracts the column from a constant before comparing.
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where 1-i{} > 1", $bits),
Some(0),
Some(0),
9,
)
.await;
}

#[tokio::test]
async fn [<prune_int $bits _eq_in_list >]() {
// `i<bits> in (1)` is a single-element IN list; expected values match
// the plain `i<bits> = 1` test.
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where i{} in (1)", $bits),
Some(0),
Some(15),
1,
)
.await;
}

#[tokio::test]
async fn [<prune_int $bits _eq_in_list_negated >]() {
// `i<bits> not in (1)`: per the original note, nothing is pruned.
test_prune(
Scenario::Int,
&format!("SELECT * FROM t where i{} not in (1)", $bits),
Some(0),
Some(0),
19,
)
.await;
}
}
}
}

#[tokio::test]
async fn prune_int32_scalar_fun() {
test_prune(
Scenario::Int32,
"SELECT * FROM t where abs(i) = 1",
Some(0),
Some(0),
3,
)
.await;
}

#[tokio::test]
async fn prune_int32_complex_expr() {
test_prune(
Scenario::Int32,
"SELECT * FROM t where i+1 = 1",
Some(0),
Some(0),
2,
)
.await;
}

#[tokio::test]
async fn prune_int32_complex_expr_subtract() {
test_prune(
Scenario::Int32,
"SELECT * FROM t where 1-i > 1",
Some(0),
Some(0),
9,
)
.await;
}
int_tests!(8);
int_tests!(16);
int_tests!(32);
int_tests!(64);

#[tokio::test]
// null count min max
Expand Down Expand Up @@ -556,37 +593,6 @@ async fn prune_f64_complex_expr_subtract() {
.await;
}

#[tokio::test]
// null count min max
// page-0 0 -5 -1
// page-1 0 -4 0
// page-2 0 0 4
// page-3 0 5 9
async fn prune_int32_eq_in_list() {
// result of sql "SELECT * FROM t where in (1)"
test_prune(
Scenario::Int32,
"SELECT * FROM t where i in (1)",
Some(0),
Some(15),
1,
)
.await;
}

#[tokio::test]
async fn prune_int32_eq_in_list_negated() {
// result of sql "SELECT * FROM t where not in (1)" prune nothing
test_prune(
Scenario::Int32,
"SELECT * FROM t where i not in (1)",
Some(0),
Some(0),
19,
)
.await;
}

#[tokio::test]
async fn prune_decimal_lt() {
// The data type of decimal_col is decimal(9,2)
Expand Down
Loading
Loading