From 54213824f51433e5ec585a8142d6bc99bd2ceb16 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 20 Aug 2024 23:13:52 -0700 Subject: [PATCH] Use specified commit to test --- native/Cargo.lock | 84 ++++++++- native/Cargo.toml | 18 +- .../core/src/execution/datafusion/planner.rs | 177 +++++++----------- native/core/src/execution/operators/filter.rs | 2 +- .../apache/comet/exec/CometJoinSuite.scala | 16 +- 5 files changed, 158 insertions(+), 139 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 67f196489e..aad554d52c 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -805,6 +805,7 @@ dependencies = [ [[package]] name = "datafusion" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", @@ -822,9 +823,11 @@ dependencies = [ "datafusion-expr", "datafusion-functions", "datafusion-functions-aggregate", + "datafusion-functions-window", "datafusion-optimizer", "datafusion-physical-expr", "datafusion-physical-expr-common", + "datafusion-physical-expr-functions-aggregate", "datafusion-physical-optimizer", "datafusion-physical-plan", "datafusion-sql", @@ -851,6 +854,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "arrow-schema", "async-trait", @@ -948,6 +952,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", @@ -961,12 +966,14 @@ dependencies = [ "libc", "num_cpus", "object_store", + "paste", "sqlparser", ] [[package]] name = "datafusion-common-runtime" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "tokio", ] @@ -974,6 +981,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "arrow", "chrono", @@ -993,6 +1001,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", @@ -1000,6 +1009,9 @@ dependencies = [ "arrow-buffer", "chrono", "datafusion-common", + "datafusion-expr-common", + "datafusion-functions-aggregate-common", + "datafusion-physical-expr-common", "paste", "serde_json", "sqlparser", @@ -1007,9 +1019,20 @@ dependencies = [ "strum_macros", ] +[[package]] +name = "datafusion-expr-common" +version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" +dependencies = [ + "arrow", + "datafusion-common", + "paste", +] + [[package]] name = "datafusion-functions" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "arrow", "arrow-buffer", @@ -1035,6 +1058,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", @@ -1042,15 +1066,32 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", + "datafusion-functions-aggregate-common", + "datafusion-physical-expr", "datafusion-physical-expr-common", + "half", "log", "paste", "sqlparser", ] +[[package]] +name = "datafusion-functions-aggregate-common" +version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" +dependencies = [ + "ahash", + "arrow", + "datafusion-common", + "datafusion-expr-common", + "datafusion-physical-expr-common", + "rand", +] + [[package]] name = "datafusion-functions-nested" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "arrow", "arrow-array", @@ -1068,9 +1109,21 @@ dependencies = [ "rand", ] +[[package]] +name = "datafusion-functions-window" +version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" +dependencies = [ + "datafusion-common", + "datafusion-expr", + "datafusion-physical-expr-common", + "log", +] + [[package]] name = "datafusion-optimizer" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "arrow", "async-trait", @@ -1089,6 +1142,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", @@ -1102,6 +1156,8 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", + "datafusion-expr-common", + "datafusion-functions-aggregate-common", "datafusion-physical-expr-common", "half", "hashbrown", @@ -1117,28 +1173,47 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", "datafusion-common", - "datafusion-expr", + "datafusion-expr-common", "hashbrown", "rand", ] +[[package]] +name = "datafusion-physical-expr-functions-aggregate" +version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" +dependencies = [ + "ahash", + "arrow", + "datafusion-common", + "datafusion-expr", + "datafusion-expr-common", + "datafusion-functions-aggregate-common", + "datafusion-physical-expr-common", + "rand", +] + [[package]] name = "datafusion-physical-optimizer" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-physical-expr", "datafusion-physical-plan", + "itertools 0.12.1", ] [[package]] name = "datafusion-physical-plan" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", @@ -1153,8 +1228,10 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-functions-aggregate", + "datafusion-functions-aggregate-common", "datafusion-physical-expr", "datafusion-physical-expr-common", + "datafusion-physical-expr-functions-aggregate", "futures", "half", "hashbrown", @@ -1171,6 +1248,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "arrow", "arrow-array", @@ -2665,9 +2743,9 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" [[package]] name = "sqlparser" -version = "0.49.0" +version = "0.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a404d0e14905361b918cb8afdb73605e25c1d5029312bd9785142dcb3aa49e" +checksum = "b2e5b515a2bd5168426033e9efbfd05500114833916f1d5c268f938b4ee130ac" dependencies = [ "log", "sqlparser_derive", diff --git a/native/Cargo.toml b/native/Cargo.toml index 61681af703..82598671c6 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -39,15 +39,15 @@ arrow-buffer = { version = "52.2.0" } arrow-data = { version = "52.2.0" } arrow-schema = { version = "52.2.0" } parquet = { version = "52.2.0", default-features = false, features = ["experimental"] } -datafusion-common = { path = "../../arrow-datafusion/datafusion/common" } -datafusion = { default-features = false, path = "../../arrow-datafusion/datafusion/core", features = ["unicode_expressions", "crypto_expressions"] } -datafusion-functions = { path = "../../arrow-datafusion/datafusion/functions", features = ["crypto_expressions"] } -datafusion-functions-nested = { path = "../../arrow-datafusion/datafusion/functions-nested", default-features = false } -datafusion-expr = { path = "../../arrow-datafusion/datafusion/expr", default-features = false } -datafusion-execution = { path = "../../arrow-datafusion/datafusion/execution", default-features = false } -datafusion-physical-plan = { path = "../../arrow-datafusion/datafusion/physical-plan", default-features = false } -datafusion-physical-expr-common = { path = "../../arrow-datafusion/datafusion/physical-expr-common", default-features = false } -datafusion-physical-expr = { path = "../../arrow-datafusion/datafusion/physical-expr", default-features = false } +datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e" } +datafusion = { default-features = false, git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", features = ["unicode_expressions", "crypto_expressions"] } +datafusion-functions = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", features = ["crypto_expressions"] } +datafusion-functions-nested = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", default-features = false } +datafusion-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", default-features = false } +datafusion-execution = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", default-features = false } +datafusion-physical-plan = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", default-features = false } +datafusion-physical-expr-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", default-features = false } +datafusion-physical-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", default-features = false } datafusion-comet-spark-expr = { path = "spark-expr", version = "0.2.0" } datafusion-comet-proto = { path = "proto", version = "0.2.0" } chrono = { version = "0.4", default-features = false, features = ["clock"] } diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index ed8a00c751..44ce80389e 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -105,13 +105,13 @@ use datafusion_common::{ use datafusion_expr::expr::find_df_window_func; use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition}; use datafusion_physical_expr::window::WindowExpr; -use datafusion_physical_expr_common::aggregate::create_aggregate_expr; -use datafusion_physical_expr_common::expressions::Literal; +use datafusion_physical_expr::expressions::Literal; use itertools::Itertools; use jni::objects::GlobalRef; use num::{BigInt, ToPrimitive}; use std::cmp::max; use std::{collections::HashMap, sync::Arc}; +use datafusion::physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; // For clippy error on type_complexity. type ExecResult = Result; @@ -1269,52 +1269,36 @@ impl PhysicalPlanner { Arc::new(Literal::new(ScalarValue::Int64(Some(0)))), )); - create_aggregate_expr( - &sum_udaf(), - &[child], - &[], - &[], - &[], - schema.as_ref(), - Some("count".to_string()), - false, - false, - ) - .map_err(|e| ExecutionError::DataFusionError(e.to_string())) + AggregateExprBuilder::new(sum_udaf(), vec![child]) + .schema(schema) + .alias("count") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| ExecutionError::DataFusionError(e.to_string())) } AggExprStruct::Min(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); let child = Arc::new(CastExpr::new(child, datatype.clone(), None)); - create_aggregate_expr( - &min_udaf(), - &[child], - &[], - &[], - &[], - schema.as_ref(), - Some("min".to_string()), - false, - false, - ) - .map_err(|e| ExecutionError::DataFusionError(e.to_string())) + + AggregateExprBuilder::new(min_udaf(), vec![child]) + .schema(schema) + .alias("min") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| ExecutionError::DataFusionError(e.to_string())) } AggExprStruct::Max(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); let child = Arc::new(CastExpr::new(child, datatype.clone(), None)); - create_aggregate_expr( - &max_udaf(), - &[child], - &[], - &[], - &[], - schema.as_ref(), - Some("max".to_string()), - false, - false, - ) - .map_err(|e| ExecutionError::DataFusionError(e.to_string())) + + AggregateExprBuilder::new(max_udaf(), vec![child]) + .schema(schema) + .alias("max") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| ExecutionError::DataFusionError(e.to_string())) } AggExprStruct::Sum(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; @@ -1328,18 +1312,13 @@ impl PhysicalPlanner { // cast to the result data type of SUM if necessary, we should not expect // a cast failure since it should have already been checked at Spark side let child = Arc::new(CastExpr::new(child, datatype.clone(), None)); - create_aggregate_expr( - &sum_udaf(), - &[child], - &[], - &[], - &[], - schema.as_ref(), - Some("sum".to_string()), - false, - false, - ) - .map_err(|e| ExecutionError::DataFusionError(e.to_string())) + + AggregateExprBuilder::new(sum_udaf(), vec![child]) + .schema(schema) + .alias("sum") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| ExecutionError::DataFusionError(e.to_string())) } } } @@ -1366,79 +1345,54 @@ impl PhysicalPlanner { AggExprStruct::First(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; let func = datafusion_expr::AggregateUDF::new_from_impl(FirstValue::new()); - create_aggregate_expr( - &func, - &[child], - &[], - &[], - &[], - &schema, - Some("first".to_string()), - false, - false, - ) - .map_err(|e| e.into()) + + AggregateExprBuilder::new(Arc::new(func), vec![child]) + .schema(schema) + .alias("first") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| e.into()) } AggExprStruct::Last(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; let func = datafusion_expr::AggregateUDF::new_from_impl(LastValue::new()); - create_aggregate_expr( - &func, - &[child], - &[], - &[], - &[], - &schema, - Some("last".to_string()), - false, - false, - ) - .map_err(|e| e.into()) + + AggregateExprBuilder::new(Arc::new(func), vec![child]) + .schema(schema) + .alias("last") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| e.into()) } AggExprStruct::BitAndAgg(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; - create_aggregate_expr( - &bit_and_udaf(), - &[child], - &[], - &[], - &[], - &schema, - Some("bit_and".to_string()), - false, - false, - ) - .map_err(|e| e.into()) + + AggregateExprBuilder::new(bit_and_udaf(), vec![child]) + .schema(schema) + .alias("bit_and") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| e.into()) } AggExprStruct::BitOrAgg(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; - create_aggregate_expr( - &bit_or_udaf(), - &[child], - &[], - &[], - &[], - &schema, - Some("bit_or".to_string()), - false, - false, - ) - .map_err(|e| e.into()) + + AggregateExprBuilder::new(bit_or_udaf(), vec![child]) + .schema(schema) + .alias("bit_or") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| e.into()) } AggExprStruct::BitXorAgg(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; - create_aggregate_expr( - &bit_xor_udaf(), - &[child], - &[], - &[], - &[], - &schema, - Some("bit_xor".to_string()), - false, - false, - ) - .map_err(|e| e.into()) + + AggregateExprBuilder::new(bit_xor_udaf(), vec![child]) + .schema(schema) + .alias("bit_xor") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| e.into()) } AggExprStruct::Covariance(expr) => { let child1 = self.create_expr(expr.child1.as_ref().unwrap(), schema.clone())?; @@ -1634,11 +1588,10 @@ impl PhysicalPlanner { &window_func, window_func_name, &window_args, - &[], partition_by, sort_exprs, window_frame.into(), - &input_schema, + input_schema.as_ref(), false, // TODO: Ignore nulls ) .map_err(|e| ExecutionError::DataFusionError(e.to_string())) diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index 902529ba51..88efb7bd49 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -126,7 +126,7 @@ impl FilterExec { let schema = input.schema(); if !check_support(predicate, &schema) { let selectivity = default_selectivity as f64 / 100.0; - let mut stats = input_stats.into_inexact(); + let mut stats = input_stats.to_inexact(); stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity); stats.total_byte_size = stats .total_byte_size diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala index 0879e278d2..2cc129409f 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala @@ -42,7 +42,6 @@ class CometJoinSuite extends CometTestBase { } } - /* test("join - self join") { val df1 = testData.select(testData("key")).as("df1") val df2 = testData.select(testData("key")).as("df2") @@ -336,7 +335,6 @@ class CometJoinSuite extends CometTestBase { } } } - */ test("SortMergeJoin with join filter") { withSQLConf( @@ -344,7 +342,6 @@ class CometJoinSuite extends CometTestBase { SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") { withParquetTable((0 until 10).map(i => (i % 10, i + 2)), "tbl_b") { - /* val df1 = sql( "SELECT * FROM tbl_a JOIN tbl_b ON tbl_a._2 = tbl_b._1 AND " + "tbl_a._1 > tbl_b._2") @@ -369,24 +366,16 @@ class CometJoinSuite extends CometTestBase { "SELECT * FROM tbl_b RIGHT JOIN tbl_a ON tbl_a._2 = tbl_b._1 " + "AND tbl_a._1 > tbl_b._2") checkSparkAnswerAndOperator(df5) - */ val df6 = sql( "SELECT * FROM tbl_a FULL JOIN tbl_b ON tbl_a._2 = tbl_b._1 " + - "AND tbl_a._1 > tbl_b._2 ORDER BY tbl_a._1, tbl_a._2") - df6.explain() - df6.show(100) - // checkSparkAnswer(df6) + "AND tbl_a._1 > tbl_b._2") + checkSparkAnswerAndOperator(df6) - /* val df7 = sql( "SELECT * FROM tbl_b FULL JOIN tbl_a ON tbl_a._2 = tbl_b._1 " + "AND tbl_a._1 > tbl_b._2") - df7.explain() checkSparkAnswerAndOperator(df7) - */ - - /* val left = sql("SELECT * FROM tbl_a") val right = sql("SELECT * FROM tbl_b") @@ -406,7 +395,6 @@ class CometJoinSuite extends CometTestBase { val df11 = right.join(left, left("_2") === right("_1") && left("_2") >= right("_1"), "leftanti") checkSparkAnswerAndOperator(df11) - */ } } }