Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement current_time scalar function #4054

Merged
merged 5 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions datafusion/core/tests/sql/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1651,3 +1651,34 @@ async fn test_current_date() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_current_time() -> Result<()> {
let ctx = SessionContext::new();

let sql = "select current_time() dt";
let results = execute_to_batches(&ctx, sql).await;
assert_eq!(
results[0]
.schema()
.field_with_name("dt")
.unwrap()
.data_type()
.to_owned(),
DataType::Time64(TimeUnit::Nanosecond)
);

let sql = "select case when current_time() = (now()::bigint % 86400000000000)::time then 'OK' else 'FAIL' end result";
let results = execute_to_batches(&ctx, sql).await;

let expected = vec![
"+--------+",
"| result |",
"+--------+",
"| OK |",
"+--------+",
];
Comment on lines +1671 to +1680
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


assert_batches_eq!(expected, &results);
Ok(())
}
5 changes: 5 additions & 0 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ pub enum BuiltinScalarFunction {
Now,
///current_date
CurrentDate,
/// current_time
CurrentTime,
/// translate
Translate,
/// trim
Expand All @@ -181,6 +183,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Random
| BuiltinScalarFunction::Now
| BuiltinScalarFunction::CurrentDate
| BuiltinScalarFunction::CurrentTime
)
}
/// Returns the [Volatility] of the builtin function.
Expand Down Expand Up @@ -259,6 +262,7 @@ impl BuiltinScalarFunction {
// Stable builtin functions
BuiltinScalarFunction::Now => Volatility::Stable,
BuiltinScalarFunction::CurrentDate => Volatility::Stable,
BuiltinScalarFunction::CurrentTime => Volatility::Stable,

// Volatile builtin functions
BuiltinScalarFunction::Random => Volatility::Volatile,
Expand Down Expand Up @@ -315,6 +319,7 @@ impl FromStr for BuiltinScalarFunction {
"concat_ws" => BuiltinScalarFunction::ConcatWithSeparator,
"chr" => BuiltinScalarFunction::Chr,
"current_date" => BuiltinScalarFunction::CurrentDate,
"current_time" => BuiltinScalarFunction::CurrentTime,
"date_part" | "datepart" => BuiltinScalarFunction::DatePart,
"date_trunc" | "datetrunc" => BuiltinScalarFunction::DateTrunc,
"date_bin" => BuiltinScalarFunction::DateBin,
Expand Down
8 changes: 8 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,14 @@ pub fn current_date() -> Expr {
}
}

/// Returns current UTC time as a [`DataType::Time64`] value
pub fn current_time() -> Expr {
Expr::ScalarFunction {
fun: BuiltinScalarFunction::CurrentTime,
args: vec![],
}
}

/// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression.
pub fn case(expr: Expr) -> CaseBuilder {
CaseBuilder::new(Some(Box::new(expr)), vec![], vec![], None)
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ pub fn return_type(
Some("UTC".to_owned()),
)),
BuiltinScalarFunction::CurrentDate => Ok(DataType::Date32),
BuiltinScalarFunction::CurrentTime => Ok(DataType::Time64(TimeUnit::Nanosecond)),
BuiltinScalarFunction::Translate => {
utf8_to_str_type(&input_expr_types[0], "translate")
}
Expand Down
13 changes: 13 additions & 0 deletions datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,19 @@ pub fn make_current_date(
move |_arg| Ok(ColumnarValue::Scalar(ScalarValue::Date32(days)))
}

/// Create an implementation of `current_time()` that always returns the
/// specified current time.
///
/// The semantics of `current_time()` require it to return the same value
/// wherever it appears within a single statement. This value is
/// chosen during planning time.
pub fn make_current_time(
now_ts: DateTime<Utc>,
) -> impl Fn(&[ColumnarValue]) -> Result<ColumnarValue> {
let nano = Some(now_ts.timestamp_nanos() % 86400000000000);
move |_arg| Ok(ColumnarValue::Scalar(ScalarValue::Time64(nano)))
}

fn quarter_month(date: &NaiveDateTime) -> u32 {
1 + 3 * ((date.month() - 1) / 3)
}
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,12 @@ pub fn create_physical_fun(
execution_props.query_execution_start_time,
))
}
BuiltinScalarFunction::CurrentTime => {
// bind value for current_time at plan time
Arc::new(datetime_expressions::make_current_time(
execution_props.query_execution_start_time,
))
Comment on lines +438 to +440
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@mingmwang all the current_time within a query uses the same query_execution_start_time

}
BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() {
DataType::Utf8 => {
make_scalar_function(string_expressions::initcap::<i32>)(args)
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,7 @@ enum ScalarFunction {
DateBin=68;
ArrowTypeof=69;
CurrentDate=70;
CurrentTime=71;
}

message ScalarFunctionNode {
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::ToTimestampSeconds => Self::ToTimestampSeconds,
ScalarFunction::Now => Self::Now,
ScalarFunction::CurrentDate => Self::CurrentDate,
ScalarFunction::CurrentTime => Self::CurrentTime,
ScalarFunction::Translate => Self::Translate,
ScalarFunction::RegexpMatch => Self::RegexpMatch,
ScalarFunction::Coalesce => Self::Coalesce,
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
BuiltinScalarFunction::ToTimestampSeconds => Self::ToTimestampSeconds,
BuiltinScalarFunction::Now => Self::Now,
BuiltinScalarFunction::CurrentDate => Self::CurrentDate,
BuiltinScalarFunction::CurrentTime => Self::CurrentTime,
BuiltinScalarFunction::Translate => Self::Translate,
BuiltinScalarFunction::RegexpMatch => Self::RegexpMatch,
BuiltinScalarFunction::Coalesce => Self::Coalesce,
Expand Down