From 96e0ee610c348753f4cf6f33d817cf2c11a23692 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Tue, 21 May 2024 15:14:52 -0400 Subject: [PATCH] test: add more tests for statistics reading (#10592) * test: add more tests for statistics reading * Link bug tickets to the tests and run fmt --- .../core/tests/parquet/arrow_statistics.rs | 291 +++++++++++++++++- datafusion/core/tests/parquet/mod.rs | 2 +- 2 files changed, 277 insertions(+), 16 deletions(-) diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 272afea7b28a..432d109b2235 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -22,8 +22,8 @@ use std::fs::File; use std::sync::Arc; use arrow_array::{ - make_array, Array, ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, - RecordBatch, UInt64Array, + make_array, Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, Float64Array, + Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt64Array, }; use arrow_schema::{DataType, Field, Schema}; use datafusion::datasource::physical_plan::parquet::{ @@ -624,20 +624,281 @@ async fn test_dates_64_diff_rg_sizes() { .run("date64"); } +// BUG: +// https://github.com/apache/datafusion/issues/10604 +#[tokio::test] +async fn test_uint() { + let row_per_group = 4; + + // This creates a parquet files of 4 columns named "u8", "u16", "u32", "u64" + // "u8" --> UInt8Array + // "u16" --> UInt16Array + // "u32" --> UInt32Array + // "u64" --> UInt64Array + + // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 5 row groups with size 4 + let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await; + + // u8 + // BUG: expect UInt8Array but returns Int32Array + Test { + reader, + expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // shoudld be UInt8Array + expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // shoudld be UInt8Array + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), + expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), + } + .run("u8"); + + // u16 + // BUG: expect UInt16Array but returns Int32Array + let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // shoudld be UInt16Array + expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // shoudld be UInt16Array + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), + expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), + } + .run("u16"); + + // u32 + // BUG: expect UInt32Array but returns Int32Array + let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // shoudld be UInt32Array + expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // shoudld be UInt32Array + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), + expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), + } + .run("u32"); + + // u64 + // BUG: expect UInt64rray but returns Int64Array + let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int64Array::from(vec![0, 1, 4, 7, 251])), // shoudld be UInt64Array + expected_max: Arc::new(Int64Array::from(vec![3, 4, 6, 250, 254])), // shoudld be UInt64Array + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), + expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), + } + .run("u64"); +} + +#[tokio::test] +async fn test_int32_range() { + let row_per_group = 5; + // This creates a parquet file of 1 column "i" + // file has 2 record batches, each has 2 rows. They will be saved into one row group + let reader = parquet_file_many_columns(Scenario::Int32Range, row_per_group).await; + + Test { + reader, + expected_min: Arc::new(Int32Array::from(vec![0])), + expected_max: Arc::new(Int32Array::from(vec![300000])), + expected_null_counts: UInt64Array::from(vec![0]), + expected_row_counts: UInt64Array::from(vec![4]), + } + .run("i"); +} + +// BUG: not convert UInt32Array to Int32Array +// https://github.com/apache/datafusion/issues/10604 +#[tokio::test] +async fn test_uint32_range() { + let row_per_group = 5; + // This creates a parquet file of 1 column "u" + // file has 2 record batches, each has 2 rows. They will be saved into one row group + let reader = parquet_file_many_columns(Scenario::UInt32Range, row_per_group).await; + + Test { + reader, + expected_min: Arc::new(Int32Array::from(vec![0])), // shoudld be UInt32Array + expected_max: Arc::new(Int32Array::from(vec![300000])), // shoudld be UInt32Array + expected_null_counts: UInt64Array::from(vec![0]), + expected_row_counts: UInt64Array::from(vec![4]), + } + .run("u"); +} + +#[tokio::test] +async fn test_float64() { + let row_per_group = 5; + // This creates a parquet file of 1 column "f" + // file has 4 record batches, each has 5 rows. They will be saved into 4 row groups + let reader = parquet_file_many_columns(Scenario::Float64, row_per_group).await; + + Test { + reader, + expected_min: Arc::new(Float64Array::from(vec![-5.0, -4.0, -0.0, 5.0])), + expected_max: Arc::new(Float64Array::from(vec![-1.0, 0.0, 4.0, 9.0])), + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), + expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + } + .run("f"); +} + +#[tokio::test] +async fn test_decimal() { + let row_per_group = 5; + // This creates a parquet file of 1 column "decimal_col" with decimal data type and precicion 9, scale 2 + // file has 3 record batches, each has 5 rows. They will be saved into 3 row groups + let reader = parquet_file_many_columns(Scenario::Decimal, row_per_group).await; + + Test { + reader, + expected_min: Arc::new( + Decimal128Array::from(vec![100, -500, 2000]) + .with_precision_and_scale(9, 2) + .unwrap(), + ), + expected_max: Arc::new( + Decimal128Array::from(vec![600, 600, 6000]) + .with_precision_and_scale(9, 2) + .unwrap(), + ), + expected_null_counts: UInt64Array::from(vec![0, 0, 0]), + expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + } + .run("decimal_col"); +} + +// BUG: not convert BinaryArray to StringArray +// https://github.com/apache/datafusion/issues/10605 +#[tokio::test] +async fn test_byte() { + let row_per_group = 5; + + // This creates a parquet file of 4 columns + // "name" + // "service_string" + // "service_binary" + // "service_fixedsize" + + // file has 3 record batches, each has 5 rows. They will be saved into 3 row groups + let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await; + + // column "name" + Test { + reader, + expected_min: Arc::new(StringArray::from(vec![ + "all frontends", + "mixed", + "all backends", + ])), + expected_max: Arc::new(StringArray::from(vec![ + "all frontends", + "mixed", + "all backends", + ])), + expected_null_counts: UInt64Array::from(vec![0, 0, 0]), + expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + } + .run("name"); + + // column "service_string" + let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await; + Test { + reader, + expected_min: Arc::new(StringArray::from(vec![ + "frontend five", + "backend one", + "backend eight", + ])), + expected_max: Arc::new(StringArray::from(vec![ + "frontend two", + "frontend six", + "backend six", + ])), + expected_null_counts: UInt64Array::from(vec![0, 0, 0]), + expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + } + .run("service_string"); + + // column "service_binary" + let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await; + Test { + reader, + expected_min: Arc::new(StringArray::from(vec![ + "frontend five", + "backend one", + "backend eight", + ])), // Shuld be BinaryArray + expected_max: Arc::new(StringArray::from(vec![ + "frontend two", + "frontend six", + "backend six", + ])), // Shuld be BinaryArray + expected_null_counts: UInt64Array::from(vec![0, 0, 0]), + expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + } + .run("service_binary"); + + // column "service_fixedsize" + // b"fe1", b"be1", b"be4" + let min_input = vec![vec![102, 101, 49], vec![98, 101, 49], vec![98, 101, 52]]; + // b"fe5", b"fe6", b"be8" + let max_input = vec![vec![102, 101, 55], vec![102, 101, 54], vec![98, 101, 56]]; + let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await; + Test { + reader, + expected_min: Arc::new( + FixedSizeBinaryArray::try_from_iter(min_input.into_iter()).unwrap(), + ), + expected_max: Arc::new( + FixedSizeBinaryArray::try_from_iter(max_input.into_iter()).unwrap(), + ), + expected_null_counts: UInt64Array::from(vec![0, 0, 0]), + expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + } + .run("service_fixedsize"); +} + +// PeriodsInColumnNames +#[tokio::test] +async fn test_period_in_column_names() { + let row_per_group = 5; + // This creates a parquet file of 2 columns "name" and "service.name" + // file has 3 record batches, each has 5 rows. They will be saved into 3 row groups + let reader = + parquet_file_many_columns(Scenario::PeriodsInColumnNames, row_per_group).await; + + // column "name" + Test { + reader, + expected_min: Arc::new(StringArray::from(vec![ + "HTTP GET / DISPATCH", + "HTTP PUT / DISPATCH", + "HTTP GET / DISPATCH", + ])), + expected_max: Arc::new(StringArray::from(vec![ + "HTTP GET / DISPATCH", + "HTTP PUT / DISPATCH", + "HTTP GET / DISPATCH", + ])), + expected_null_counts: UInt64Array::from(vec![0, 0, 0]), + expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + } + .run("name"); + + // column "service.name" + let reader = + parquet_file_many_columns(Scenario::PeriodsInColumnNames, row_per_group).await; + Test { + reader, + expected_min: Arc::new(StringArray::from(vec!["frontend", "backend", "backend"])), + expected_max: Arc::new(StringArray::from(vec![ + "frontend", "frontend", "backend", + ])), + expected_null_counts: UInt64Array::from(vec![0, 0, 0]), + expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + } + .run("service.name"); +} + // TODO: -// Other data types to tests -// `u8`, `u16`, `u32`, and `u64`, -// UInt, -// UInt32Range, -// Float64, -// Decimal, -// DecimalBloomFilterInt32, -// DecimalBloomFilterInt64, -// DecimalLargePrecision, -// DecimalLargePrecisionBloomFilter, -// ByteArray, -// PeriodsInColumnNames, -// WithNullValuesPageLevel, // WITHOUT Stats /////// NEGATIVE TESTS /////// diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 6e3f366b4373..bdc39c269d29 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -427,7 +427,7 @@ fn make_int_batches(start: i8, end: i8) -> RecordBatch { .unwrap() } -/// Return record batch with i8, i16, i32, and i64 sequences +/// Return record batch with u8, u16, u32, and u64 sequences /// /// Columns are named /// "u8" -> UInt8Array