From b1702c4a671402181f45815ecf6298aae26e9e68 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sun, 8 Aug 2021 22:04:49 -0700 Subject: [PATCH] [ballista] support date_part and date_trunc ser/de, pass tpch 7 --- ballista/rust/core/proto/ballista.proto | 17 ++-- .../core/src/serde/logical_plan/from_proto.rs | 99 ++++++++----------- .../core/src/serde/logical_plan/to_proto.rs | 5 +- .../src/serde/physical_plan/from_proto.rs | 1 + benchmarks/run.sh | 2 +- benchmarks/src/bin/tpch.rs | 1 + datafusion/src/logical_plan/expr.rs | 19 +++- datafusion/src/logical_plan/mod.rs | 15 +-- datafusion/src/physical_plan/functions.rs | 4 +- datafusion/src/prelude.rs | 9 +- 10 files changed, 88 insertions(+), 84 deletions(-) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 9dbce81c21f1..2538a10ceda3 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -144,18 +144,19 @@ enum ScalarFunction { TOTIMESTAMP = 24; ARRAY = 25; NULLIF = 26; - DATETRUNC = 27; - MD5 = 28; - SHA224 = 29; - SHA256 = 30; - SHA384 = 31; - SHA512 = 32; - LN = 33; + DATEPART = 27; + DATETRUNC = 28; + MD5 = 29; + SHA224 = 30; + SHA256 = 31; + SHA384 = 32; + SHA512 = 33; + LN = 34; } message ScalarFunctionNode { ScalarFunction fun = 1; - repeated LogicalExprNode expr = 2; + repeated LogicalExprNode args = 2; } enum AggregateFunction { diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 2665e33137b5..31b8b6d3bcbc 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -988,77 +988,58 @@ impl TryInto for &protobuf::LogicalExprNode { expr.fun )) })?; + let args = &expr.args; + match scalar_function { - protobuf::ScalarFunction::Sqrt => { - Ok(sqrt((&expr.expr[0]).try_into()?)) - } - protobuf::ScalarFunction::Sin => Ok(sin((&expr.expr[0]).try_into()?)), -
protobuf::ScalarFunction::Cos => Ok(cos((&expr.expr[0]).try_into()?)), - protobuf::ScalarFunction::Tan => Ok(tan((&expr.expr[0]).try_into()?)), - // protobuf::ScalarFunction::Asin => Ok(asin(&expr.expr[0]).try_into()?)), - // protobuf::ScalarFunction::Acos => Ok(acos(&expr.expr[0]).try_into()?)), - protobuf::ScalarFunction::Atan => { - Ok(atan((&expr.expr[0]).try_into()?)) - } - protobuf::ScalarFunction::Exp => Ok(exp((&expr.expr[0]).try_into()?)), - protobuf::ScalarFunction::Log2 => { - Ok(log2((&expr.expr[0]).try_into()?)) - } - protobuf::ScalarFunction::Ln => Ok(ln((&expr.expr[0]).try_into()?)), - protobuf::ScalarFunction::Log10 => { - Ok(log10((&expr.expr[0]).try_into()?)) - } - protobuf::ScalarFunction::Floor => { - Ok(floor((&expr.expr[0]).try_into()?)) - } - protobuf::ScalarFunction::Ceil => { - Ok(ceil((&expr.expr[0]).try_into()?)) - } - protobuf::ScalarFunction::Round => { - Ok(round((&expr.expr[0]).try_into()?)) - } - protobuf::ScalarFunction::Trunc => { - Ok(trunc((&expr.expr[0]).try_into()?)) - } - protobuf::ScalarFunction::Abs => Ok(abs((&expr.expr[0]).try_into()?)), + protobuf::ScalarFunction::Sqrt => Ok(sqrt((&args[0]).try_into()?)), + protobuf::ScalarFunction::Sin => Ok(sin((&args[0]).try_into()?)), + protobuf::ScalarFunction::Cos => Ok(cos((&args[0]).try_into()?)), + protobuf::ScalarFunction::Tan => Ok(tan((&args[0]).try_into()?)), + // protobuf::ScalarFunction::Asin => Ok(asin(&args[0]).try_into()?)), + // protobuf::ScalarFunction::Acos => Ok(acos(&args[0]).try_into()?)), + protobuf::ScalarFunction::Atan => Ok(atan((&args[0]).try_into()?)), + protobuf::ScalarFunction::Exp => Ok(exp((&args[0]).try_into()?)), + protobuf::ScalarFunction::Log2 => Ok(log2((&args[0]).try_into()?)), + protobuf::ScalarFunction::Ln => Ok(ln((&args[0]).try_into()?)), + protobuf::ScalarFunction::Log10 => Ok(log10((&args[0]).try_into()?)), + protobuf::ScalarFunction::Floor => Ok(floor((&args[0]).try_into()?)), + protobuf::ScalarFunction::Ceil => 
Ok(ceil((&args[0]).try_into()?)), + protobuf::ScalarFunction::Round => Ok(round((&args[0]).try_into()?)), + protobuf::ScalarFunction::Trunc => Ok(trunc((&args[0]).try_into()?)), + protobuf::ScalarFunction::Abs => Ok(abs((&args[0]).try_into()?)), protobuf::ScalarFunction::Signum => { - Ok(signum((&expr.expr[0]).try_into()?)) + Ok(signum((&args[0]).try_into()?)) } protobuf::ScalarFunction::Octetlength => { - Ok(length((&expr.expr[0]).try_into()?)) - } - // // protobuf::ScalarFunction::Concat => Ok(concat((&expr.expr[0]).try_into()?)), - protobuf::ScalarFunction::Lower => { - Ok(lower((&expr.expr[0]).try_into()?)) - } - protobuf::ScalarFunction::Upper => { - Ok(upper((&expr.expr[0]).try_into()?)) - } - protobuf::ScalarFunction::Trim => { - Ok(trim((&expr.expr[0]).try_into()?)) + Ok(length((&args[0]).try_into()?)) } - protobuf::ScalarFunction::Ltrim => { - Ok(ltrim((&expr.expr[0]).try_into()?)) + // // protobuf::ScalarFunction::Concat => Ok(concat((&args[0]).try_into()?)), + protobuf::ScalarFunction::Lower => Ok(lower((&args[0]).try_into()?)), + protobuf::ScalarFunction::Upper => Ok(upper((&args[0]).try_into()?)), + protobuf::ScalarFunction::Trim => Ok(trim((&args[0]).try_into()?)), + protobuf::ScalarFunction::Ltrim => Ok(ltrim((&args[0]).try_into()?)), + protobuf::ScalarFunction::Rtrim => Ok(rtrim((&args[0]).try_into()?)), + // protobuf::ScalarFunction::Totimestamp => Ok(to_timestamp((&args[0]).try_into()?)), + // protobuf::ScalarFunction::Array => Ok(array((&args[0]).try_into()?)), + // // protobuf::ScalarFunction::Nullif => Ok(nulli((&args[0]).try_into()?)), + protobuf::ScalarFunction::Datepart => { + Ok(date_part((&args[0]).try_into()?, (&args[1]).try_into()?)) } - protobuf::ScalarFunction::Rtrim => { - Ok(rtrim((&expr.expr[0]).try_into()?)) + protobuf::ScalarFunction::Datetrunc => { + Ok(date_trunc((&args[0]).try_into()?, (&args[1]).try_into()?)) } - // protobuf::ScalarFunction::Totimestamp => Ok(to_timestamp((&expr.expr[0]).try_into()?)), - // 
protobuf::ScalarFunction::Array => Ok(array((&expr.expr[0]).try_into()?)), - // // protobuf::ScalarFunction::Nullif => Ok(nulli((&expr.expr[0]).try_into()?)), - // protobuf::ScalarFunction::Datetrunc => Ok(date_trunc((&expr.expr[0]).try_into()?)), - // protobuf::ScalarFunction::Md5 => Ok(md5((&expr.expr[0]).try_into()?)), + // protobuf::ScalarFunction::Md5 => Ok(md5((&args[0]).try_into()?)), protobuf::ScalarFunction::Sha224 => { - Ok(sha224((&expr.expr[0]).try_into()?)) + Ok(sha224((&args[0]).try_into()?)) } protobuf::ScalarFunction::Sha256 => { - Ok(sha256((&expr.expr[0]).try_into()?)) + Ok(sha256((&args[0]).try_into()?)) } protobuf::ScalarFunction::Sha384 => { - Ok(sha384((&expr.expr[0]).try_into()?)) + Ok(sha384((&args[0]).try_into()?)) } protobuf::ScalarFunction::Sha512 => { - Ok(sha512((&expr.expr[0]).try_into()?)) + Ok(sha512((&args[0]).try_into()?)) } _ => Err(proto_error( "Protobuf deserialization error: Unsupported scalar function", @@ -1119,10 +1100,10 @@ impl TryInto for &protobuf::Field { } } -use datafusion::physical_plan::datetime_expressions::{date_trunc, to_timestamp}; use datafusion::physical_plan::{aggregates, windows}; use datafusion::prelude::{ - array, length, lower, ltrim, md5, rtrim, sha224, sha256, sha384, sha512, trim, upper, + array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256, + sha384, sha512, trim, upper, }; use std::convert::TryFrom; diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 87f26a118e78..1a3834af59d9 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -1065,7 +1065,7 @@ impl TryInto for &Expr { Expr::ScalarVariable(_) => unimplemented!(), Expr::ScalarFunction { ref fun, ref args } => { let fun: protobuf::ScalarFunction = fun.try_into()?; - let expr: Vec = args + let args: Vec = args .iter() .map(|e| e.try_into()) .collect::, BallistaError>>()?; 
@@ -1074,7 +1074,7 @@ impl TryInto for &Expr { protobuf::logical_expr_node::ExprType::ScalarFunction( protobuf::ScalarFunctionNode { fun: fun.into(), - expr, + args, }, ), ), @@ -1374,6 +1374,7 @@ impl TryInto for &BuiltinScalarFunction { } BuiltinScalarFunction::Array => Ok(protobuf::ScalarFunction::Array), BuiltinScalarFunction::NullIf => Ok(protobuf::ScalarFunction::Nullif), + BuiltinScalarFunction::DatePart => Ok(protobuf::ScalarFunction::Datepart), BuiltinScalarFunction::DateTrunc => Ok(protobuf::ScalarFunction::Datetrunc), BuiltinScalarFunction::MD5 => Ok(protobuf::ScalarFunction::Md5), BuiltinScalarFunction::SHA224 => Ok(protobuf::ScalarFunction::Sha224), diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 509044b3d1ba..678bcde8fa73 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -501,6 +501,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::Totimestamp => BuiltinScalarFunction::ToTimestamp, ScalarFunction::Array => BuiltinScalarFunction::Array, ScalarFunction::Nullif => BuiltinScalarFunction::NullIf, + ScalarFunction::Datepart => BuiltinScalarFunction::DatePart, ScalarFunction::Datetrunc => BuiltinScalarFunction::DateTrunc, ScalarFunction::Md5 => BuiltinScalarFunction::MD5, ScalarFunction::Sha224 => BuiltinScalarFunction::SHA224, diff --git a/benchmarks/run.sh b/benchmarks/run.sh index 8e36424da89f..b1f47a24c2d8 100755 --- a/benchmarks/run.sh +++ b/benchmarks/run.sh @@ -20,7 +20,7 @@ set -e # This bash script is meant to be run inside the docker-compose environment. 
Check the README for instructions cd / -for query in 1 3 5 6 10 12 +for query in 1 3 5 6 7 10 12 do /tpch benchmark ballista --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug done diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 978fbaa9afe7..10b5c2db795f 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -1140,6 +1140,7 @@ mod tests { test_round_trip!(q3, 3); test_round_trip!(q5, 5); test_round_trip!(q6, 6); + test_round_trip!(q7, 7); test_round_trip!(q10, 10); test_round_trip!(q12, 12); } diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs index 8b0e647261da..e4952840487b 100644 --- a/datafusion/src/logical_plan/expr.rs +++ b/datafusion/src/logical_plan/expr.rs @@ -1421,7 +1421,20 @@ macro_rules! unary_scalar_expr { }; } -// generate methods for creating the supported unary expressions +/// Create a convenience function representing a binary scalar function +macro_rules! binary_scalar_expr { + ($ENUM:ident, $FUNC:ident) => { + #[doc = "this scalar function is not documented yet"] + pub fn $FUNC(arg1: Expr, arg2: Expr) -> Expr { + Expr::ScalarFunction { + fun: functions::BuiltinScalarFunction::$ENUM, + args: vec![arg1, arg2], + } + } + }; +} + +// generate methods for creating the supported unary/binary expressions // math functions unary_scalar_expr!(Sqrt, sqrt); @@ -1478,6 +1491,10 @@ unary_scalar_expr!(Translate, translate); unary_scalar_expr!(Trim, trim); unary_scalar_expr!(Upper, upper); +// date functions +binary_scalar_expr!(DatePart, date_part); +binary_scalar_expr!(DateTrunc, date_trunc); + /// returns an array of fixed size with each argument on it.
pub fn array(args: Vec) -> Expr { Expr::ScalarFunction { diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs index a021d06f0950..7f5ac2491843 100644 --- a/datafusion/src/logical_plan/mod.rs +++ b/datafusion/src/logical_plan/mod.rs @@ -38,13 +38,14 @@ pub use display::display_schema; pub use expr::{ abs, acos, and, array, ascii, asin, atan, avg, binary_expr, bit_length, btrim, case, ceil, character_length, chr, col, columnize_expr, combine_filters, concat, concat_ws, - cos, count, count_distinct, create_udaf, create_udf, exp, exprlist_to_fields, floor, - in_list, initcap, left, length, lit, ln, log10, log2, lower, lpad, ltrim, max, md5, - min, normalize_col, normalize_cols, now, octet_length, or, random, regexp_match, - regexp_replace, repeat, replace, replace_col, reverse, right, round, rpad, rtrim, - sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, - substr, sum, tan, to_hex, translate, trim, trunc, unnormalize_col, unnormalize_cols, - upper, when, Column, Expr, ExprRewriter, ExpressionVisitor, Literal, Recursion, + cos, count, count_distinct, create_udaf, create_udf, date_part, date_trunc, exp, + exprlist_to_fields, floor, in_list, initcap, left, length, lit, ln, log10, log2, + lower, lpad, ltrim, max, md5, min, normalize_col, normalize_cols, now, octet_length, + or, random, regexp_match, regexp_replace, repeat, replace, replace_col, reverse, + right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part, + sqrt, starts_with, strpos, substr, sum, tan, to_hex, translate, trim, trunc, + unnormalize_col, unnormalize_cols, upper, when, Column, Expr, ExprRewriter, + ExpressionVisitor, Literal, Recursion, }; pub use extension::UserDefinedLogicalNode; pub use operators::Operator; diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs index 7bb3cb456e9f..a005f56dd02a 100644 --- a/datafusion/src/physical_plan/functions.rs +++ 
b/datafusion/src/physical_plan/functions.rs @@ -277,8 +277,8 @@ impl FromStr for BuiltinScalarFunction { "concat" => BuiltinScalarFunction::Concat, "concat_ws" => BuiltinScalarFunction::ConcatWithSeparator, "chr" => BuiltinScalarFunction::Chr, - "date_part" => BuiltinScalarFunction::DatePart, - "date_trunc" => BuiltinScalarFunction::DateTrunc, + "date_part" | "datepart" => BuiltinScalarFunction::DatePart, + "date_trunc" | "datetrunc" => BuiltinScalarFunction::DateTrunc, "initcap" => BuiltinScalarFunction::InitCap, "left" => BuiltinScalarFunction::Left, "length" => BuiltinScalarFunction::CharacterLength, diff --git a/datafusion/src/prelude.rs b/datafusion/src/prelude.rs index e7ad04e74d1a..168e1d5df41a 100644 --- a/datafusion/src/prelude.rs +++ b/datafusion/src/prelude.rs @@ -29,9 +29,10 @@ pub use crate::dataframe::DataFrame; pub use crate::execution::context::{ExecutionConfig, ExecutionContext}; pub use crate::logical_plan::{ array, ascii, avg, bit_length, btrim, character_length, chr, col, concat, concat_ws, - count, create_udf, in_list, initcap, left, length, lit, lower, lpad, ltrim, max, md5, - min, now, octet_length, random, regexp_replace, repeat, replace, reverse, right, - rpad, rtrim, sha224, sha256, sha384, sha512, split_part, starts_with, strpos, substr, - sum, to_hex, translate, trim, upper, Column, JoinType, Partitioning, + count, create_udf, date_part, date_trunc, in_list, initcap, left, length, lit, lower, + lpad, ltrim, max, md5, min, now, octet_length, random, regexp_replace, repeat, + replace, reverse, right, rpad, rtrim, sha224, sha256, sha384, sha512, split_part, + starts_with, strpos, substr, sum, to_hex, translate, trim, upper, Column, JoinType, + Partitioning, }; pub use crate::physical_plan::csv::CsvReadOptions;