From 06bd109fa3749aab9eabe1ffa675ee6cf715f58e Mon Sep 17 00:00:00 2001
From: Rui Mo
Date: Fri, 30 Jul 2021 10:08:38 +0800
Subject: [PATCH] [NSE-207] Fix aggregate and refresh UT test script (#426)

* fix expr id difference in Partial Aggregate

* add fallback to window

* refresh ut

* use stol instead of stoi

* enable ut full test
---
 .../execution/ColumnarHashAggregateExec.scala |  43 ++--
 .../oap/execution/ColumnarWindowExec.scala    |  26 +++
 .../com/intel/oap/misc/DateTimeSuite.scala    |   6 +-
 .../com/intel/oap/tpc/ds/TPCDSSuite.scala     |   6 +-
 .../ArrowColumnarBatchSerializerSuite.scala   |   4 +-
 .../spark/sql/DataFrameAggregateSuite.scala   |   2 +-
 .../sql/DataFrameWindowFunctionsSuite.scala   |   4 +-
 .../apache/spark/sql/DateFunctionsSuite.scala |   2 +-
 .../apache/spark/sql/PlanStabilitySuite.scala |   4 +-
 .../apache/spark/sql/RepartitionSuite.scala   |   2 +-
 .../NativeDataFrameAggregateSuite.scala       |   2 +-
 .../nativesql/NativeRepartitionSuite.scala    |   2 +
 .../nativesql/NativeSQLConvertedSuite.scala   |  57 ++----
 .../ext/hash_aggregate_kernel.cc              |   4 +-
 .../ext/typed_action_codegen_impl.h           |   2 +-
 native-sql-engine/tools/failed_ut_list.log    | 183 ++++++++++++++++++
 native-sql-engine/tools/run_ut.sh             |  32 ++-
 17 files changed, 286 insertions(+), 95 deletions(-)
 create mode 100644 native-sql-engine/tools/failed_ut_list.log

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala
index 169bfd443..eff751347 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala
@@ -66,7 +66,7 @@ case class ColumnarHashAggregateExec(
     aggregateExpressions: Seq[AggregateExpression],
     aggregateAttributes: Seq[Attribute],
     initialInputBufferOffset: Int,
-    resultExpressions: Seq[NamedExpression],
+    var resultExpressions: Seq[NamedExpression],
    child: SparkPlan)
     extends BaseAggregateExec
     with ColumnarCodegenSupport
@@ -76,6 +76,20 @@ case class ColumnarHashAggregateExec(
   val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo
   override def supportsColumnar = true

+  var resAttributes: Seq[Attribute] = resultExpressions.map(_.toAttribute)
+  if (aggregateExpressions != null && aggregateExpressions.nonEmpty) {
+    aggregateExpressions.head.mode match {
+      case Partial =>
+        // In Partial aggregate mode, the expression ids in the result expressions can
+        // differ from those in inputAggBufferAttributes. To keep them consistent, the
+        // result attributes are recalculated below and used as the result expressions.
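+        // Illustrative example (hypothetical query, not part of this patch): for a partial
+        // aggregate of `SELECT k, sum(x) FROM t GROUP BY k`, the recalculated attributes are
+        // the grouping attribute `k` followed by sum's inputAggBufferAttributes (e.g. `sum#10`),
+        // so the expression ids in this operator's output line up with what the downstream
+        // Final aggregate expects.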
+ resAttributes = groupingExpressions.map(_.toAttribute) ++ + aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes) + resultExpressions = resAttributes + case _ => + } + } + // Members declared in org.apache.spark.sql.execution.AliasAwareOutputPartitioning override protected def outputExpressions: Seq[NamedExpression] = resultExpressions @@ -83,7 +97,7 @@ case class ColumnarHashAggregateExec( protected def doProduce(ctx: CodegenContext): String = throw new UnsupportedOperationException() // Members declared in org.apache.spark.sql.catalyst.plans.QueryPlan - override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute) + override def output: Seq[Attribute] = resAttributes // Members declared in org.apache.spark.sql.execution.SparkPlan protected override def doExecute() @@ -398,30 +412,7 @@ case class ColumnarHashAggregateExec( expr.mode match { case Final => val out_res = 0 - resultColumnVectors(idx).dataType match { - case t: IntegerType => - resultColumnVectors(idx) - .put(0, out_res.asInstanceOf[Number].intValue) - case t: LongType => - resultColumnVectors(idx) - .put(0, out_res.asInstanceOf[Number].longValue) - case t: DoubleType => - resultColumnVectors(idx) - .put(0, out_res.asInstanceOf[Number].doubleValue()) - case t: FloatType => - resultColumnVectors(idx) - .put(0, out_res.asInstanceOf[Number].floatValue()) - case t: ByteType => - resultColumnVectors(idx) - .put(0, out_res.asInstanceOf[Number].byteValue()) - case t: ShortType => - resultColumnVectors(idx) - .put(0, out_res.asInstanceOf[Number].shortValue()) - case t: StringType => - val values = (out_res :: Nil).map(_.toByte).toArray - resultColumnVectors(idx) - .putBytes(0, 1, values, 0) - } + putDataIntoVector(resultColumnVectors, out_res, idx) idx += 1 case _ => } diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala index 153fec797..be0596077 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala @@ -50,6 +50,7 @@ import scala.collection.mutable.ListBuffer import scala.util.Random import org.apache.spark.sql.execution.datasources.v2.arrow.SparkSchemaUtils +import util.control.Breaks._ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], partitionSpec: Seq[Expression], @@ -60,6 +61,8 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], override def output: Seq[Attribute] = child.output ++ windowExpression.map(_.toAttribute) + buildCheck() + override def requiredChildDistribution: Seq[Distribution] = { if (partitionSpec.isEmpty) { // Only show warning when the number of bytes is larger than 100 MiB? 
@@ -91,6 +94,29 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], val sparkConf = sparkContext.getConf val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo + def buildCheck(): Unit = { + var allLiteral = true + try { + breakable { + for (func <- validateWindowFunctions()) { + for (child <- func._2.children) { + if (!child.isInstanceOf[Literal]) { + allLiteral = false + break + } + } + } + } + } catch { + case e: Throwable => + throw new UnsupportedOperationException(s"${e.getMessage}") + } + if (allLiteral) { + throw new UnsupportedOperationException( + s"Window functions' children all being Literal is not supported.") + } + } + def checkAggFunctionSpec(windowSpec: WindowSpecDefinition): Unit = { if (windowSpec.orderSpec.nonEmpty) { throw new UnsupportedOperationException("unsupported operation for " + diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala index 0682b4eb3..2fe2c950c 100644 --- a/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala +++ b/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala @@ -213,7 +213,7 @@ class DateTimeSuite extends QueryTest with SharedSparkSession { } // FIXME ZONE issue - ignore("date type - cast from timestamp") { + test("date type - cast from timestamp") { withTempView("dates") { val dates = (0L to 3L).map(i => i * 24 * 1000 * 3600) .map(i => Tuple1(new Timestamp(i))) @@ -248,7 +248,7 @@ class DateTimeSuite extends QueryTest with SharedSparkSession { } // todo: fix field/literal implicit conversion in ColumnarExpressionConverter - ignore("date type - join on, bhj") { + test("date type - join on, bhj") { withTempView("dates1", "dates2") { val dates1 = (0L to 3L).map(i => i * 1000 * 3600 * 24) .map(i => Tuple1(new Date(i))).toDF("time1") @@ -750,7 +750,7 @@ class DateTimeSuite extends QueryTest with SharedSparkSession { } } - ignore("datetime function - to_date with format") { // todo GetTimestamp IS PRIVATE ? + test("datetime function - to_date with format") { // todo GetTimestamp IS PRIVATE ? 
withTempView("dates") { val dates = Seq("2009-07-30", "2009-07-31", "2009-08-01") diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala index 5766f1f3e..379af32e2 100644 --- a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala +++ b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala @@ -70,7 +70,7 @@ class TPCDSSuite extends QueryTest with SharedSparkSession { super.afterAll() } - ignore("window queries") { + test("window queries") { runner.runTPCQuery("q12", 1, true) runner.runTPCQuery("q20", 1, true) runner.runTPCQuery("q36", 1, true) @@ -103,7 +103,7 @@ class TPCDSSuite extends QueryTest with SharedSparkSession { df.show() } - ignore("window function with decimal input") { + test("window function with decimal input") { val df = spark.sql("SELECT i_item_sk, i_class_id, SUM(i_current_price)" + " OVER (PARTITION BY i_class_id) FROM item LIMIT 1000") df.explain() @@ -118,7 +118,7 @@ class TPCDSSuite extends QueryTest with SharedSparkSession { df.show() } - ignore("window function with decimal input 2") { + test("window function with decimal input 2") { val df = spark.sql("SELECT i_item_sk, i_class_id, RANK()" + " OVER (PARTITION BY i_class_id ORDER BY i_current_price) FROM item LIMIT 1000") df.explain() diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializerSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializerSuite.scala index dec15eb28..5e64ded78 100644 --- a/native-sql-engine/core/src/test/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializerSuite.scala +++ b/native-sql-engine/core/src/test/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializerSuite.scala @@ -36,7 +36,7 @@ class ArrowColumnarBatchSerializerSuite extends SparkFunSuite with SharedSparkSe SQLMetrics.createAverageMetric(spark.sparkContext, "test serializer number of output rows") } - ignore("deserialize all null") { + test("deserialize all null") { val input = getTestResourcePath("test-data/native-splitter-output-all-null") val serializer = new ArrowColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance() @@ -64,7 +64,7 @@ class ArrowColumnarBatchSerializerSuite extends SparkFunSuite with SharedSparkSe deserializedStream.close() } - ignore("deserialize nullable string") { + test("deserialize nullable string") { val input = getTestResourcePath("test-data/native-splitter-output-nullable-string") val serializer = new ArrowColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance() diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 011215da2..ad782e4af 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -1001,7 +1001,7 @@ class DataFrameAggregateSuite extends QueryTest } Seq(true, false).foreach { value => - ignore(s"SPARK-31620: agg with subquery (whole-stage-codegen = $value)") { + test(s"SPARK-31620: agg with subquery (whole-stage-codegen = $value)") { withSQLConf( SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value.toString) { withTempView("t1", "t2") { diff --git 
a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index 76edcbec9..207b2963f 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -553,7 +553,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest Row("b", 2, 4, 8))) } - ignore("null inputs") { + test("null inputs") { val df = Seq(("a", 1), ("a", 1), ("a", 2), ("a", 2), ("b", 4), ("b", 3), ("b", 2)) .toDF("key", "value") val window = Window.orderBy() @@ -908,7 +908,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest } } - ignore("NaN and -0.0 in window partition keys") { + test("NaN and -0.0 in window partition keys") { val df = Seq( (Float.NaN, Double.NaN), (0.0f/0.0f, 0.0/0.0), diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index e4a48812a..d7bbf597f 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -45,7 +45,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1) } - ignore("function current_timestamp and now") { + test("function current_timestamp and now") { val df1 = Seq((1, 2), (3, 1)).toDF("a", "b") checkAnswer(df1.select(countDistinct(current_timestamp())), Row(1)) diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala index e16e04042..0545baf29 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala @@ -69,7 +69,8 @@ import org.apache.spark.sql.internal.SQLConf * }}} */ // scalastyle:on line.size.limit -@deprecated("This test suite is not suitable for native sql engine.", "Mo Rui") +// This test suite is not suitable for native sql engine. 
(Mo Rui) +/* trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite { private val originalMaxToStringFields = conf.maxToStringFields @@ -338,3 +339,4 @@ class TPCDSModifiedPlanStabilityWithStatsSuite extends PlanStabilitySuite { } } } +*/ diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/RepartitionSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/RepartitionSuite.scala index 3aaffbc4e..b429d9d5e 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/RepartitionSuite.scala +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/RepartitionSuite.scala @@ -178,7 +178,7 @@ class ReuseExchangeSuite extends RepartitionSuite { override lazy val input = spark.read.parquet(filePath) - ignore("columnar exchange same result") { + test("columnar exchange same result") { val df1 = input.groupBy("n_regionkey").agg(Map("n_nationkey" -> "sum")) val hashAgg1 = df1.queryExecution.executedPlan.collectFirst { case agg: ColumnarHashAggregateExec => agg diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/nativesql/NativeDataFrameAggregateSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/nativesql/NativeDataFrameAggregateSuite.scala index 00f18dbfc..4539f00ab 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/nativesql/NativeDataFrameAggregateSuite.scala +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/nativesql/NativeDataFrameAggregateSuite.scala @@ -1000,7 +1000,7 @@ class NativeDataFrameAggregateSuite extends QueryTest } Seq(true, false).foreach { value => - ignore(s"SPARK-31620: agg with subquery (whole-stage-codegen = $value)") { + test(s"SPARK-31620: agg with subquery (whole-stage-codegen = $value)") { withSQLConf( SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value.toString) { withTempView("t1", "t2") { diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/nativesql/NativeRepartitionSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/nativesql/NativeRepartitionSuite.scala index b0b0496e2..dd48729a6 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/nativesql/NativeRepartitionSuite.scala +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/nativesql/NativeRepartitionSuite.scala @@ -74,6 +74,7 @@ class NativeTPCHTableRepartitionSuite extends NativeRepartitionSuite { override lazy val input = spark.read.format("arrow").load(filePath) + /* ignore("tpch table round robin partitioning") { withRepartition(df => df.repartition(2)) } @@ -95,6 +96,7 @@ class NativeTPCHTableRepartitionSuite extends NativeRepartitionSuite { df => df.groupBy("n_regionkey").agg(Map("n_nationkey" -> "sum")), df => df.repartition(2)) } + */ } class NativeDisableColumnarShuffleSuite extends NativeRepartitionSuite { diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/nativesql/NativeSQLConvertedSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/nativesql/NativeSQLConvertedSuite.scala index a567dd7e8..44ac2c536 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/nativesql/NativeSQLConvertedSuite.scala +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/nativesql/NativeSQLConvertedSuite.scala @@ -92,7 +92,7 @@ class NativeSQLConvertedSuite extends QueryTest Row(null, 9))) } - ignore("SMJ") { + test("SMJ") { Seq[(String, Integer, Integer, Long, Double, Double, Double, Timestamp, Date)]( ("val1a", 6, 8, 10L, 15.0, 20D, 20E2, 
Timestamp.valueOf("2014-04-04 00:00:00.000"), Date.valueOf("2014-04-04")), ("val1b", 8, 16, 19L, 17.0, 25D, 26E2, Timestamp.valueOf("2014-05-04 01:01:00.000"), Date.valueOf("2014-05-04")), @@ -227,7 +227,7 @@ class NativeSQLConvertedSuite extends QueryTest Row(0.0))) } - ignore("int4 and int8 exception") { + test("int4 and int8 exception") { Seq(0, 123456, -123456, 2147483647, -2147483647) .toDF("f1").createOrReplaceTempView("INT4_TBL") val df = sql("SELECT '' AS five, i.f1, i.f1 * smallint('2') AS x FROM INT4_TBL i") @@ -250,7 +250,7 @@ class NativeSQLConvertedSuite extends QueryTest df.show() } - ignore("two inner joins with condition") { + test("two inner joins with condition") { spark .read .format("csv") @@ -285,10 +285,16 @@ class NativeSQLConvertedSuite extends QueryTest "where b.f1 = t.thousand and a.f1 = b.f1 and (a.f1+b.f1+999) = t.tenthous") checkAnswer(df, Seq()) - /** window_part1 -- window has incorrect result */ + /** join -- SMJ left semi */ + val df2 = sql("select count(*) from tenk1 a where unique1 in" + + " (select unique1 from tenk1 b join tenk1 c using (unique1) where b.unique2 = 42)") + checkAnswer(df2, Seq(Row(1))) + } + ignore("window incorrect result") { + /** window_part1 */ val df1 = sql("SELECT sum(unique1) over (rows between current row and unbounded following)," + - "unique1, four FROM tenk1 WHERE unique1 < 10") + "unique1, four FROM tenk1 WHERE unique1 < 10") checkAnswer(df1, Seq( Row(0, 0, 0), Row(10, 3, 3), @@ -300,12 +306,6 @@ class NativeSQLConvertedSuite extends QueryTest Row(41, 2, 2), Row(45, 4, 0), Row(7, 7, 3))) - - /** join -- SMJ left semi has segfault */ - - val df2 = sql("select count(*) from tenk1 a where unique1 in" + - " (select unique1 from tenk1 b join tenk1 c using (unique1) where b.unique2 = 42)") - checkAnswer(df2, Seq(Row(1))) } test("min_max") { @@ -592,33 +592,6 @@ class NativeSQLConvertedSuite extends QueryTest } test("groupingsets") { - spark - .read - .format("csv") - .options(Map("delimiter" -> "\t", "header" -> "false")) - .schema( - """ - |unique1 int, - |unique2 int, - |two int, - |four int, - |ten int, - |twenty int, - |hundred int, - |thousand int, - |twothousand int, - |fivethous int, - |tenthous int, - |odd int, - |even int, - |stringu1 string, - |stringu2 string, - |string4 string - """.stripMargin) - .load(testFile("test-data/postgresql/tenk.data")) - .write - .format("parquet") - .saveAsTable("tenk1") val df = sql("select four, x from (select four, ten, 'foo' as x from tenk1) as t" + " group by grouping sets (four, x) having x = 'foo'") checkAnswer(df, Seq(Row(null, "foo"))) @@ -692,7 +665,7 @@ class NativeSQLConvertedSuite extends QueryTest checkAnswer(df, Seq(Row(1, 1))) } - ignore("scalar-subquery-select -- SMJ LeftAnti has incorrect result") { + test("scalar-subquery-select -- SMJ LeftAnti has incorrect result") { Seq[(String, Integer, Integer, Long, Double, Double, Double, Timestamp, Date)]( ("val1a", 6, 8, 10L, 15.0, 20D, 20E2, Timestamp.valueOf("2014-04-04 00:00:00.000"), Date.valueOf("2014-04-04")), ("val1b", 8, 16, 19L, 17.0, 25D, 26E2, Timestamp.valueOf("2014-05-04 01:01:00.000"), Date.valueOf("2014-05-04")), @@ -756,7 +729,7 @@ class NativeSQLConvertedSuite extends QueryTest Row("val1e", 10))) } - test("join") { +// test("join") { // Seq[(Integer, Integer, String)]( // (1, 4, "one"), // (2, 3, "two"), @@ -814,7 +787,5 @@ class NativeSQLConvertedSuite extends QueryTest // (4, null)) // .toDF("y1", "y2") // .createOrReplaceTempView("y") - - } - +// } } diff --git 
a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc index fa32f9655..5b0dab945 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc @@ -492,7 +492,7 @@ class HashAggregateKernel::Impl { 0) { auto res_type_list = {result_field_list[result_id]}; result_id += 1; - int arg = std::stoi(action_name_list[action_id].substr(20)); + int arg = std::stol(action_name_list[action_id].substr(20)); RETURN_NOT_OK(MakeCountLiteralAction(ctx_, arg, res_type_list, &action)); } else if (action_name_list[action_id].compare("action_stddev_samp_partial") == 0) { @@ -650,7 +650,7 @@ class HashAggregateKernel::Impl { } else if (action_name.compare(0, 20, "action_countLiteral_") == 0) { auto res_type_list = {result_field_list[result_id]}; result_id += 1; - int arg = std::stoi(action_name.substr(20)); + int arg = std::stol(action_name.substr(20)); RETURN_NOT_OK(MakeCountLiteralAction(ctx_, arg, res_type_list, &action)); } else if (action_name.compare("action_stddev_samp_partial") == 0) { auto res_type_list = {result_field_list[result_id], diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/typed_action_codegen_impl.h b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/typed_action_codegen_impl.h index 5e5c79a06..9a50bd694 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/typed_action_codegen_impl.h +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/typed_action_codegen_impl.h @@ -122,7 +122,7 @@ class TypedActionCodeGenImpl { name, child_list_, input_list_, input_fields_list_, named_projector_); } else if (action_name_.compare(0, 20, "action_countLiteral_") == 0) { - auto lit = std::stoi(action_name_.substr(20)); + auto lit = std::stol(action_name_.substr(20)); *action_codegen = std::make_shared( "count_literal_" + std::to_string(lit), lit, child_list_, input_list_, input_fields_list_, named_projector_); diff --git a/native-sql-engine/tools/failed_ut_list.log b/native-sql-engine/tools/failed_ut_list.log new file mode 100644 index 000000000..7dbc22cb0 --- /dev/null +++ b/native-sql-engine/tools/failed_ut_list.log @@ -0,0 +1,183 @@ +- SPARK-8828 sum should return null if all input values are null *** FAILED *** +- SPARK-3173 Timestamp support in the parser *** FAILED *** +- EXCEPT *** FAILED *** +- MINUS *** FAILED *** +- INTERSECT *** FAILED *** +- string timestamp comparison *** FAILED *** +- SPARK-29239: Subquery should not cause NPE when eliminating subexpression *** FAILED *** +- prune results by current_time, complete mode - state format version 1 *** FAILED *** +- prune results by current_time, complete mode - state format version 2 *** FAILED *** +- changing schema of state when restarting query - schema check off - state format version 1 *** FAILED *** +- function current_timestamp and now *** FAILED *** +- dayofyear *** FAILED *** +- hour *** FAILED *** +- function to_date *** FAILED *** +- unix_timestamp *** FAILED *** +- to_unix_timestamp *** FAILED *** +- to_timestamp *** FAILED *** +- SPARK-30668: use legacy timestamp parser in to_timestamp *** FAILED *** +- timestamp type conversion *** FAILED *** +- SHOW PARTITIONS V1: SPARK-33591: null as a partition value *** FAILED *** +- SPARK-33867: Test DataFrame.where for LocalDate and Instant *** FAILED *** +- static scan metrics *** FAILED *** +- partition pruning in broadcast hash joins with aliases 
*** FAILED *** +- partition pruning in broadcast hash joins *** FAILED *** +- avoid reordering broadcast join keys to match input hash partitioning *** FAILED *** +- Plan broadcast pruning only when the broadcast can be reused *** FAILED *** +- SPARK-32817: DPP throws error when the broadcast side is empty *** FAILED *** +- groupBy *** FAILED *** +- Spark vectorized reader - with partition data column - select only top-level fields *** FAILED *** +- Spark vectorized reader - with partition data column - select one deep nested complex field after join *** FAILED *** +- Spark vectorized reader - with partition data column - select one deep nested complex field after outer join *** FAILED *** +- setAuthenticationConfigIfNeeded must set authentication if not set *** FAILED *** +- Write timestamps correctly with timestampFormat option and timeZone option *** FAILED *** +- exception mode for parsing date/timestamp string *** FAILED *** +- Chained Scalar Pandas UDFs should be combined to a single physical node *** FAILED *** +- Mixed Batched Python UDFs and Pandas UDF should be separate physical node *** FAILED *** +- Independent Batched Python UDFs and Scalar Pandas UDFs should be combined separately *** FAILED *** +- Dependent Batched Python UDFs and Scalar Pandas UDFs should not be combined *** FAILED *** +- metrics of the shuffle reader *** FAILED *** +- test SortMergeJoin (with spill) *** FAILED *** +- test SortMergeJoin output ordering *** FAILED *** +- unsafe broadcast hash join updates peak execution memory *** FAILED *** +- unsafe broadcast hash outer join updates peak execution memory *** FAILED *** +- unsafe broadcast left semi join updates peak execution memory *** FAILED *** +- SPARK-23192: broadcast hint should be retained after using the cached data *** FAILED *** +- SPARK-23214: cached data should not carry extra hint info *** FAILED *** +- broadcast hint in SQL *** FAILED *** +- Broadcast timeout *** FAILED *** +- broadcast join where streamed side's output partitioning is HashPartitioning *** FAILED *** +- broadcast join where streamed side's output partitioning is PartitioningCollection *** FAILED *** +- BroadcastHashJoinExec output partitioning size should be limited with a config *** FAILED *** +- SPARK-8005 input_file_name *** FAILED *** +- SPARK-32516: legacy path option behavior in load() *** FAILED *** +- SPARK-32160: Disallow to create SparkSession in executors *** FAILED *** +- SPARK-32160: Allow to create SparkSession in executors if the config is set *** FAILED *** +- SPARK-32991: Use conf in shared state as the original configuration for RESET *** FAILED *** +- SPARK-32991: RESET should work properly with multi threads *** FAILED *** +- SPARK-33944: warning setting hive.metastore.warehouse.dir using session options *** FAILED *** +- SPARK-33944: no warning setting spark.sql.warehouse.dir using session options *** FAILED *** +- SPARK-10634 timestamp written and read as INT64 - truncation *** FAILED *** +- SPARK-10301 requested schema clipping - schemas with disjoint sets of fields *** FAILED *** +- SPARK-26677: negated null-safe equality comparison should not filter matched row groups *** FAILED *** +- returning batch for wide table *** FAILED *** +- SPARK-15370: COUNT bug in Aggregate *** FAILED *** +- ListQuery and Exists should work even no correlated references *** FAILED *** +- SPARK-23957 Remove redundant sort from subquery plan(scalar subquery) *** FAILED *** +- date type - cast to string *** FAILED *** +- date type - cast from string *** FAILED *** +- date 
type - cast to timestamp *** FAILED *** +- date type - cast from timestamp *** FAILED *** +- datetime function - unix_date *** FAILED *** +- datetime function - to_date with format *** FAILED *** +- SELECT structFieldComplex.Value.`value_(2)` FROM tableWithSchema *** FAILED *** +- basic usage *** FAILED *** +- input row metrics *** FAILED *** +- verify ServerThread only accepts the first connection *** FAILED *** +- filter pushdown - timestamp *** FAILED *** +- deserialize all null *** FAILED *** +- deserialize nullable string *** FAILED *** +- Give up splitting aggregate code if a parameter length goes over the limit *** FAILED *** +- Give up splitting subexpression code if a parameter length goes over the limit *** FAILED *** +- groupBy *** FAILED *** +- pivot with timestamp and count should not print internal representation *** FAILED *** +- Columnar Cache Plugin *** FAILED *** +- SPARK-28224: Aggregate sum big decimal overflow *** FAILED *** +- SPARK-28067: Aggregate sum should not return wrong results for decimal overflow *** FAILED *** +- describe *** FAILED *** +- SPARK-18350 show with session local timezone *** FAILED *** +- SPARK-18350 show with session local timezone, vertical = true *** FAILED *** +- NaN is greater than all other non-NaN numeric values *** FAILED *** +- sameResult() on aggregate *** FAILED *** +- SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit *** FAILED *** +- SPARK-22271: mean overflows and returns null for some decimal variables *** FAILED *** +- groupBy *** FAILED *** +- Write timestamps correctly with timestampFormat option and timeZone option *** FAILED *** +- exception mode for parsing date/timestamp string *** FAILED *** +- UDF input_file_name() *** FAILED *** +- SPARK-22790,SPARK-27668: spark.sql.sources.compressionFactor takes effect *** FAILED *** +- Spark vectorized reader - with partition data column - select only top-level fields *** FAILED *** +- Spark vectorized reader - with partition data column - select only expressions without references *** FAILED *** +- Spark vectorized reader - with partition data column - select one deep nested complex field after join *** FAILED *** +- Spark vectorized reader - with partition data column - select one deep nested complex field after outer join *** FAILED *** +- column type inference *** FAILED *** +- parse partitions *** FAILED *** +- SPARK-26327: FileSourceScanExec metrics *** FAILED *** +- static scan metrics *** FAILED *** +- partition pruning in broadcast hash joins with aliases *** FAILED *** +- partition pruning in broadcast hash joins *** FAILED *** +- avoid reordering broadcast join keys to match input hash partitioning *** FAILED *** +- Plan broadcast pruning only when the broadcast can be reused *** FAILED *** +- SPARK-32817: DPP throws error when the broadcast side is empty *** FAILED *** +- SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps *** FAILED *** +- SPARK-31159: rebasing timestamps in write *** FAILED *** +- filter pushdown - timestamp *** FAILED *** +- SPARK-31284, SPARK-31423: rebasing timestamps in write *** FAILED *** +- column type inference *** FAILED *** +- parse partitions *** FAILED *** +- Spark vectorized reader - with partition data column - select only top-level fields *** FAILED *** +- Spark vectorized reader - with partition data column - select one deep nested complex field after join *** FAILED *** +- Spark vectorized reader - with partition data column - select one deep nested complex field after outer join *** 
FAILED *** +- timeZone setting in dynamic partition writes *** FAILED *** +- ReadOnlySQLConf is correctly created at the executor side *** FAILED *** +- case-sensitive config should work for json schema inference *** FAILED *** +- SPARK-24727 CODEGEN_CACHE_MAX_ENTRIES is correctly referenced at the executor side *** FAILED *** +- SPARK-22219: refactor to control to generate comment *** FAILED *** +- SPARK-28939: propagate SQLConf also in conversions to RDD *** FAILED *** +- SPARK-30556 propagate local properties to subquery execution thread *** FAILED *** +- SPARK-22590 propagate local properties to broadcast execution thread *** FAILED *** +- SPARK-10634 timestamp written and read as INT64 - truncation *** FAILED *** +- SPARK-10301 requested schema clipping - schemas with disjoint sets of fields *** FAILED *** +- SPARK-26677: negated null-safe equality comparison should not filter matched row groups *** FAILED *** +- returning batch for wide table *** FAILED *** +- parquet timestamp conversion *** FAILED *** +- Check schemas for expression examples *** FAILED *** +- SPARK-20725: partial aggregate should behave correctly for sameResult *** FAILED *** +- Generated code on driver should not embed platform-specific constant *** FAILED *** +- distributed test *** FAILED *** +- groupBy *** FAILED *** +- environmental variables *** FAILED *** +- columnar exchange same result *** FAILED *** +- BroadcastExchange should cancel the job group if timeout *** FAILED *** +- Spark vectorized reader - with partition data column - select only top-level fields *** FAILED *** +- Spark vectorized reader - with partition data column - select only expressions without references *** FAILED *** +- Spark vectorized reader - with partition data column - select one deep nested complex field after join *** FAILED *** +- Spark vectorized reader - with partition data column - select one deep nested complex field after outer join *** FAILED *** +- alternative output committer, merge schema *** FAILED *** +- alternative output committer, no merge schema *** FAILED *** +- Parquet output committer, merge schema *** FAILED *** +- Parquet output committer, no merge schema *** FAILED *** +- input_file_name, input_file_block_start, input_file_block_length - more than one source *** FAILED *** +- input_file_name, input_file_block_start, input_file_block_length - FileScanRDD *** FAILED *** +- arrow_udf test *** FAILED *** +- unsafe broadcast hash join updates peak execution memory *** FAILED *** +- unsafe broadcast hash outer join updates peak execution memory *** FAILED *** +- unsafe broadcast left semi join updates peak execution memory *** FAILED *** +- SPARK-23192: broadcast hint should be retained after using the cached data *** FAILED *** +- SPARK-23214: cached data should not carry extra hint info *** FAILED *** +- broadcast hint in SQL *** FAILED *** +- Broadcast timeout *** FAILED *** +- broadcast join where streamed side's output partitioning is HashPartitioning *** FAILED *** +- broadcast join where streamed side's output partitioning is PartitioningCollection *** FAILED *** +- BroadcastHashJoinExec output partitioning size should be limited with a config *** FAILED *** +- Casting long as timestamp *** FAILED *** +- Write timestamps correctly with timestampFormat option and timeZone option *** FAILED *** +- exception mode for parsing date/timestamp string *** FAILED *** +- columnar batch scan implementation *** FAILED *** +- Casting long as timestamp *** FAILED *** +- Write timestamps correctly with timestampFormat 
option and timeZone option *** FAILED *** +- exception mode for parsing date/timestamp string *** FAILED *** +- Casting long as timestamp *** FAILED *** +- Write timestamps correctly with timestampFormat option and timeZone option *** FAILED *** +- exception mode for parsing date/timestamp string *** FAILED *** +- Write timestamps correctly with timestampFormat option and timeZone option *** FAILED *** +- exception mode for parsing date/timestamp string *** FAILED *** +- SPARK-33521: universal type conversions of partition values *** FAILED *** +- SPARK-15824 - Execute an INSERT wrapped in a WITH statement immediately *** FAILED *** +- except *** FAILED *** +- intersect *** FAILED *** +- SPARK-17123: Performing set operations that combine non-scala native types *** FAILED *** +- union by name - type coercion *** FAILED *** +- SPARK-34144: write Date and Timestampt, read LocalDate and Instant *** FAILED *** +- SPARK-34144: write LocalDate and Instant, read Date and Timestampt *** FAILED *** +- Explain formatted *** FAILED *** diff --git a/native-sql-engine/tools/run_ut.sh b/native-sql-engine/tools/run_ut.sh index 767c98ec7..93fa5a083 100755 --- a/native-sql-engine/tools/run_ut.sh +++ b/native-sql-engine/tools/run_ut.sh @@ -13,16 +13,18 @@ then else echo "SPARK_HOME is $spark_home" fi -mvn clean test -P full-scala-compiler -am -pl native-sql-engine/core -Dbuild_arrow=OFF -Dbuild_protobuf=OFF -DfailIfNoTests=false -DargLine="-Dspark.test.home=$spark_home" -Dexec.skip=true -Dmaven.test.failure.ignore=true &> native-sql-engine/tools/log-file.log +mvn clean test -P full-scala-compiler -Dbuild_arrow=OFF -Dbuild_protobuf=OFF -DfailIfNoTests=false -DargLine="-Dspark.test.home=$spark_home" -Dexec.skip=true -Dmaven.test.failure.ignore=true &> native-sql-engine/tools/log-file.log cd native-sql-engine/tools/ + +known_fails=183 tests_total=0 module_tested=0 -module_should_test=1 +module_should_test=7 while read -r line ; do num=$(echo "$line" | grep -o -E '[0-9]+') tests_total=$((tests_total+num)) done <<<"$(grep "Total number of tests run:" log-file.log)" - + succeed_total=0 while read -r line ; do [[ $line =~ [^0-9]*([0-9]+)\, ]] @@ -30,12 +32,26 @@ while read -r line ; do succeed_total=$((succeed_total+num)) let module_tested++ done <<<"$(grep "succeeded" log-file.log)" -echo "Tests total: $tests_total, Succeed Total: $succeed_total" - -if test $tests_total -eq $succeed_total -a $module_tested -eq $module_should_test +failed_count=$((tests_total-succeed_total)) +echo "Tests total: $tests_total, Succeed Total: $succeed_total, Known Fails: $known_fails, Actual Fails: $failed_count." + +cat log-file.log | grep "\*** FAILED \***" | grep -v "TESTS FAILED ***" | grep -v "TEST FAILED ***" &> new_failed_list.log +comm -1 -3 <(sort failed_ut_list.log) <(sort new_failed_list.log) &> diff.log +if [ -s diff.log ] +then + echo "Below are newly failed tests:" + while read p; do + echo "$p" + done