Skip to content

Commit

Permalink
edit: simplified code
Browse files Browse the repository at this point in the history
  • Loading branch information
dust1 authored and jiacai2050 committed May 17, 2023
1 parent 981099c commit c1b1117
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 179 deletions.
17 changes: 17 additions & 0 deletions common_types/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,23 @@ pub fn cast_nanosecond_to_mills(array: &ArrayRef) -> Result<Arc<dyn Array>> {
}
}

/// Casts a millisecond-timestamp array into a nanosecond-timestamp array.
///
/// This is the inverse of `cast_nanosecond_to_mills` above and delegates the
/// conversion to datafusion's cast kernel via `cast_column`.
///
/// # Errors
/// Returns `CastTimestamp` if the underlying cast fails, and
/// `Error::NotImplemented` if the cast unexpectedly yields a scalar instead of
/// an array (should not happen for an array input — TODO confirm).
pub fn cast_mills_to_nanosecond(array: &ArrayRef) -> Result<Arc<dyn Array>> {
    let column = ColumnarValue::Array(array.clone());
    // NOTE: the result holds nanosecond values, so name it accordingly
    // (the original `mills_column` name was a copy-paste leftover from
    // `cast_nanosecond_to_mills`).
    let nanos_column = cast_column(
        &column,
        &DataType::Timestamp(TimeUnit::Nanosecond, None),
        &DEFAULT_DATAFUSION_CAST_OPTIONS,
    )
    .with_context(|| CastTimestamp {
        data_type: DataType::Timestamp(TimeUnit::Nanosecond, None),
    })?;

    match nanos_column {
        ColumnarValue::Array(array) => Ok(array),
        _ => Err(Error::NotImplemented),
    }
}

