Skip to content

Commit

Permalink
Extract parquet statistics from timestamps with timezones (apache#10766)
Browse files Browse the repository at this point in the history
* Fix incorrect statistics read for timestamp columns in parquet
  • Loading branch information
xinlifoobar authored and findepi committed Jul 16, 2024
1 parent 95d1b62 commit 23dd169
Show file tree
Hide file tree
Showing 3 changed files with 615 additions and 99 deletions.
285 changes: 263 additions & 22 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328

use arrow::{array::ArrayRef, datatypes::DataType};
use arrow::{array::ArrayRef, datatypes::DataType, datatypes::TimeUnit};
use arrow_array::{new_empty_array, new_null_array, UInt64Array};
use arrow_schema::{Field, FieldRef, Schema};
use datafusion_common::{
Expand Down Expand Up @@ -112,6 +112,26 @@ macro_rules! get_statistic {
Some(DataType::UInt64) => {
Some(ScalarValue::UInt64(Some((*s.$func()) as u64)))
}
Some(DataType::Timestamp(unit, timezone)) => {
Some(match unit {
TimeUnit::Second => ScalarValue::TimestampSecond(
Some(*s.$func()),
timezone.clone(),
),
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(
Some(*s.$func()),
timezone.clone(),
),
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(
Some(*s.$func()),
timezone.clone(),
),
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(
Some(*s.$func()),
timezone.clone(),
),
})
}
_ => Some(ScalarValue::Int64(Some(*s.$func()))),
}
}
Expand Down Expand Up @@ -395,7 +415,8 @@ mod test {
use arrow_array::{
new_null_array, Array, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
Int8Array, RecordBatch, StringArray, StructArray, TimestampNanosecondArray,
Int8Array, RecordBatch, StringArray, StructArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
};
use arrow_schema::{Field, SchemaRef};
use bytes::Bytes;
Expand Down Expand Up @@ -536,28 +557,209 @@ mod test {
}

#[test]
#[should_panic(
expected = "Inconsistent types in ScalarValue::iter_to_array. Expected Int64, got TimestampNanosecond(NULL, None)"
)]
// Due to https://github.com/apache/datafusion/issues/8295
fn roundtrip_timestamp() {
Test {
input: timestamp_array([
// row group 1
Some(1),
None,
Some(3),
// row group 2
Some(9),
Some(5),
input: timestamp_seconds_array(
[
// row group 1
Some(1),
None,
Some(3),
// row group 2
Some(9),
Some(5),
None,
// row group 3
None,
None,
None,
],
None,
// row group 3
),
expected_min: timestamp_seconds_array([Some(1), Some(5), None], None),
expected_max: timestamp_seconds_array([Some(3), Some(9), None], None),
}
.run();

Test {
input: timestamp_milliseconds_array(
[
// row group 1
Some(1),
None,
Some(3),
// row group 2
Some(9),
Some(5),
None,
// row group 3
None,
None,
None,
],
None,
),
expected_min: timestamp_milliseconds_array([Some(1), Some(5), None], None),
expected_max: timestamp_milliseconds_array([Some(3), Some(9), None], None),
}
.run();

Test {
input: timestamp_microseconds_array(
[
// row group 1
Some(1),
None,
Some(3),
// row group 2
Some(9),
Some(5),
None,
// row group 3
None,
None,
None,
],
None,
),
expected_min: timestamp_microseconds_array([Some(1), Some(5), None], None),
expected_max: timestamp_microseconds_array([Some(3), Some(9), None], None),
}
.run();

Test {
input: timestamp_nanoseconds_array(
[
// row group 1
Some(1),
None,
Some(3),
// row group 2
Some(9),
Some(5),
None,
// row group 3
None,
None,
None,
],
None,
]),
expected_min: timestamp_array([Some(1), Some(5), None]),
expected_max: timestamp_array([Some(3), Some(9), None]),
),
expected_min: timestamp_nanoseconds_array([Some(1), Some(5), None], None),
expected_max: timestamp_nanoseconds_array([Some(3), Some(9), None], None),
}
.run()
}

