Skip to content

Commit

Permalink
Minor: Simplify date_trunc code and add comments (#6783)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Jun 28, 2023
1 parent a76b09e commit 0a4714d
Showing 1 changed file with 16 additions and 31 deletions.
47 changes: 16 additions & 31 deletions datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,11 @@ fn quarter_month(date: &NaiveDateTime) -> u32 {
1 + 3 * ((date.month() - 1) / 3)
}

fn date_trunc_single(granularity: &str, value: i64) -> Result<i64> {
/// Tuncates the single `value`, expressed in nanoseconds since the
/// epoch, for granularities greater than 1 second, in taking into
/// account that some granularities are not uniform durations of time
/// (e.g. months are not always the same lengths, leap seconds, etc)
fn date_trunc_coarse(granularity: &str, value: i64) -> Result<i64> {
if granularity == "millisecond" || granularity == "microsecond" {
return Ok(value);
}
Expand Down Expand Up @@ -266,11 +270,11 @@ fn date_trunc_single(granularity: &str, value: i64) -> Result<i64> {
Ok(value.unwrap().timestamp_nanos())
}

// truncates a single value with the given timeunit to the specified granularity
fn _date_trunc(
tu: TimeUnit,
value: &Option<i64>,
granularity: &str,
f: impl Fn(Option<i64>) -> Result<Option<i64>>,
) -> Result<Option<i64>, DataFusionError> {
let scale = match tu {
TimeUnit::Second => 1_000_000_000,
Expand All @@ -284,9 +288,7 @@ fn _date_trunc(
};

// convert to nanoseconds
let Some(nano) = (f)(Some(value * scale))? else {
return Ok(None);
};
let nano = date_trunc_coarse(granularity, scale * value)?;

let result = match tu {
TimeUnit::Second => match granularity {
Expand Down Expand Up @@ -328,29 +330,24 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
));
};

let f = |x: Option<i64>| {
x.map(|x| date_trunc_single(granularity.as_str(), x))
.transpose()
};

Ok(match array {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
let value = _date_trunc(TimeUnit::Nanosecond, v, granularity.as_str(), f)?;
let value = _date_trunc(TimeUnit::Nanosecond, v, granularity.as_str())?;
let value = ScalarValue::TimestampNanosecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
}
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
let value = _date_trunc(TimeUnit::Microsecond, v, granularity.as_str(), f)?;
let value = _date_trunc(TimeUnit::Microsecond, v, granularity.as_str())?;
let value = ScalarValue::TimestampMicrosecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
}
ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
let value = _date_trunc(TimeUnit::Millisecond, v, granularity.as_str(), f)?;
let value = _date_trunc(TimeUnit::Millisecond, v, granularity.as_str())?;
let value = ScalarValue::TimestampMillisecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
}
ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
let value = _date_trunc(TimeUnit::Second, v, granularity.as_str(), f)?;
let value = _date_trunc(TimeUnit::Second, v, granularity.as_str())?;
let value = ScalarValue::TimestampSecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
}
Expand All @@ -361,9 +358,7 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let array = as_timestamp_second_array(array)?;
let array = array
.iter()
.map(|x| {
_date_trunc(TimeUnit::Second, &x, granularity.as_str(), f)
})
.map(|x| _date_trunc(TimeUnit::Second, &x, granularity.as_str()))
.collect::<Result<TimestampSecondArray>>()?;
ColumnarValue::Array(Arc::new(array))
}
Expand All @@ -372,12 +367,7 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let array = array
.iter()
.map(|x| {
_date_trunc(
TimeUnit::Millisecond,
&x,
granularity.as_str(),
f,
)
_date_trunc(TimeUnit::Millisecond, &x, granularity.as_str())
})
.collect::<Result<TimestampMillisecondArray>>()?;
ColumnarValue::Array(Arc::new(array))
Expand All @@ -387,12 +377,7 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let array = array
.iter()
.map(|x| {
_date_trunc(
TimeUnit::Microsecond,
&x,
granularity.as_str(),
f,
)
_date_trunc(TimeUnit::Microsecond, &x, granularity.as_str())
})
.collect::<Result<TimestampMicrosecondArray>>()?;
ColumnarValue::Array(Arc::new(array))
Expand All @@ -402,7 +387,7 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let array = array
.iter()
.map(|x| {
_date_trunc(TimeUnit::Nanosecond, &x, granularity.as_str(), f)
_date_trunc(TimeUnit::Nanosecond, &x, granularity.as_str())
})
.collect::<Result<TimestampNanosecondArray>>()?;

Expand Down Expand Up @@ -996,7 +981,7 @@ mod tests {
cases.iter().for_each(|(original, granularity, expected)| {
let left = string_to_timestamp_nanos(original).unwrap();
let right = string_to_timestamp_nanos(expected).unwrap();
let result = date_trunc_single(granularity, left).unwrap();
let result = date_trunc_coarse(granularity, left).unwrap();
assert_eq!(result, right, "{original} = {expected}");
});
}
Expand Down

0 comments on commit 0a4714d

Please sign in to comment.