fn cast_array<'a, T: 'static>(datum_kind: &DatumKind, array: &'a ArrayRef) -> Result<&'a T> {
array
.as_any()
Expand Down
2 changes: 1 addition & 1 deletion df_operator/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub enum ColumnarValue {
}

impl ColumnarValue {
fn into_df_columnar_value(self) -> DfColumnarValue {
pub fn into_df_columnar_value(self) -> DfColumnarValue {
match self {
ColumnarValue::Array(v) => DfColumnarValue::Array(v.to_arrow_array_ref()),
ColumnarValue::Scalar(v) => DfColumnarValue::Scalar(v.into_df_scalar_value()),
Expand Down
246 changes: 68 additions & 178 deletions df_operator/src/udfs/time_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@

//! time_bucket UDF.
use arrow::datatypes::IntervalDayTimeType;
use chrono::{FixedOffset, TimeZone};
use std::sync::Arc;

use arrow::{array::TimestampNanosecondArray, datatypes::IntervalDayTimeType};
use common_types::{
column::{ColumnBlock, ColumnBlockBuilder, TimestampColumn},
datum::{Datum, DatumKind},
time::Timestamp,
column::{cast_mills_to_nanosecond, cast_nanosecond_to_mills, ColumnBlock},
datum::DatumKind,
};
use common_util::{define_result, error::BoxError};
use datafusion::{
common::cast::as_timestamp_nanosecond_array,
error::DataFusionError,
physical_expr::datetime_expressions::{date_bin, date_trunc},
physical_plan::ColumnarValue as DfColumnarValue,
scalar::ScalarValue,
Expand Down Expand Up @@ -52,12 +54,11 @@ pub enum Error {
#[snafu(display("Period of year only support P1Y."))]
UnsupportedYear,

#[snafu(display(
"Failed to truncate timestamp, timestamp:{}, period:{:?}",
timestamp,
period
))]
TruncateTimestamp { timestamp: i64, period: Period },
#[snafu(display("time_bucket only support Array of values."))]
UnsupportedScalar,

#[snafu(display("Failed to truncate timestamp. err: {}", source))]
TruncateTimestamp { source: DataFusionError },

#[snafu(display("Failed to build result column, err:{}", source))]
BuildColumn { source: common_types::column::Error },
Expand Down Expand Up @@ -120,19 +121,24 @@ fn make_signature() -> TypeSignature {
TypeSignature::OneOf(sigs)
}

struct TimeBucket<'a> {
column: &'a TimestampColumn,
struct TimeBucket {
column: DfColumnarValue,
period: Period,
}

impl<'a> TimeBucket<'a> {
impl TimeBucket {
fn parse_args(args: &[ColumnarValue]) -> Result<TimeBucket> {
ensure!(args.len() >= 2, InvalidArgNum);

let column = match &args[0] {
ColumnarValue::Array(block) => block.as_timestamp().context(NotTimestampColumn)?,
ColumnarValue::Array(block) => {
let mills_array = block.to_arrow_array_ref();
let nanos_array = cast_mills_to_nanosecond(&mills_array).context(BuildColumn)?;
DfColumnarValue::Array(nanos_array)
}
_ => return NotTimestampColumn.fail(),
};

let period = match &args[1] {
ColumnarValue::Scalar(value) => {
let period_str = value.as_str().context(NotPeriod)?;
Expand All @@ -145,27 +151,14 @@ impl<'a> TimeBucket<'a> {
}

fn call(&self) -> Result<ColumnBlock> {
let mut out_column_builder =
ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, self.column.num_rows());
for ts_opt in self.column.iter() {
match ts_opt {
Some(ts) => {
let truncated = self.period.truncate(ts).context(TruncateTimestamp {
timestamp: ts,
period: self.period,
})?;
out_column_builder
.append(Datum::Timestamp(truncated))
.context(BuildColumn)?;
}
None => {
out_column_builder
.append(Datum::Null)
.context(BuildColumn)?;
}
let truncate = self.period.truncate(&self.column)?;
match truncate {
DfColumnarValue::Array(array) => {
let mills_array = cast_nanosecond_to_mills(&array).context(BuildColumn)?;
ColumnBlock::try_cast_arrow_array_ref(&mills_array).context(BuildColumn)
}
_ => UnsupportedScalar.fail(),
}
Ok(out_column_builder.build())
}
}

Expand Down Expand Up @@ -239,166 +232,63 @@ impl Period {
Ok(parsed)
}

fn truncate(&self, ts: Timestamp) -> Option<Timestamp> {
fn truncate(&self, array: &DfColumnarValue) -> Result<DfColumnarValue> {
const MILLIS_SECONDS: i32 = 1000;
const MINUTE_SECONDS: i32 = 60 * MILLIS_SECONDS;
const HOUR_SECONDS: i32 = 60 * MINUTE_SECONDS;

let truncated_ts = match self {
Period::Second(period) => Self::truncate_mills(ts, i32::from(*period) * MILLIS_SECONDS),
Period::Minute(period) => Self::truncate_mills(ts, i32::from(*period) * MINUTE_SECONDS),
Period::Hour(period) => Self::truncate_mills(ts, i32::from(*period) * HOUR_SECONDS),
Period::Day(period) => Self::truncate_day(ts, *period)?,
Period::Week => Self::truncate_time(ts, "week"),
Period::Month => Self::truncate_time(ts, "month"),
Period::Year => Self::truncate_time(ts, "year"),
};

Some(truncated_ts)
}

fn truncate_mills(ts: Timestamp, period: i32) -> Timestamp {
let offset = FixedOffset::east_opt(DEFAULT_TIMEZONE_OFFSET_SECS).expect("won't panic");
let datetime = offset.timestamp_millis_opt(ts.as_i64()).unwrap();
let ts = datetime.timestamp_nanos();
let time = DfColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(ts), None));

let stride = IntervalDayTimeType::make_value(0, period);
let stride = DfColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(stride)));
let origin = DfColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(-DEFAULT_TIMEZONE_OFFSET_SECS as i64 * 1_000_000_000),
Some("+00:00".to_owned()),
));

let result = date_bin(&[stride, time, origin]).unwrap();

let truncated_ts: i64 = match result {
DfColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(time), _)) => {
offset.timestamp_nanos(time).timestamp_millis()
match self {
Period::Second(period) => {
Self::truncate_by_date_bin(array, 0, i32::from(*period) * MILLIS_SECONDS)
}
_ => 0,
};

Timestamp::new(truncated_ts)
Period::Minute(period) => {
Self::truncate_by_date_bin(array, 0, i32::from(*period) * MINUTE_SECONDS)
}
Period::Hour(period) => {
Self::truncate_by_date_bin(array, 0, i32::from(*period) * HOUR_SECONDS)
}
Period::Day(period) => Self::truncate_by_date_bin(array, *period as i32, 0),
Period::Week => Self::truncate_by_date_trunc(array, "week"),
Period::Month => Self::truncate_by_date_trunc(array, "month"),
Period::Year => Self::truncate_by_date_trunc(array, "year"),
}
}

fn truncate_day(ts: Timestamp, period: u16) -> Option<Timestamp> {
let offset = FixedOffset::east_opt(DEFAULT_TIMEZONE_OFFSET_SECS).expect("won't panic");
let datetime = offset.timestamp_millis_opt(ts.as_i64()).unwrap();
let nanos_ts = datetime.timestamp_nanos();
let time = DfColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(nanos_ts), None));

let stride = IntervalDayTimeType::make_value(period as i32, 0);
let stride = DfColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(stride)));

fn truncate_by_date_bin(
array: &DfColumnarValue,
day: i32,
mills: i32,
) -> Result<DfColumnarValue> {
let truncate_time = IntervalDayTimeType::make_value(day, mills);
let stride = DfColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(truncate_time)));
let origin = DfColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(-DEFAULT_TIMEZONE_OFFSET_SECS as i64 * 1_000_000_000),
Some("+00:00".to_owned()),
));
let result = date_bin(&[stride, time, origin]).unwrap();