#[test]
fn roundtrip_timestamp_timezoned() {
Test {
input: timestamp_seconds_array(
[
// row group 1
Some(1),
None,
Some(3),
// row group 2
Some(9),
Some(5),
None,
// row group 3
None,
None,
None,
],
Some("UTC"),
),
expected_min: timestamp_seconds_array([Some(1), Some(5), None], Some("UTC")),
expected_max: timestamp_seconds_array([Some(3), Some(9), None], Some("UTC")),
}
.run();

Test {
input: timestamp_milliseconds_array(
[
// row group 1
Some(1),
None,
Some(3),
// row group 2
Some(9),
Some(5),
None,
// row group 3
None,
None,
None,
],
Some("UTC"),
),
expected_min: timestamp_milliseconds_array(
[Some(1), Some(5), None],
Some("UTC"),
),
expected_max: timestamp_milliseconds_array(
[Some(3), Some(9), None],
Some("UTC"),
),
}
.run();

Test {
input: timestamp_microseconds_array(
[
// row group 1
Some(1),
None,
Some(3),
// row group 2
Some(9),
Some(5),
None,
// row group 3
None,
None,
None,
],
Some("UTC"),
),
expected_min: timestamp_microseconds_array(
[Some(1), Some(5), None],
Some("UTC"),
),
expected_max: timestamp_microseconds_array(
[Some(3), Some(9), None],
Some("UTC"),
),
}
.run();

Test {
input: timestamp_nanoseconds_array(
[
// row group 1
Some(1),
None,
Some(3),
// row group 2
Some(9),
Some(5),
None,
// row group 3
None,
None,
None,
],
Some("UTC"),
),
expected_min: timestamp_nanoseconds_array(
[Some(1), Some(5), None],
Some("UTC"),
),
expected_max: timestamp_nanoseconds_array(
[Some(3), Some(9), None],
Some("UTC"),
),
}
.run()
}
Expand Down Expand Up @@ -914,8 +1116,8 @@ mod test {
// File has no min/max for timestamp_col
.with_column(ExpectedColumn {
name: "timestamp_col",
expected_min: timestamp_array([None]),
expected_max: timestamp_array([None]),
expected_min: timestamp_nanoseconds_array([None], None),
expected_max: timestamp_nanoseconds_array([None], None),
})
.with_column(ExpectedColumn {
name: "year",
Expand Down Expand Up @@ -1135,9 +1337,48 @@ mod test {
Arc::new(array)
}

fn timestamp_array(input: impl IntoIterator<Item = Option<i64>>) -> ArrayRef {
fn timestamp_seconds_array(
input: impl IntoIterator<Item = Option<i64>>,
timzezone: Option<&str>,
) -> ArrayRef {
let array: TimestampSecondArray = input.into_iter().collect();
match timzezone {
Some(tz) => Arc::new(array.with_timezone(tz)),
None => Arc::new(array),
}
}

fn timestamp_milliseconds_array(
input: impl IntoIterator<Item = Option<i64>>,
timzezone: Option<&str>,
) -> ArrayRef {
let array: TimestampMillisecondArray = input.into_iter().collect();
match timzezone {
Some(tz) => Arc::new(array.with_timezone(tz)),
None => Arc::new(array),
}
}

fn timestamp_microseconds_array(
input: impl IntoIterator<Item = Option<i64>>,
timzezone: Option<&str>,
) -> ArrayRef {
let array: TimestampMicrosecondArray = input.into_iter().collect();
match timzezone {
Some(tz) => Arc::new(array.with_timezone(tz)),
None => Arc::new(array),
}
}

fn timestamp_nanoseconds_array(
input: impl IntoIterator<Item = Option<i64>>,
timzezone: Option<&str>,
) -> ArrayRef {
let array: TimestampNanosecondArray = input.into_iter().collect();
Arc::new(array)
match timzezone {
Some(tz) => Arc::new(array.with_timezone(tz)),
None => Arc::new(array),
}
}

fn utf8_array<'a>(input: impl IntoIterator<Item = Option<&'a str>>) -> ArrayRef {
Expand Down
Loading

0 comments on commit 23dd169

Please sign in to comment.