Skip to content

Commit

Permalink
Minor: refactor data_trunc to reduce duplicated code (apache#8430)
Browse files Browse the repository at this point in the history
* refactor data_trunc

* fix cast to timestamp array

* fix cast to timestamp scalar

* fix doc
  • Loading branch information
Weijun-H authored and appletreeisyellow committed Dec 15, 2023
1 parent ebdc7da commit ddfa774
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 99 deletions.
15 changes: 15 additions & 0 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use arrow::{
},
};
use arrow_array::cast::as_list_array;
use arrow_array::types::ArrowTimestampType;
use arrow_array::{ArrowNativeTypeOp, Scalar};

/// A dynamically typed, nullable single value, (the single-valued counter-part
Expand Down Expand Up @@ -774,6 +775,20 @@ impl ScalarValue {
ScalarValue::IntervalMonthDayNano(Some(val))
}

/// Returns a [`ScalarValue`] representing
/// `value` and `tz_opt` timezone
pub fn new_timestamp<T: ArrowTimestampType>(
value: Option<i64>,
tz_opt: Option<Arc<str>>,
) -> Self {
match T::UNIT {
TimeUnit::Second => ScalarValue::TimestampSecond(value, tz_opt),
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(value, tz_opt),
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(value, tz_opt),
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(value, tz_opt),
}
}

/// Create a zero value in the given type.
pub fn new_zero(datatype: &DataType) -> Result<ScalarValue> {
assert!(datatype.is_primitive());
Expand Down
137 changes: 38 additions & 99 deletions datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ use arrow::{
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
},
};
use arrow_array::types::ArrowTimestampType;
use arrow_array::{
timezone::Tz, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampSecondArray,
};
use chrono::prelude::*;
use chrono::{Duration, Months, NaiveDate};
use datafusion_common::cast::{
as_date32_array, as_date64_array, as_generic_string_array,
as_date32_array, as_date64_array, as_generic_string_array, as_primitive_array,
as_timestamp_microsecond_array, as_timestamp_millisecond_array,
as_timestamp_nanosecond_array, as_timestamp_second_array,
};
Expand Down Expand Up @@ -335,7 +336,7 @@ fn date_trunc_coarse(granularity: &str, value: i64, tz: Option<Tz>) -> Result<i6
}

