Skip to content

Commit

Permalink
date_part support fractions of second (#4385)
Browse files Browse the repository at this point in the history
* support seconds fraction in date_part

* support seconds fraction in date_part

* to be insync with pgsql

* fix merge

* fix bench

* fix bench
  • Loading branch information
comphead authored Dec 6, 2022
1 parent 19cddf5 commit cedb05a
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 26 deletions.
6 changes: 3 additions & 3 deletions benchmarks/src/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,18 @@ pub fn get_answer_schema(n: usize) -> Schema {
7 => Schema::new(vec![
Field::new("supp_nation", DataType::Utf8, true),
Field::new("cust_nation", DataType::Utf8, true),
Field::new("l_year", DataType::Int32, true),
Field::new("l_year", DataType::Float64, true),
Field::new("revenue", DataType::Decimal128(15, 2), true),
]),

8 => Schema::new(vec![
Field::new("o_year", DataType::Int32, true),
Field::new("o_year", DataType::Float64, true),
Field::new("mkt_share", DataType::Decimal128(15, 2), true),
]),

9 => Schema::new(vec![
Field::new("nation", DataType::Utf8, true),
Field::new("o_year", DataType::Int32, true),
Field::new("o_year", DataType::Float64, true),
Field::new("sum_profit", DataType::Decimal128(15, 2), true),
]),

Expand Down
32 changes: 28 additions & 4 deletions datafusion/core/tests/sql/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1285,12 +1285,36 @@ async fn test_extract_date_part() -> Result<()> {
"12"
);
test_expression!(
"EXTRACT(second FROM to_timestamp('2020-09-08T12:00:12+00:00'))",
"12"
"EXTRACT(second FROM to_timestamp('2020-09-08T12:00:12.12345678+00:00'))",
"12.12345678"
);
test_expression!(
"date_part('second', to_timestamp('2020-09-08T12:00:12+00:00'))",
"12"
"EXTRACT(millisecond FROM to_timestamp('2020-09-08T12:00:12.12345678+00:00'))",
"12123.45678"
);
test_expression!(
"EXTRACT(microsecond FROM to_timestamp('2020-09-08T12:00:12.12345678+00:00'))",
"12123456.78"
);
// test_expression!(
// "EXTRACT(nanosecond FROM to_timestamp('2020-09-08T12:00:12.12345678+00:00'))",
// "1212345678"
// );
test_expression!(
"date_part('second', to_timestamp('2020-09-08T12:00:12.12345678+00:00'))",
"12.12345678"
);
test_expression!(
"date_part('millisecond', to_timestamp('2020-09-08T12:00:12.12345678+00:00'))",
"12123.45678"
);
test_expression!(
"date_part('microsecond', to_timestamp('2020-09-08T12:00:12.12345678+00:00'))",
"12123456.78"
);
test_expression!(
"date_part('nanosecond', to_timestamp('2020-09-08T12:00:12.12345678+00:00'))",
"12123456780"
);
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub fn return_type(
}
BuiltinScalarFunction::Concat => Ok(DataType::Utf8),
BuiltinScalarFunction::ConcatWithSeparator => Ok(DataType::Utf8),
BuiltinScalarFunction::DatePart => Ok(DataType::Int32),
BuiltinScalarFunction::DatePart => Ok(DataType::Float64),
BuiltinScalarFunction::DateTrunc => {
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
}
Expand Down
91 changes: 73 additions & 18 deletions datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@

//! DateTime expressions
use arrow::compute::cast;
use arrow::{
array::TimestampNanosecondArray, compute::kernels::temporal, datatypes::TimeUnit,
temporal_conversions::timestamp_ns_to_datetime,
};
use arrow::{
array::{Array, ArrayRef, OffsetSizeTrait, PrimitiveArray},
array::{Array, ArrayRef, Float64Array, OffsetSizeTrait, PrimitiveArray},
compute::kernels::cast_utils::string_to_timestamp_nanos,
datatypes::{
ArrowPrimitiveType, DataType, IntervalDayTimeType, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
ArrowNumericType, ArrowPrimitiveType, ArrowTemporalType, DataType,
IntervalDayTimeType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
},
};
use chrono::prelude::*;
Expand Down Expand Up @@ -401,30 +403,36 @@ pub fn date_bin(args: &[ColumnarValue]) -> Result<ColumnarValue> {
macro_rules! extract_date_part {
($ARRAY: expr, $FN:expr) => {
match $ARRAY.data_type() {
DataType::Date32 => match as_date32_array($ARRAY) {
Ok(array) => Ok($FN(array)?),
Err(e) => Err(e),
},
DataType::Date32 => {
let array = as_date32_array($ARRAY)?;
Ok($FN(array)
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
}
DataType::Date64 => {
let array = as_date64_array($ARRAY)?;
Ok($FN(array)?)
Ok($FN(array)
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
}
DataType::Timestamp(time_unit, None) => match time_unit {
TimeUnit::Second => {
let array = as_timestamp_second_array($ARRAY)?;
Ok($FN(array)?)
Ok($FN(array)
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
}
TimeUnit::Millisecond => {
let array = as_timestamp_millisecond_array($ARRAY)?;
Ok($FN(array)?)
Ok($FN(array)
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
}
TimeUnit::Microsecond => {
let array = as_timestamp_microsecond_array($ARRAY)?;
Ok($FN(array)?)
Ok($FN(array)
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
}
TimeUnit::Nanosecond => {
let array = as_timestamp_nanosecond_array($ARRAY)?;
Ok($FN(array)?)
Ok($FN(array)
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
}
},
datatype => Err(DataFusionError::Internal(format!(
Expand Down Expand Up @@ -469,23 +477,70 @@ pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {
"dow" => extract_date_part!(&array, temporal::num_days_from_sunday),
"hour" => extract_date_part!(&array, temporal::hour),
"minute" => extract_date_part!(&array, temporal::minute),
"second" => extract_date_part!(&array, temporal::second),
"second" => extract_date_part!(&array, seconds),
"millisecond" => extract_date_part!(&array, millis),
"microsecond" => extract_date_part!(&array, micros),
"nanosecond" => extract_date_part!(&array, nanos),
_ => Err(DataFusionError::Execution(format!(
"Date part '{}' not supported",
date_part
))),
}?;

Ok(if is_scalar {
ColumnarValue::Scalar(ScalarValue::try_from_array(
&(Arc::new(arr) as ArrayRef),
0,
)?)
ColumnarValue::Scalar(ScalarValue::try_from_array(&arr?, 0)?)
} else {
ColumnarValue::Array(Arc::new(arr))
ColumnarValue::Array(arr?)
})
}

fn to_ticks<T>(array: &PrimitiveArray<T>, frac: i32) -> Result<Float64Array>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
let zipped = temporal::second(array)?
.values()
.iter()
.zip(temporal::nanosecond(array)?.values().iter())
.map(|o| ((*o.0 as f64 + (*o.1 as f64) / 1_000_000_000.0) * (frac as f64)))
.collect::<Vec<f64>>();

Ok(Float64Array::from(zipped))
}

fn seconds<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
to_ticks(array, 1)
}

fn millis<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
to_ticks(array, 1_000)
}

fn micros<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
to_ticks(array, 1_000_000)
}

fn nanos<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
to_ticks(array, 1_000_000_000)
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down

0 comments on commit cedb05a

Please sign in to comment.