From 6af0a5536ba8fe496f2e8f3a3a1ca2962e6dd8d2 Mon Sep 17 00:00:00 2001 From: yangchuan Date: Tue, 9 May 2023 11:29:28 +0800 Subject: [PATCH 1/5] Support First/Last aggregate functions. fix fix UT. Add first/last to ch blacklist refactor trigger ci fix match error. --- .../utils/CHExpressionUtil.scala | 2 + .../VeloxAggregateFunctionsSuite.scala | 137 +++++++++++++---- .../HashAggregateExecBaseTransformer.scala | 5 +- .../AggregateFunctionsBuilder.scala | 8 + .../expression/ExpressionMappings.scala | 6 +- .../GlutenHashAggregateExecTransformer.scala | 141 ++++++++++-------- .../utils/velox/VeloxTestSettings.scala | 6 +- 7 files changed, 205 insertions(+), 100 deletions(-) diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala b/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala index 8edfa652884f..c870749116ad 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala @@ -51,6 +51,8 @@ object CHExpressionUtil { BIT_AND_AGG -> Set(EMPTY_TYPE), BIT_XOR_AGG -> Set(EMPTY_TYPE), CORR -> Set(EMPTY_TYPE), + FIRST -> Set(EMPTY_TYPE), + LAST -> Set(EMPTY_TYPE), COVAR_POP -> Set(EMPTY_TYPE), COVAR_SAMP -> Set(EMPTY_TYPE) ) diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala index 274125256fd8..fcb6dfaee068 100644 --- a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala +++ b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala @@ -49,38 +49,49 @@ class VeloxAggregateFunctionsSuite extends WholeStageTransformerSuite { .set("spark.sql.sources.useV1SourceList", "avro") } + test("count") { val df = runQueryAndCompare( "select count(*) from lineitem where l_partkey in (1552, 674, 1062)") { - checkOperatorMatch[GlutenHashAggregateExecTransformer] } + checkOperatorMatch[GlutenHashAggregateExecTransformer] + } runQueryAndCompare( "select count(l_quantity), count(distinct l_partkey) from lineitem") { df => { assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} + plan.isInstanceOf[GlutenHashAggregateExecTransformer] + }) == 4) + } + } } test("avg") { val df = runQueryAndCompare( "select avg(l_partkey) from lineitem where l_partkey < 1000") { - checkOperatorMatch[GlutenHashAggregateExecTransformer] } + checkOperatorMatch[GlutenHashAggregateExecTransformer] + } runQueryAndCompare( "select avg(l_quantity), count(distinct l_partkey) from lineitem") { df => { assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} + plan.isInstanceOf[GlutenHashAggregateExecTransformer] + }) == 4) + } + } runQueryAndCompare( "select avg(cast (l_quantity as DECIMAL(12, 2))), " + "count(distinct l_partkey) from lineitem") { df => { assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} + plan.isInstanceOf[GlutenHashAggregateExecTransformer] + }) == 4) + } + } runQueryAndCompare( "select avg(cast (l_quantity as DECIMAL(22, 2))), " + "count(distinct l_partkey) from lineitem") { df => { assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} + plan.isInstanceOf[GlutenHashAggregateExecTransformer] + }) == 4) + } + } } test("sum") { @@ -91,8 +102,10 @@ class VeloxAggregateFunctionsSuite extends WholeStageTransformerSuite { runQueryAndCompare( "select sum(l_quantity), count(distinct l_partkey) from lineitem") { df => { assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} + plan.isInstanceOf[GlutenHashAggregateExecTransformer] + }) == 4) + } + } runQueryAndCompare( "select sum(cast (l_quantity as DECIMAL(22, 2))) from lineitem") { checkOperatorMatch[GlutenHashAggregateExecTransformer] @@ -101,14 +114,18 @@ class VeloxAggregateFunctionsSuite extends WholeStageTransformerSuite { "select sum(cast (l_quantity as DECIMAL(12, 2))), " + "count(distinct l_partkey) from lineitem") { df => { assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} + plan.isInstanceOf[GlutenHashAggregateExecTransformer] + }) == 4) + } + } runQueryAndCompare( "select sum(cast (l_quantity as DECIMAL(22, 2))), " + "count(distinct l_partkey) from lineitem") { df => { assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} + plan.isInstanceOf[GlutenHashAggregateExecTransformer] + }) == 4) + } + } } test("min and max") { @@ -119,8 +136,10 @@ class VeloxAggregateFunctionsSuite extends WholeStageTransformerSuite { runQueryAndCompare( "select min(l_partkey), max(l_partkey), count(distinct l_partkey) from lineitem") { df => { assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} + plan.isInstanceOf[GlutenHashAggregateExecTransformer] + }) == 4) + } + } } test("groupby") { @@ -154,15 +173,17 @@ class VeloxAggregateFunctionsSuite extends WholeStageTransformerSuite { runQueryAndCompare( "select stddev_samp(l_quantity), count(distinct l_partkey) from lineitem") { df => { assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} + plan.isInstanceOf[GlutenHashAggregateExecTransformer] + }) == 4) + } + } } test("stddev_pop") { runQueryAndCompare( """ |select stddev_pop(l_quantity) from lineitem; - |""".stripMargin) { + |""".stripMargin) { checkOperatorMatch[GlutenHashAggregateExecTransformer] } runQueryAndCompare( @@ -175,8 +196,10 @@ class VeloxAggregateFunctionsSuite extends WholeStageTransformerSuite { runQueryAndCompare( "select stddev_pop(l_quantity), count(distinct l_partkey) from lineitem") { df => { assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} + plan.isInstanceOf[GlutenHashAggregateExecTransformer] + }) == 4) + } + } } test("var_samp") { @@ -196,8 +219,10 @@ class VeloxAggregateFunctionsSuite extends WholeStageTransformerSuite { runQueryAndCompare( "select var_samp(l_quantity), count(distinct l_partkey) from lineitem") { df => { assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} + plan.isInstanceOf[GlutenHashAggregateExecTransformer] + }) == 4) + } + } } test("var_pop") { @@ -217,8 +242,10 @@ class VeloxAggregateFunctionsSuite extends WholeStageTransformerSuite { runQueryAndCompare( "select var_pop(l_quantity), count(distinct l_partkey) from lineitem") { df => { assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} + plan.isInstanceOf[GlutenHashAggregateExecTransformer] + }) == 4) + } + } } test("bit_and bit_or bit_xor") { @@ -226,9 +253,9 @@ class VeloxAggregateFunctionsSuite extends WholeStageTransformerSuite { for (func <- bitAggs) { runQueryAndCompare( s""" - |select ${func}(l_linenumber) from lineitem - |group by l_orderkey; - |""".stripMargin) { + |select ${func}(l_linenumber) from lineitem + |group by l_orderkey; + |""".stripMargin) { checkOperatorMatch[GlutenHashAggregateExecTransformer] } runQueryAndCompare( @@ -283,6 +310,56 @@ class VeloxAggregateFunctionsSuite extends WholeStageTransformerSuite { } } + test("first") { + runQueryAndCompare( + s""" + |select first(l_linenumber), first(l_linenumber, true) from lineitem; + |""".stripMargin) { + checkOperatorMatch[GlutenHashAggregateExecTransformer] + } + runQueryAndCompare( + s""" + |select first_value(l_linenumber), first_value(l_linenumber, true) from lineitem + |group by l_orderkey; + |""".stripMargin) { + checkOperatorMatch[GlutenHashAggregateExecTransformer] + } + runQueryAndCompare( + s""" + |select first(l_linenumber), first(l_linenumber, true), count(distinct l_partkey) from lineitem + |""".stripMargin) { df => { + assert(getExecutedPlan(df).count(plan => { + plan.isInstanceOf[GlutenHashAggregateExecTransformer] + }) == 4) + } + } + } + + test("last") { + runQueryAndCompare( + s""" + |select last(l_linenumber), last(l_linenumber, true) from lineitem; + |""".stripMargin) { + checkOperatorMatch[GlutenHashAggregateExecTransformer] + } + runQueryAndCompare( + s""" + |select last_value(l_linenumber), last_value(l_linenumber, true) from lineitem + |group by l_orderkey; + |""".stripMargin) { + checkOperatorMatch[GlutenHashAggregateExecTransformer] + } + runQueryAndCompare( + s""" + |select last(l_linenumber), last(l_linenumber, true), count(distinct l_partkey) from lineitem + |""".stripMargin) { df => { + assert(getExecutedPlan(df).count(plan => { + plan.isInstanceOf[GlutenHashAggregateExecTransformer] + }) == 4) + } + } + } + test("distinct functions") { runQueryAndCompare("SELECT sum(DISTINCT l_partkey), count(*) FROM lineitem") { df => { diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/HashAggregateExecBaseTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/HashAggregateExecBaseTransformer.scala index 583504630c4b..5884edfda61c 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/HashAggregateExecBaseTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/HashAggregateExecBaseTransformer.scala @@ -441,11 +441,10 @@ abstract class HashAggregateExecBaseTransformer( val mode = exp.mode val aggregateFunc = exp.aggregateFunction aggregateFunc match { - case Average(_, _) => + case _: Average | _: First | _: Last => mode match { case Partial | PartialMerge => - val avg = aggregateFunc.asInstanceOf[Average] - val aggBufferAttr = avg.inputAggBufferAttributes + val aggBufferAttr = aggregateFunc.inputAggBufferAttributes for (index <- aggBufferAttr.indices) { val attr = ConverterUtils.getAttrFromExpr(aggBufferAttr(index)) aggregateAttr += attr diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/AggregateFunctionsBuilder.scala b/gluten-core/src/main/scala/io/glutenproject/expression/AggregateFunctionsBuilder.scala index d51ba9dda93f..b744477ee89b 100644 --- a/gluten-core/src/main/scala/io/glutenproject/expression/AggregateFunctionsBuilder.scala +++ b/gluten-core/src/main/scala/io/glutenproject/expression/AggregateFunctionsBuilder.scala @@ -38,6 +38,14 @@ object AggregateFunctionsBuilder { throw new UnsupportedOperationException(s"Aggregate function not supported for $aggregateFunc.") } + aggregateFunc match { + case first @ First(_, ignoreNull) => + if (ignoreNull) substraitAggFuncName = ExpressionMappings.FIRST_IGNORE_NULL + case last @ Last(_, ignoreNulls) => + if (ignoreNulls) substraitAggFuncName = ExpressionMappings.LAST_IGNORE_NULL + case _ => + } + val inputTypes: Seq[DataType] = aggregateFunc.children.map(child => child.dataType) ExpressionBuilder.newScalarFunction( diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala index 9b573b1fdf5f..e309d3c3f381 100644 --- a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala +++ b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala @@ -45,6 +45,9 @@ object ExpressionMappings { final val COVAR_POP = "covar_pop" final val COVAR_SAMP = "covar_samp" final val LAST = "last" + final val LAST_IGNORE_NULL = "last_ignore_null" + final val FIRST = "first" + final val FIRST_IGNORE_NULL = "first_ignore_null" // Function names used by Substrait plan. final val ADD = "add" @@ -423,7 +426,8 @@ object ExpressionMappings { Sig[Corr](CORR), Sig[CovPopulation](COVAR_POP), Sig[CovSample](COVAR_SAMP), - Sig[Last](LAST) + Sig[Last](LAST), + Sig[First](FIRST) ) /** Mapping Spark window expression to Substrait function name */ diff --git a/gluten-data/src/main/scala/io/glutenproject/execution/GlutenHashAggregateExecTransformer.scala b/gluten-data/src/main/scala/io/glutenproject/execution/GlutenHashAggregateExecTransformer.scala index 5ce2e5a0bd87..dcb7a4199e40 100644 --- a/gluten-data/src/main/scala/io/glutenproject/execution/GlutenHashAggregateExecTransformer.scala +++ b/gluten-data/src/main/scala/io/glutenproject/execution/GlutenHashAggregateExecTransformer.scala @@ -62,8 +62,8 @@ case class GlutenHashAggregateExecTransformer( for (expr <- aggregateExpressions) { val aggregateFunction = expr.aggregateFunction aggregateFunction match { - case _: Average | _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop | - _: Corr | _: CovPopulation | _: CovSample => + case _: Average | _: First | _: Last | _: StddevSamp | _: StddevPop | _: VarianceSamp | + _: VariancePop | _: Corr | _: CovPopulation | _: CovSample => expr.mode match { case Partial | PartialMerge => return true @@ -107,10 +107,9 @@ case class GlutenHashAggregateExecTransformer( throw new UnsupportedOperationException(s"${expr.mode} not supported.") } expr.aggregateFunction match { - case _: Average => - // Select sum from Velox Struct. + case _: Average | _: First | _: Last => + // Select first and second aggregate buffer from Velox Struct. expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 0)) - // Select count from Velox Struct. expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 1)) colIdx += 1 case _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop => @@ -177,6 +176,12 @@ case class GlutenHashAggregateExecTransformer( structTypeNodes.add(ConverterUtils.getTypeNode( GlutenDecimalUtil.getAvgSumDataType(avg), nullable = true)) structTypeNodes.add(ConverterUtils.getTypeNode(LongType, nullable = true)) + case first: First => + structTypeNodes.add(ConverterUtils.getTypeNode(first.dataType, nullable = true)) + structTypeNodes.add(ConverterUtils.getTypeNode(BooleanType, nullable = true)) + case last: Last => + structTypeNodes.add(ConverterUtils.getTypeNode(last.dataType, nullable = true)) + structTypeNodes.add(ConverterUtils.getTypeNode(BooleanType, nullable = true)) case _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop => // Use struct type to represent Velox Row(BIGINT, DOUBLE, DOUBLE). structTypeNodes.add(ConverterUtils @@ -226,66 +231,47 @@ case class GlutenHashAggregateExecTransformer( // This is a special handling for PartialMerge in the execution of distinct. // Use Partial phase instead for this aggregation. val modeKeyWord = modeToKeyWord(if (mixedPartialAndMerge) Partial else aggregateMode) + + def generateMergeCompanionNode(): Unit = { + aggregateMode match { + case Partial => + val partialNode = ExpressionBuilder.makeAggregateFunction( + VeloxAggregateFunctionsBuilder.create(args, aggregateFunction), + childrenNodeList, + modeKeyWord, + getIntermediateTypeNode(aggregateFunction)) + aggregateNodeList.add(partialNode) + case PartialMerge => + val aggFunctionNode = ExpressionBuilder.makeAggregateFunction( + VeloxAggregateFunctionsBuilder + .create(args, aggregateFunction, mixedPartialAndMerge), + childrenNodeList, + modeKeyWord, + getIntermediateTypeNode(aggregateFunction)) + aggregateNodeList.add(aggFunctionNode) + case Final => + val aggFunctionNode = ExpressionBuilder.makeAggregateFunction( + VeloxAggregateFunctionsBuilder.create(args, aggregateFunction), + childrenNodeList, + modeKeyWord, + ConverterUtils.getTypeNode(aggregateFunction.dataType, aggregateFunction.nullable)) + aggregateNodeList.add(aggFunctionNode) + case other => + throw new UnsupportedOperationException(s"$other is not supported.") + } + } + aggregateFunction match { - case _: Average | _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop | - _: Corr | _: CovPopulation | _: CovSample => - aggregateMode match { - case Partial => - val partialNode = ExpressionBuilder.makeAggregateFunction( - VeloxAggregateFunctionsBuilder.create(args, aggregateFunction), - childrenNodeList, - modeKeyWord, - getIntermediateTypeNode(aggregateFunction)) - aggregateNodeList.add(partialNode) - case PartialMerge => - val aggFunctionNode = ExpressionBuilder.makeAggregateFunction( - VeloxAggregateFunctionsBuilder - .create(args, aggregateFunction, mixedPartialAndMerge), - childrenNodeList, - modeKeyWord, - getIntermediateTypeNode(aggregateFunction)) - aggregateNodeList.add(aggFunctionNode) - case Final => - val aggFunctionNode = ExpressionBuilder.makeAggregateFunction( - VeloxAggregateFunctionsBuilder.create(args, aggregateFunction), - childrenNodeList, - modeKeyWord, - ConverterUtils.getTypeNode(aggregateFunction.dataType, aggregateFunction.nullable)) - aggregateNodeList.add(aggFunctionNode) - case other => - throw new UnsupportedOperationException(s"$other is not supported.") - } case sum: Sum if sum.dataType.isInstanceOf[DecimalType] => - aggregateMode match { - case Partial => - val partialNode = ExpressionBuilder.makeAggregateFunction( - VeloxAggregateFunctionsBuilder.create(args, aggregateFunction), - childrenNodeList, - modeKeyWord, - getIntermediateTypeNode(aggregateFunction)) - aggregateNodeList.add(partialNode) - case PartialMerge => - val aggFunctionNode = ExpressionBuilder.makeAggregateFunction( - VeloxAggregateFunctionsBuilder - .create(args, aggregateFunction, mixedPartialAndMerge), - childrenNodeList, - modeKeyWord, - getIntermediateTypeNode(aggregateFunction)) - aggregateNodeList.add(aggFunctionNode) - case Final => - val aggFunctionNode = ExpressionBuilder.makeAggregateFunction( - VeloxAggregateFunctionsBuilder.create(args, aggregateFunction), - childrenNodeList, - modeKeyWord, - ConverterUtils.getTypeNode(aggregateFunction.dataType, aggregateFunction.nullable)) - aggregateNodeList.add(aggFunctionNode) - case other => - throw new UnsupportedOperationException(s"$other is not supported.") - } + generateMergeCompanionNode() + case _: Average | _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop | + _: Corr | _: CovPopulation | _: CovSample | _: First | _: Last => + generateMergeCompanionNode() case _ => val aggFunctionNode = ExpressionBuilder.makeAggregateFunction( VeloxAggregateFunctionsBuilder.create( - args, aggregateFunction, aggregateMode == PartialMerge && mixedPartialAndMerge), + args, aggregateFunction, + aggregateMode == PartialMerge && mixedPartialAndMerge), childrenNodeList, modeKeyWord, ConverterUtils.getTypeNode(aggregateFunction.dataType, aggregateFunction.nullable)) @@ -302,11 +288,12 @@ case class GlutenHashAggregateExecTransformer( groupingExpressions.foreach(expression => { typeNodeList.add(ConverterUtils.getTypeNode(expression.dataType, expression.nullable)) }) + aggregateExpressions.foreach(expression => { val aggregateFunction = expression.aggregateFunction aggregateFunction match { - case _: Average | _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop | - _: Corr | _: CovPopulation | _: CovSample => + case _: Average | _: First | _: Last | _: StddevSamp | _: StddevPop | + _: VarianceSamp | _: VariancePop | _: Corr | _: CovPopulation | _: CovSample => expression.mode match { case Partial | PartialMerge => typeNodeList.add(getIntermediateTypeNode(aggregateFunction)) @@ -413,6 +400,22 @@ case class GlutenHashAggregateExecTransformer( case other => throw new UnsupportedOperationException(s"$other is not supported.") } + case _: First | _: Last => + aggregateExpression.mode match { + case PartialMerge | Final => + assert(functionInputAttributes.size == 2, + s"${aggregateExpression.mode.toString} of First/Last expects two input attributes.") + // Use a Velox function to combine the intermediate columns into struct. + val childNodes = new util.ArrayList[ExpressionNode]( + functionInputAttributes.toList.map(attr => { + ExpressionConverter + .replaceWithExpressionTransformer(attr, originalInputAttributes) + .doTransform(args) + }).asJava) + exprNodes.add(getRowConstructNode(args, childNodes, functionInputAttributes)) + case other => + throw new UnsupportedOperationException(s"$other is not supported.") + } case _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop => aggregateExpression.mode match { case PartialMerge | Final => @@ -588,8 +591,8 @@ case class GlutenHashAggregateExecTransformer( val aggregateFunc = aggExpr.aggregateFunction val childrenNodes = new util.ArrayList[ExpressionNode]() aggregateFunc match { - case _: Average | _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop | - _: Corr | _: CovPopulation | _: CovSample + case _: Average | _: First | _: Last | _: StddevSamp | _: StddevPop | _: VarianceSamp | + _: VariancePop | _: Corr | _: CovPopulation | _: CovSample if aggExpr.mode == PartialMerge | aggExpr.mode == Final => // Only occupies one column due to intermediate results are combined // by previous projection. @@ -745,7 +748,7 @@ object VeloxAggregateFunctionsBuilder { forMergeCompanion: Boolean = false): Long = { val functionMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]] - val sigName = ExpressionMappings.expressionsMap.get(aggregateFunc.getClass) + var sigName = ExpressionMappings.expressionsMap.get(aggregateFunc.getClass) if (sigName.isEmpty) { throw new UnsupportedOperationException(s"not currently supported: $aggregateFunc.") } @@ -753,6 +756,14 @@ object VeloxAggregateFunctionsBuilder { // Use companion function for partial-merge aggregation functions on count distinct. val substraitAggFuncName = if (!forMergeCompanion) sigName.get else sigName.get + "_merge" + aggregateFunc match { + case First(_, ignoreNulls) => + if (ignoreNulls) sigName = ExpressionMappings.FIRST_IGNORE_NULL + case Last(_, ignoreNulls) => + if (ignoreNulls) sigName = ExpressionMappings.LAST_IGNORE_NULL + case _ => + } + ExpressionBuilder.newScalarFunction( functionMap, ConverterUtils.makeFuncName( diff --git a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index aa5f394a4e96..bb721ce1be4e 100644 --- a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -122,7 +122,8 @@ class VeloxTestSettings extends BackendTestSettings { // We can enable the below test for spark 3.4 and higher versions. "Gluten - describe", // decimal failed ut. - "SPARK-22271: mean overflows and returns null for some decimal variables" + "SPARK-22271: mean overflows and returns null for some decimal variables", + "SPARK-34165: Add count_distinct to summary" ) enableSuite[GlutenDataFrameNaFunctionsSuite] @@ -235,10 +236,13 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenDataFrameWindowFunctionsSuite] // Spill not supported yet. .exclude("Window spill with more than the inMemoryThreshold and spillThreshold") + .exclude("SPARK-21258: complex object in combination with spilling") .exclude("NaN and -0.0 in window partition keys") // NaN case // Rewrite with NaN test cases excluded. .exclude("covar_samp, var_samp (variance), stddev_samp (stddev) functions in specific window") .exclude("corr, covar_pop, stddev_pop functions in specific window") + // https://github.com/oap-project/gluten/pull/1606 + .exclude("last/first with ignoreNulls", "last/first on descending ordered window") enableSuite[GlutenDataFrameSelfJoinSuite] enableSuite[GlutenComplexTypeSuite] .exclude("CreateMap") From 0d5c61e44f9be4f63246a4806ee88b98e2b2491a Mon Sep 17 00:00:00 2001 From: yangchuan Date: Tue, 9 May 2023 11:37:32 +0800 Subject: [PATCH 2/5] Use tmp velox branch --- ep/build-velox/src/get_velox.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index 3d10ba05edf2..ef30f2c029a0 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -2,8 +2,8 @@ set -exu -VELOX_REPO=https://github.com/oap-project/velox.git -VELOX_BRANCH=main +VELOX_REPO=https://github.com/Yohahaha/velox.git +VELOX_BRANCH=first_last #Set on run gluten on HDFS ENABLE_HDFS=OFF From a9f62daf8cd8b0316c98df2d433e0a059de08938 Mon Sep 17 00:00:00 2001 From: yangchuan Date: Mon, 15 May 2023 10:09:35 +0800 Subject: [PATCH 3/5] enable tests fix --- .../expression/AggregateFunctionsBuilder.scala | 6 +++--- .../execution/GlutenHashAggregateExecTransformer.scala | 10 +++++----- .../glutenproject/utils/velox/VeloxTestSettings.scala | 2 -- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/AggregateFunctionsBuilder.scala b/gluten-core/src/main/scala/io/glutenproject/expression/AggregateFunctionsBuilder.scala index b744477ee89b..245446ab8398 100644 --- a/gluten-core/src/main/scala/io/glutenproject/expression/AggregateFunctionsBuilder.scala +++ b/gluten-core/src/main/scala/io/glutenproject/expression/AggregateFunctionsBuilder.scala @@ -27,7 +27,7 @@ object AggregateFunctionsBuilder { def create(args: java.lang.Object, aggregateFunc: AggregateFunction): Long = { val functionMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]] - val substraitAggFuncName = ExpressionMappings.expressionsMap.get(aggregateFunc.getClass) + var substraitAggFuncName = ExpressionMappings.expressionsMap.get(aggregateFunc.getClass) if (substraitAggFuncName.isEmpty) { throw new UnsupportedOperationException(s"Could not find valid a substrait mapping name for $aggregateFunc.") } @@ -40,9 +40,9 @@ object AggregateFunctionsBuilder { aggregateFunc match { case first @ First(_, ignoreNull) => - if (ignoreNull) substraitAggFuncName = ExpressionMappings.FIRST_IGNORE_NULL + if (ignoreNull) substraitAggFuncName = Some(ExpressionMappings.FIRST_IGNORE_NULL) case last @ Last(_, ignoreNulls) => - if (ignoreNulls) substraitAggFuncName = ExpressionMappings.LAST_IGNORE_NULL + if (ignoreNulls) substraitAggFuncName = Some(ExpressionMappings.LAST_IGNORE_NULL) case _ => } diff --git a/gluten-data/src/main/scala/io/glutenproject/execution/GlutenHashAggregateExecTransformer.scala b/gluten-data/src/main/scala/io/glutenproject/execution/GlutenHashAggregateExecTransformer.scala index dcb7a4199e40..9514aad53656 100644 --- a/gluten-data/src/main/scala/io/glutenproject/execution/GlutenHashAggregateExecTransformer.scala +++ b/gluten-data/src/main/scala/io/glutenproject/execution/GlutenHashAggregateExecTransformer.scala @@ -753,17 +753,17 @@ object VeloxAggregateFunctionsBuilder { throw new UnsupportedOperationException(s"not currently supported: $aggregateFunc.") } - // Use companion function for partial-merge aggregation functions on count distinct. - val substraitAggFuncName = if (!forMergeCompanion) sigName.get else sigName.get + "_merge" - aggregateFunc match { case First(_, ignoreNulls) => - if (ignoreNulls) sigName = ExpressionMappings.FIRST_IGNORE_NULL + if (ignoreNulls) sigName = Some(ExpressionMappings.FIRST_IGNORE_NULL) case Last(_, ignoreNulls) => - if (ignoreNulls) sigName = ExpressionMappings.LAST_IGNORE_NULL + if (ignoreNulls) sigName = Some(ExpressionMappings.LAST_IGNORE_NULL) case _ => } + // Use companion function for partial-merge aggregation functions on count distinct. + val substraitAggFuncName = if (!forMergeCompanion) sigName.get else sigName.get + "_merge" + ExpressionBuilder.newScalarFunction( functionMap, ConverterUtils.makeFuncName( diff --git a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index bb721ce1be4e..a2767ed503c7 100644 --- a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -241,8 +241,6 @@ class VeloxTestSettings extends BackendTestSettings { // Rewrite with NaN test cases excluded. .exclude("covar_samp, var_samp (variance), stddev_samp (stddev) functions in specific window") .exclude("corr, covar_pop, stddev_pop functions in specific window") - // https://github.com/oap-project/gluten/pull/1606 - .exclude("last/first with ignoreNulls", "last/first on descending ordered window") enableSuite[GlutenDataFrameSelfJoinSuite] enableSuite[GlutenComplexTypeSuite] .exclude("CreateMap") From e24bfa754198427c7f0df0996f143fe234690c39 Mon Sep 17 00:00:00 2001 From: yangchuan Date: Tue, 16 May 2023 14:36:37 +0800 Subject: [PATCH 4/5] add comments. --- .../scala/io/glutenproject/utils/velox/VeloxTestSettings.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index a2767ed503c7..c36252d9b65f 100644 --- a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -123,6 +123,7 @@ class VeloxTestSettings extends BackendTestSettings { "Gluten - describe", // decimal failed ut. "SPARK-22271: mean overflows and returns null for some decimal variables", + // Not supported for approx_count_distinct "SPARK-34165: Add count_distinct to summary" ) From 479104e78a8d1ee0b08e2329e2de37b3fb0deafd Mon Sep 17 00:00:00 2001 From: yangchuan Date: Tue, 16 May 2023 14:37:07 +0800 Subject: [PATCH 5/5] Use main velox branch --- ep/build-velox/src/get_velox.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index ef30f2c029a0..3d10ba05edf2 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -2,8 +2,8 @@ set -exu -VELOX_REPO=https://github.com/Yohahaha/velox.git -VELOX_BRANCH=first_last +VELOX_REPO=https://github.com/oap-project/velox.git +VELOX_BRANCH=main #Set on run gluten on HDFS ENABLE_HDFS=OFF