[NSE-451] backport patches to 1.2 (#448)
* Minor: Remove debug code for PR#387 (#431)

* [NSE-436] Fix for Arrow Data Source test suite (#437)

Closes #436

* [NSE-254] Solve the redundant arrow library issue (#440)

* [NSE-254] Issue0410 jar size (#441)

* [NSE-254] Solve the redundant arrow library issue

* Remove mvn jar plugin 3.2.0

* fix packaging (#442)

Signed-off-by: Yuan Zhou <[email protected]>

Co-authored-by: Yuan Zhou <[email protected]>

* [NSE-207] Fix aggregate and refresh UT test script (#426)

* fix expr id difference in Partial Aggregate

* add fallback to window

* refresh ut

* use stol instead of stoi

* enable ut full test

* [NSE-429] TPC-DS Q14a/b get slowed down when setting spark.oap.sql.columnar.sortmergejoin.lazyread=true (#432)

* wip

* debug commit

* wip

* discard proxy pattern

* fix

* fix

* fix

* fix unexpected O(n^2) time

* fix1

* fix2

* Add a bunch of caches; Adjust prefetch batch length to 16

* format

* fix

* disable spark.oap.sql.columnar.sortmergejoin.lazyread by default (a toggle sketch follows the changed-files summary below)

* fix

Co-authored-by: Hongze Zhang <[email protected]>
Co-authored-by: Wei-Ting Chen <[email protected]>
Co-authored-by: Rui Mo <[email protected]>
4 people authored Aug 5, 2021
1 parent 6b12a93 commit d4c3a21
Showing 23 changed files with 435 additions and 189 deletions.
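One of the backported patches turns spark.oap.sql.columnar.sortmergejoin.lazyread off by default. Below is a minimal sketch of toggling it, assuming the key is read as an ordinary boolean Spark conf; the actual wiring inside ColumnarPluginConfig is not part of this diff.

import org.apache.spark.sql.SparkSession

// Sketch only: re-enable lazy read for the columnar sort-merge join,
// then set it back to the new default at runtime.
val spark = SparkSession.builder()
  .master("local[*]")                      // local master just for this sketch
  .appName("smj-lazyread-toggle")
  .config("spark.oap.sql.columnar.sortmergejoin.lazyread", "true")
  .getOrCreate()

spark.conf.set("spark.oap.sql.columnar.sortmergejoin.lazyread", "false")  // new default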
2 changes: 1 addition & 1 deletion arrow-data-source/standard/pom.xml
@@ -61,7 +61,7 @@
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
@@ -277,7 +277,10 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {

test("file descriptor leak - v1") {
val path = ArrowDataSourceTest.locateResourcePath(parquetFile1)
spark.catalog.createTable("ptab2", path, "arrow")
val frame = spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
.arrow(path)
frame.createOrReplaceTempView("ptab2")

def getFdCount: Long = {
ManagementFactory.getOperatingSystemMXBean
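For context, a self-contained sketch of the pattern the fixed test now relies on: reading a Parquet file through the Arrow data source and registering it as a temp view. It uses format("arrow") in place of the project's .arrow(...) shorthand, the literal option key "originalFormat" is assumed to match ArrowOptions.KEY_ORIGINAL_FORMAT, and the path is hypothetical.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("arrow-read-sketch").getOrCreate()
val path = "/tmp/example.parquet"  // hypothetical input file

// Read the Parquet file through the Arrow data source and expose it to SQL,
// mirroring the replacement for spark.catalog.createTable in the test above.
val frame = spark.read
  .format("arrow")
  .option("originalFormat", "parquet")  // assumed literal for ArrowOptions.KEY_ORIGINAL_FORMAT
  .load(path)
frame.createOrReplaceTempView("ptab2")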
2 changes: 1 addition & 1 deletion native-sql-engine/core/pom.xml
@@ -386,7 +386,7 @@
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
@@ -30,7 +30,6 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
def getCpu(): Boolean = {
val source = scala.io.Source.fromFile("/proc/cpuinfo")
val lines = try source.mkString finally source.close()
return true
//TODO(): check CPU flags to enable/disable AVX512
if (lines.contains("GenuineIntel")) {
return true
@@ -66,7 +66,7 @@ case class ColumnarHashAggregateExec(
aggregateExpressions: Seq[AggregateExpression],
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
var resultExpressions: Seq[NamedExpression],
child: SparkPlan)
extends BaseAggregateExec
with ColumnarCodegenSupport
@@ -76,14 +76,28 @@
val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo
override def supportsColumnar = true

var resAttributes: Seq[Attribute] = resultExpressions.map(_.toAttribute)
if (aggregateExpressions != null && aggregateExpressions.nonEmpty) {
aggregateExpressions.head.mode match {
case Partial =>
// To fix the expression ids in result expressions being different with those from
// inputAggBufferAttributes, in Partial Aggregate,
// result attributes are recalculated to set the result expressions.
resAttributes = groupingExpressions.map(_.toAttribute) ++
aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
resultExpressions = resAttributes
case _ =>
}
}

// Members declared in org.apache.spark.sql.execution.AliasAwareOutputPartitioning
override protected def outputExpressions: Seq[NamedExpression] = resultExpressions

// Members declared in org.apache.spark.sql.execution.CodegenSupport
protected def doProduce(ctx: CodegenContext): String = throw new UnsupportedOperationException()

// Members declared in org.apache.spark.sql.catalyst.plans.QueryPlan
override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
override def output: Seq[Attribute] = resAttributes

// Members declared in org.apache.spark.sql.execution.SparkPlan
protected override def doExecute()
@@ -398,30 +412,7 @@ case class ColumnarHashAggregateExec(
expr.mode match {
case Final =>
val out_res = 0
resultColumnVectors(idx).dataType match {
case t: IntegerType =>
resultColumnVectors(idx)
.put(0, out_res.asInstanceOf[Number].intValue)
case t: LongType =>
resultColumnVectors(idx)
.put(0, out_res.asInstanceOf[Number].longValue)
case t: DoubleType =>
resultColumnVectors(idx)
.put(0, out_res.asInstanceOf[Number].doubleValue())
case t: FloatType =>
resultColumnVectors(idx)
.put(0, out_res.asInstanceOf[Number].floatValue())
case t: ByteType =>
resultColumnVectors(idx)
.put(0, out_res.asInstanceOf[Number].byteValue())
case t: ShortType =>
resultColumnVectors(idx)
.put(0, out_res.asInstanceOf[Number].shortValue())
case t: StringType =>
val values = (out_res :: Nil).map(_.toByte).toArray
resultColumnVectors(idx)
.putBytes(0, 1, values, 0)
}
putDataIntoVector(resultColumnVectors, out_res, idx)
idx += 1
case _ =>
}
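A small illustration of the Partial-mode recalculation above, using hypothetical attribute and table names; it only shows how the rebuilt output is derived from the grouping attributes plus the aggregate function's inputAggBufferAttributes.

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
import org.apache.spark.sql.types.LongType

// Hypothetical attributes for a partial aggregate of
//   SELECT n_regionkey, sum(n_nationkey) FROM nation GROUP BY n_regionkey
val groupKey = AttributeReference("n_regionkey", LongType)()
val input = AttributeReference("n_nationkey", LongType)()

// Rebuilding the output from the grouping attributes plus inputAggBufferAttributes
// yields expression ids that the downstream Final aggregate actually binds against.
val resAttributes = Seq(groupKey) ++ Sum(input).inputAggBufferAttributes
// e.g. resAttributes: Seq(n_regionkey#1L, sum#2L)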
@@ -50,6 +50,7 @@ import scala.collection.mutable.ListBuffer
import scala.util.Random

import org.apache.spark.sql.execution.datasources.v2.arrow.SparkSchemaUtils
import util.control.Breaks._

case class ColumnarWindowExec(windowExpression: Seq[NamedExpression],
partitionSpec: Seq[Expression],
@@ -60,6 +61,8 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression],

override def output: Seq[Attribute] = child.output ++ windowExpression.map(_.toAttribute)

buildCheck()

override def requiredChildDistribution: Seq[Distribution] = {
if (partitionSpec.isEmpty) {
// Only show warning when the number of bytes is larger than 100 MiB?
@@ -91,6 +94,29 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression],
val sparkConf = sparkContext.getConf
val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo

def buildCheck(): Unit = {
var allLiteral = true
try {
breakable {
for (func <- validateWindowFunctions()) {
for (child <- func._2.children) {
if (!child.isInstanceOf[Literal]) {
allLiteral = false
break
}
}
}
}
} catch {
case e: Throwable =>
throw new UnsupportedOperationException(s"${e.getMessage}")
}
if (allLiteral) {
throw new UnsupportedOperationException(
s"Window functions' children all being Literal is not supported.")
}
}

def checkAggFunctionSpec(windowSpec: WindowSpecDefinition): Unit = {
if (windowSpec.orderSpec.nonEmpty) {
throw new UnsupportedOperationException("unsupported operation for " +
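For reference, a hypothetical query that the new buildCheck() above rejects: a window function whose children are all literals. Under this plugin the thrown UnsupportedOperationException is expected to route the plan to the row-based window fallback added in this commit; the DataFrame below is made up for illustration, and plain Spark would simply execute it.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, sum}

val spark = SparkSession.builder().master("local[*]").appName("window-literal-check").getOrCreate()
import spark.implicits._

val df = Seq((1, 10.0), (1, 20.0), (2, 30.0)).toDF("i_class_id", "i_current_price")
val w = Window.partitionBy(col("i_class_id"))

// sum(lit(1)) has a single Literal child, so buildCheck() sees only literal
// children and throws, steering the plan away from ColumnarWindowExec.
val fallbackDf = df.withColumn("cnt", sum(lit(1)).over(w))
fallbackDf.show()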
@@ -213,7 +213,7 @@ class DateTimeSuite extends QueryTest with SharedSparkSession {
}

// FIXME ZONE issue
ignore("date type - cast from timestamp") {
test("date type - cast from timestamp") {
withTempView("dates") {
val dates = (0L to 3L).map(i => i * 24 * 1000 * 3600)
.map(i => Tuple1(new Timestamp(i)))
@@ -248,7 +248,7 @@ class DateTimeSuite extends QueryTest with SharedSparkSession {
}

// todo: fix field/literal implicit conversion in ColumnarExpressionConverter
ignore("date type - join on, bhj") {
test("date type - join on, bhj") {
withTempView("dates1", "dates2") {
val dates1 = (0L to 3L).map(i => i * 1000 * 3600 * 24)
.map(i => Tuple1(new Date(i))).toDF("time1")
@@ -750,7 +750,7 @@ class DateTimeSuite extends QueryTest with SharedSparkSession {
}
}

ignore("datetime function - to_date with format") { // todo GetTimestamp IS PRIVATE ?
test("datetime function - to_date with format") { // todo GetTimestamp IS PRIVATE ?
withTempView("dates") {

val dates = Seq("2009-07-30", "2009-07-31", "2009-08-01")
@@ -70,7 +70,7 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {
super.afterAll()
}

ignore("window queries") {
test("window queries") {
runner.runTPCQuery("q12", 1, true)
runner.runTPCQuery("q20", 1, true)
runner.runTPCQuery("q36", 1, true)
@@ -103,7 +103,7 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {
df.show()
}

ignore("window function with decimal input") {
test("window function with decimal input") {
val df = spark.sql("SELECT i_item_sk, i_class_id, SUM(i_current_price)" +
" OVER (PARTITION BY i_class_id) FROM item LIMIT 1000")
df.explain()
@@ -118,7 +118,7 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {
df.show()
}

ignore("window function with decimal input 2") {
test("window function with decimal input 2") {
val df = spark.sql("SELECT i_item_sk, i_class_id, RANK()" +
" OVER (PARTITION BY i_class_id ORDER BY i_current_price) FROM item LIMIT 1000")
df.explain()
@@ -36,7 +36,7 @@ class ArrowColumnarBatchSerializerSuite extends SparkFunSuite with SharedSparkSe
SQLMetrics.createAverageMetric(spark.sparkContext, "test serializer number of output rows")
}

ignore("deserialize all null") {
test("deserialize all null") {
val input = getTestResourcePath("test-data/native-splitter-output-all-null")
val serializer =
new ArrowColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance()
@@ -64,7 +64,7 @@ class ArrowColumnarBatchSerializerSuite extends SparkFunSuite with SharedSparkSe
deserializedStream.close()
}

ignore("deserialize nullable string") {
test("deserialize nullable string") {
val input = getTestResourcePath("test-data/native-splitter-output-nullable-string")
val serializer =
new ArrowColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance()
@@ -1001,7 +1001,7 @@ class DataFrameAggregateSuite extends QueryTest
}

Seq(true, false).foreach { value =>
ignore(s"SPARK-31620: agg with subquery (whole-stage-codegen = $value)") {
test(s"SPARK-31620: agg with subquery (whole-stage-codegen = $value)") {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value.toString) {
withTempView("t1", "t2") {
@@ -553,7 +553,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
Row("b", 2, 4, 8)))
}

ignore("null inputs") {
test("null inputs") {
val df = Seq(("a", 1), ("a", 1), ("a", 2), ("a", 2), ("b", 4), ("b", 3), ("b", 2))
.toDF("key", "value")
val window = Window.orderBy()
@@ -908,7 +908,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
}
}

ignore("NaN and -0.0 in window partition keys") {
test("NaN and -0.0 in window partition keys") {
val df = Seq(
(Float.NaN, Double.NaN),
(0.0f/0.0f, 0.0/0.0),
@@ -45,7 +45,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1)
}

ignore("function current_timestamp and now") {
test("function current_timestamp and now") {
val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
checkAnswer(df1.select(countDistinct(current_timestamp())), Row(1))

@@ -69,7 +69,8 @@ import org.apache.spark.sql.internal.SQLConf
* }}}
*/
// scalastyle:on line.size.limit
@deprecated("This test suite is not suitable for native sql engine.", "Mo Rui")
// This test suite is not suitable for native sql engine. (Mo Rui)
/*
trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
private val originalMaxToStringFields = conf.maxToStringFields
@@ -338,3 +339,4 @@ class TPCDSModifiedPlanStabilityWithStatsSuite extends PlanStabilitySuite {
}
}
}
*/
@@ -178,7 +178,7 @@ class ReuseExchangeSuite extends RepartitionSuite {

override lazy val input = spark.read.parquet(filePath)

ignore("columnar exchange same result") {
test("columnar exchange same result") {
val df1 = input.groupBy("n_regionkey").agg(Map("n_nationkey" -> "sum"))
val hashAgg1 = df1.queryExecution.executedPlan.collectFirst {
case agg: ColumnarHashAggregateExec => agg
@@ -1000,7 +1000,7 @@ class NativeDataFrameAggregateSuite extends QueryTest
}

Seq(true, false).foreach { value =>
ignore(s"SPARK-31620: agg with subquery (whole-stage-codegen = $value)") {
test(s"SPARK-31620: agg with subquery (whole-stage-codegen = $value)") {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value.toString) {
withTempView("t1", "t2") {
@@ -74,6 +74,7 @@ class NativeTPCHTableRepartitionSuite extends NativeRepartitionSuite {

override lazy val input = spark.read.format("arrow").load(filePath)

/*
ignore("tpch table round robin partitioning") {
withRepartition(df => df.repartition(2))
}
@@ -95,6 +96,7 @@ class NativeTPCHTableRepartitionSuite extends NativeRepartitionSuite {
df => df.groupBy("n_regionkey").agg(Map("n_nationkey" -> "sum")),
df => df.repartition(2))
}
*/
}

class NativeDisableColumnarShuffleSuite extends NativeRepartitionSuite {