match result {
DfColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(time), _)) => {
let truncated_ts = offset.timestamp_nanos(time).timestamp_millis();
Some(Timestamp::new(truncated_ts))
}
_ => None,
}
date_bin(&[stride, array.clone(), origin]).context(TruncateTimestamp)
}

fn truncate_time(ts: Timestamp, granularity: &str) -> Timestamp {
let offset = FixedOffset::east_opt(DEFAULT_TIMEZONE_OFFSET_SECS).expect("won't panic");
// Convert to local time. Won't panic.
let datetime = offset.timestamp_millis_opt(ts.as_i64()).unwrap();

let ts_nanos = datetime.timestamp_nanos();
let time = DfColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(ts_nanos), None));
fn truncate_by_date_trunc(
array: &DfColumnarValue,
granularity: &str,
) -> Result<DfColumnarValue> {
let granularity = DfColumnarValue::Scalar(ScalarValue::Utf8(Some(granularity.to_string())));
let result = date_trunc(&[granularity, time]).unwrap();
let year_truncated_ts: i64 = match result {
DfColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(time), _)) => {
offset.timestamp_nanos(time).timestamp_millis()
let trunc_array = date_trunc(&[granularity, array.clone()]).context(TruncateTimestamp)?;

let list = match trunc_array {
DfColumnarValue::Array(array) => {
let array = as_timestamp_nanosecond_array(&array).context(TruncateTimestamp)?;
array
.iter()
.map(|ts| {
ts.map(|t| Ok(t - DEFAULT_TIMEZONE_OFFSET_SECS as i64 * 1_000_000_000))
.transpose()
})
.collect::<Result<TimestampNanosecondArray>>()?
}
_ => 0,
_ => return UnsupportedScalar.fail(),
};

Timestamp::new(year_truncated_ts - DEFAULT_TIMEZONE_OFFSET_SECS as i64 * 1000)
}
}

#[cfg(test)]
mod test {

use common_types::time::Timestamp;

use super::Period;

struct TimeBucketTest {
pub ts: i64,
pub truncate_ts: i64,
}

#[test]
fn test_time_bucket_day() {
let tests = [TimeBucketTest {
ts: 1656777600000,
truncate_ts: 1656777600000,
}];
let period = Period::parse("P1D").unwrap();
for test in tests {
let ts = Timestamp::new(test.ts);
let truncate_ts = period.truncate(ts).unwrap();
assert_eq!(truncate_ts.as_i64(), test.truncate_ts);
}
}

#[test]
fn test_time_bucket_week() {
let tests = [TimeBucketTest {
ts: 1683383083000, // 2023-5-6 22:24:43
truncate_ts: 1682870400000, // 2023-5-1 0:0:0
}];
let period = Period::parse("P1W").unwrap();
for test in tests {
let ts = Timestamp::new(test.ts);
let truncate_ts = period.truncate(ts).unwrap();
assert_eq!(truncate_ts.as_i64(), test.truncate_ts);
}
}

#[test]
fn test_time_bucket_year() {
let tests = [
TimeBucketTest {
ts: 1656777600000,
truncate_ts: 1640966400000,
},
TimeBucketTest {
ts: 1659484800000,
truncate_ts: 1640966400000,
},
TimeBucketTest {
ts: 1659571200000,
truncate_ts: 1640966400000,
},
TimeBucketTest {
ts: 1659577320000,
truncate_ts: 1640966400000,
},
TimeBucketTest {
ts: 1659577422000,
truncate_ts: 1640966400000,
},
TimeBucketTest {
ts: 1659577423000,
truncate_ts: 1640966400000,
},
];
let period = Period::parse("P1Y").unwrap();
for test in tests {
let ts = Timestamp::new(test.ts);
let truncate_ts = period.truncate(ts).unwrap();
assert_eq!(truncate_ts.as_i64(), test.truncate_ts);
}
Ok(DfColumnarValue::Array(Arc::new(list)))
}
}

0 comments on commit c1b1117

Please sign in to comment.