From 936b9ae034397fe37169d5118dc68dfdf738f371 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 20 Aug 2024 23:13:52 -0700 Subject: [PATCH] Use specified commit to test --- native/Cargo.lock | 42 ++++- native/Cargo.toml | 18 +- .../core/src/execution/datafusion/planner.rs | 177 +++++++----------- native/core/src/execution/operators/filter.rs | 2 +- .../apache/comet/exec/CometJoinSuite.scala | 16 +- 5 files changed, 115 insertions(+), 140 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 7af73e769d..b8c9081e2c 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -17,6 +17,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + [[package]] name = "ahash" version = "0.8.11" @@ -374,7 +380,7 @@ dependencies = [ "cc", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.7.4", "object", "rustc-demangle", ] @@ -805,6 +811,7 @@ dependencies = [ [[package]] name = "datafusion" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", @@ -853,6 +860,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "arrow-schema", "async-trait", @@ -950,6 +958,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", @@ -970,6 +979,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "tokio", ] @@ -977,6 +987,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "arrow", "chrono", @@ -996,6 +1007,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", @@ -1016,6 +1028,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "arrow", "datafusion-common", @@ -1025,6 +1038,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "arrow", "arrow-buffer", @@ -1050,6 +1064,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", @@ -1069,6 +1084,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", @@ -1081,6 +1097,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "arrow", "arrow-array", @@ -1101,6 +1118,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "datafusion-common", "datafusion-expr", @@ -1111,6 +1129,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "arrow", "async-trait", @@ -1129,6 +1148,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", @@ -1159,6 +1179,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", @@ -1171,6 +1192,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-functions-aggregate" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", @@ -1185,6 +1207,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "datafusion-common", "datafusion-execution", @@ -1196,6 +1219,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "ahash", "arrow", @@ -1230,6 +1254,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "41.0.0" +source = "git+https://github.com/viirya/arrow-datafusion.git?rev=f98693e#f98693e16fa7494f2521096ee3c35aae7dac98fc" dependencies = [ "arrow", "arrow-array", @@ -1343,12 +1368,12 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.31" +version = "1.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f211bbe8e69bbd0cfdea405084f128ae8b4aaa6b0b522fc8f2b009084797920" +checksum = "9c0596c1eac1f9e04ed902702e9878208b336edc9d6fddc8a48387349bab3666" dependencies = [ "crc32fast", - "miniz_oxide", + "miniz_oxide 0.8.0", ] [[package]] @@ -1980,6 +2005,15 @@ dependencies = [ "adler", ] +[[package]] +name = "miniz_oxide" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +dependencies = [ + "adler2", +] + [[package]] name = "multimap" version = "0.8.3" diff --git a/native/Cargo.toml b/native/Cargo.toml index 61681af703..82598671c6 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -39,15 +39,15 @@ arrow-buffer = { version = "52.2.0" } arrow-data = { version = "52.2.0" } arrow-schema = { version = "52.2.0" } parquet = { version = "52.2.0", default-features = false, features = ["experimental"] } -datafusion-common = { path = "../../arrow-datafusion/datafusion/common" } -datafusion = { default-features = false, path = "../../arrow-datafusion/datafusion/core", features = ["unicode_expressions", "crypto_expressions"] } -datafusion-functions = { path = "../../arrow-datafusion/datafusion/functions", features = ["crypto_expressions"] } -datafusion-functions-nested = { path = "../../arrow-datafusion/datafusion/functions-nested", default-features = false } -datafusion-expr = { path = "../../arrow-datafusion/datafusion/expr", default-features = false } -datafusion-execution = { path = "../../arrow-datafusion/datafusion/execution", default-features = false } -datafusion-physical-plan = { path = "../../arrow-datafusion/datafusion/physical-plan", default-features = false } -datafusion-physical-expr-common = { path = "../../arrow-datafusion/datafusion/physical-expr-common", default-features = false } -datafusion-physical-expr = { path = "../../arrow-datafusion/datafusion/physical-expr", default-features = false } +datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e" } +datafusion = { default-features = false, git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", features = ["unicode_expressions", "crypto_expressions"] } +datafusion-functions = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", features = ["crypto_expressions"] } +datafusion-functions-nested = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", default-features = false } +datafusion-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", default-features = false } +datafusion-execution = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", default-features = false } +datafusion-physical-plan = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", default-features = false } +datafusion-physical-expr-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", default-features = false } +datafusion-physical-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e", default-features = false } datafusion-comet-spark-expr = { path = "spark-expr", version = "0.2.0" } datafusion-comet-proto = { path = "proto", version = "0.2.0" } chrono = { version = "0.4", default-features = false, features = ["clock"] } diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index ed8a00c751..44ce80389e 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -105,13 +105,13 @@ use datafusion_common::{ use datafusion_expr::expr::find_df_window_func; use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition}; use datafusion_physical_expr::window::WindowExpr; -use datafusion_physical_expr_common::aggregate::create_aggregate_expr; -use datafusion_physical_expr_common::expressions::Literal; +use datafusion_physical_expr::expressions::Literal; use itertools::Itertools; use jni::objects::GlobalRef; use num::{BigInt, ToPrimitive}; use std::cmp::max; use std::{collections::HashMap, sync::Arc}; +use datafusion::physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; // For clippy error on type_complexity. type ExecResult = Result; @@ -1269,52 +1269,36 @@ impl PhysicalPlanner { Arc::new(Literal::new(ScalarValue::Int64(Some(0)))), )); - create_aggregate_expr( - &sum_udaf(), - &[child], - &[], - &[], - &[], - schema.as_ref(), - Some("count".to_string()), - false, - false, - ) - .map_err(|e| ExecutionError::DataFusionError(e.to_string())) + AggregateExprBuilder::new(sum_udaf(), vec![child]) + .schema(schema) + .alias("count") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| ExecutionError::DataFusionError(e.to_string())) } AggExprStruct::Min(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); let child = Arc::new(CastExpr::new(child, datatype.clone(), None)); - create_aggregate_expr( - &min_udaf(), - &[child], - &[], - &[], - &[], - schema.as_ref(), - Some("min".to_string()), - false, - false, - ) - .map_err(|e| ExecutionError::DataFusionError(e.to_string())) + + AggregateExprBuilder::new(min_udaf(), vec![child]) + .schema(schema) + .alias("min") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| ExecutionError::DataFusionError(e.to_string())) } AggExprStruct::Max(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); let child = Arc::new(CastExpr::new(child, datatype.clone(), None)); - create_aggregate_expr( - &max_udaf(), - &[child], - &[], - &[], - &[], - schema.as_ref(), - Some("max".to_string()), - false, - false, - ) - .map_err(|e| ExecutionError::DataFusionError(e.to_string())) + + AggregateExprBuilder::new(max_udaf(), vec![child]) + .schema(schema) + .alias("max") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| ExecutionError::DataFusionError(e.to_string())) } AggExprStruct::Sum(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; @@ -1328,18 +1312,13 @@ impl PhysicalPlanner { // cast to the result data type of SUM if necessary, we should not expect // a cast failure since it should have already been checked at Spark side let child = Arc::new(CastExpr::new(child, datatype.clone(), None)); - create_aggregate_expr( - &sum_udaf(), - &[child], - &[], - &[], - &[], - schema.as_ref(), - Some("sum".to_string()), - false, - false, - ) - .map_err(|e| ExecutionError::DataFusionError(e.to_string())) + + AggregateExprBuilder::new(sum_udaf(), vec![child]) + .schema(schema) + .alias("sum") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| ExecutionError::DataFusionError(e.to_string())) } } } @@ -1366,79 +1345,54 @@ impl PhysicalPlanner { AggExprStruct::First(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; let func = datafusion_expr::AggregateUDF::new_from_impl(FirstValue::new()); - create_aggregate_expr( - &func, - &[child], - &[], - &[], - &[], - &schema, - Some("first".to_string()), - false, - false, - ) - .map_err(|e| e.into()) + + AggregateExprBuilder::new(Arc::new(func), vec![child]) + .schema(schema) + .alias("first") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| e.into()) } AggExprStruct::Last(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; let func = datafusion_expr::AggregateUDF::new_from_impl(LastValue::new()); - create_aggregate_expr( - &func, - &[child], - &[], - &[], - &[], - &schema, - Some("last".to_string()), - false, - false, - ) - .map_err(|e| e.into()) + + AggregateExprBuilder::new(Arc::new(func), vec![child]) + .schema(schema) + .alias("last") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| e.into()) } AggExprStruct::BitAndAgg(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; - create_aggregate_expr( - &bit_and_udaf(), - &[child], - &[], - &[], - &[], - &schema, - Some("bit_and".to_string()), - false, - false, - ) - .map_err(|e| e.into()) + + AggregateExprBuilder::new(bit_and_udaf(), vec![child]) + .schema(schema) + .alias("bit_and") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| e.into()) } AggExprStruct::BitOrAgg(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; - create_aggregate_expr( - &bit_or_udaf(), - &[child], - &[], - &[], - &[], - &schema, - Some("bit_or".to_string()), - false, - false, - ) - .map_err(|e| e.into()) + + AggregateExprBuilder::new(bit_or_udaf(), vec![child]) + .schema(schema) + .alias("bit_or") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| e.into()) } AggExprStruct::BitXorAgg(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; - create_aggregate_expr( - &bit_xor_udaf(), - &[child], - &[], - &[], - &[], - &schema, - Some("bit_xor".to_string()), - false, - false, - ) - .map_err(|e| e.into()) + + AggregateExprBuilder::new(bit_xor_udaf(), vec![child]) + .schema(schema) + .alias("bit_xor") + .with_ignore_nulls(false) + .with_distinct(false) + .build().map_err(|e| e.into()) } AggExprStruct::Covariance(expr) => { let child1 = self.create_expr(expr.child1.as_ref().unwrap(), schema.clone())?; @@ -1634,11 +1588,10 @@ impl PhysicalPlanner { &window_func, window_func_name, &window_args, - &[], partition_by, sort_exprs, window_frame.into(), - &input_schema, + input_schema.as_ref(), false, // TODO: Ignore nulls ) .map_err(|e| ExecutionError::DataFusionError(e.to_string())) diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index 902529ba51..88efb7bd49 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -126,7 +126,7 @@ impl FilterExec { let schema = input.schema(); if !check_support(predicate, &schema) { let selectivity = default_selectivity as f64 / 100.0; - let mut stats = input_stats.into_inexact(); + let mut stats = input_stats.to_inexact(); stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity); stats.total_byte_size = stats .total_byte_size diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala index 0879e278d2..2cc129409f 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala @@ -42,7 +42,6 @@ class CometJoinSuite extends CometTestBase { } } - /* test("join - self join") { val df1 = testData.select(testData("key")).as("df1") val df2 = testData.select(testData("key")).as("df2") @@ -336,7 +335,6 @@ class CometJoinSuite extends CometTestBase { } } } - */ test("SortMergeJoin with join filter") { withSQLConf( @@ -344,7 +342,6 @@ class CometJoinSuite extends CometTestBase { SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") { withParquetTable((0 until 10).map(i => (i % 10, i + 2)), "tbl_b") { - /* val df1 = sql( "SELECT * FROM tbl_a JOIN tbl_b ON tbl_a._2 = tbl_b._1 AND " + "tbl_a._1 > tbl_b._2") @@ -369,24 +366,16 @@ class CometJoinSuite extends CometTestBase { "SELECT * FROM tbl_b RIGHT JOIN tbl_a ON tbl_a._2 = tbl_b._1 " + "AND tbl_a._1 > tbl_b._2") checkSparkAnswerAndOperator(df5) - */ val df6 = sql( "SELECT * FROM tbl_a FULL JOIN tbl_b ON tbl_a._2 = tbl_b._1 " + - "AND tbl_a._1 > tbl_b._2 ORDER BY tbl_a._1, tbl_a._2") - df6.explain() - df6.show(100) - // checkSparkAnswer(df6) + "AND tbl_a._1 > tbl_b._2") + checkSparkAnswerAndOperator(df6) - /* val df7 = sql( "SELECT * FROM tbl_b FULL JOIN tbl_a ON tbl_a._2 = tbl_b._1 " + "AND tbl_a._1 > tbl_b._2") - df7.explain() checkSparkAnswerAndOperator(df7) - */ - - /* val left = sql("SELECT * FROM tbl_a") val right = sql("SELECT * FROM tbl_b") @@ -406,7 +395,6 @@ class CometJoinSuite extends CometTestBase { val df11 = right.join(left, left("_2") === right("_1") && left("_2") >= right("_1"), "leftanti") checkSparkAnswerAndOperator(df11) - */ } } }