// truncates a single value with the given timeunit to the specified granularity
fn _date_trunc(
fn general_date_trunc(
tu: TimeUnit,
value: &Option<i64>,
tz: Option<Tz>,
Expand Down Expand Up @@ -403,123 +404,61 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
return exec_err!("Granularity of `date_trunc` must be non-null scalar Utf8");
};

fn process_array<T: ArrowTimestampType>(
array: &dyn Array,
granularity: String,
tz_opt: &Option<Arc<str>>,
) -> Result<ColumnarValue> {
let parsed_tz = parse_tz(tz_opt)?;
let array = as_primitive_array::<T>(array)?;
let array = array
.iter()
.map(|x| general_date_trunc(T::UNIT, &x, parsed_tz, granularity.as_str()))
.collect::<Result<PrimitiveArray<T>>>()?
.with_timezone_opt(tz_opt.clone());
Ok(ColumnarValue::Array(Arc::new(array)))
}

fn process_scalr<T: ArrowTimestampType>(
v: &Option<i64>,
granularity: String,
tz_opt: &Option<Arc<str>>,
) -> Result<ColumnarValue> {
let parsed_tz = parse_tz(tz_opt)?;
let value = general_date_trunc(T::UNIT, v, parsed_tz, granularity.as_str())?;
let value = ScalarValue::new_timestamp::<T>(value, tz_opt.clone());
Ok(ColumnarValue::Scalar(value))
}

Ok(match array {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
let parsed_tz = parse_tz(tz_opt)?;
let value =
_date_trunc(TimeUnit::Nanosecond, v, parsed_tz, granularity.as_str())?;
let value = ScalarValue::TimestampNanosecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
process_scalr::<TimestampNanosecondType>(v, granularity, tz_opt)?
}
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
let parsed_tz = parse_tz(tz_opt)?;
let value =
_date_trunc(TimeUnit::Microsecond, v, parsed_tz, granularity.as_str())?;
let value = ScalarValue::TimestampMicrosecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
process_scalr::<TimestampMicrosecondType>(v, granularity, tz_opt)?
}
ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
let parsed_tz = parse_tz(tz_opt)?;
let value =
_date_trunc(TimeUnit::Millisecond, v, parsed_tz, granularity.as_str())?;
let value = ScalarValue::TimestampMillisecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
process_scalr::<TimestampMillisecondType>(v, granularity, tz_opt)?
}
ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
let parsed_tz = parse_tz(tz_opt)?;
let value =
_date_trunc(TimeUnit::Second, v, parsed_tz, granularity.as_str())?;
let value = ScalarValue::TimestampSecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
process_scalr::<TimestampSecondType>(v, granularity, tz_opt)?
}
ColumnarValue::Array(array) => {
let array_type = array.data_type();
match array_type {
DataType::Timestamp(TimeUnit::Second, tz_opt) => {
let parsed_tz = parse_tz(tz_opt)?;
let array = as_timestamp_second_array(array)?;
let array = array
.iter()
.map(|x| {
_date_trunc(
TimeUnit::Second,
&x,
parsed_tz,
granularity.as_str(),
)
})
.collect::<Result<TimestampSecondArray>>()?
.with_timezone_opt(tz_opt.clone());
ColumnarValue::Array(Arc::new(array))
process_array::<TimestampSecondType>(array, granularity, tz_opt)?
}
DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
let parsed_tz = parse_tz(tz_opt)?;
let array = as_timestamp_millisecond_array(array)?;
let array = array
.iter()
.map(|x| {
_date_trunc(
TimeUnit::Millisecond,
&x,
parsed_tz,
granularity.as_str(),
)
})
.collect::<Result<TimestampMillisecondArray>>()?
.with_timezone_opt(tz_opt.clone());
ColumnarValue::Array(Arc::new(array))
process_array::<TimestampMillisecondType>(array, granularity, tz_opt)?
}
DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
let parsed_tz = parse_tz(tz_opt)?;
let array = as_timestamp_microsecond_array(array)?;
let array = array
.iter()
.map(|x| {
_date_trunc(
TimeUnit::Microsecond,
&x,
parsed_tz,
granularity.as_str(),
)
})
.collect::<Result<TimestampMicrosecondArray>>()?
.with_timezone_opt(tz_opt.clone());
ColumnarValue::Array(Arc::new(array))
process_array::<TimestampMicrosecondType>(array, granularity, tz_opt)?
}
DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
let parsed_tz = parse_tz(tz_opt)?;
let array = as_timestamp_nanosecond_array(array)?;
let array = array
.iter()
.map(|x| {
_date_trunc(
TimeUnit::Nanosecond,
&x,
parsed_tz,
granularity.as_str(),
)
})
.collect::<Result<TimestampNanosecondArray>>()?
.with_timezone_opt(tz_opt.clone());
ColumnarValue::Array(Arc::new(array))
}
_ => {
let parsed_tz = None;
let array = as_timestamp_nanosecond_array(array)?;
let array = array
.iter()
.map(|x| {
_date_trunc(
TimeUnit::Nanosecond,
&x,
parsed_tz,
granularity.as_str(),
)
})
.collect::<Result<TimestampNanosecondArray>>()?;

ColumnarValue::Array(Arc::new(array))
process_array::<TimestampNanosecondType>(array, granularity, tz_opt)?
}
_ => process_array::<TimestampNanosecondType>(array, granularity, &None)?,
}
}
_ => {
Expand Down

0 comments on commit ddfa774

Please sign in to comment.