This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-359] [NSE-273] Introduce shim layer to fix compatibility issues for gazelle on spark 3.1 & 3.2 #742

Merged: 39 commits, Mar 9, 2022
Commits (39 total; changes shown from 35 commits)
b454397
Initial commit
PHILO-HE Feb 14, 2022
bcfdda8
Add withNewChildInternal & withNewChildrenInternal for spark 3.2 comp…
PHILO-HE Feb 15, 2022
a678961
Fix compatibility issues for ParquetFileFormat, etc.
PHILO-HE Feb 16, 2022
6f10324
Fix compatibility issue for ColumnarBatchScanExec
PHILO-HE Feb 17, 2022
1f62997
Fix compatibility issues for ColumnarBroadcastHashJoinExec
PHILO-HE Feb 18, 2022
fbd0818
Fix compatibility issues for ColumnarHashAggregateExec
PHILO-HE Feb 18, 2022
1d85f39
Fix compatibility issues for ColumnarShuffledHashJoinExec, etc
PHILO-HE Feb 18, 2022
62a411c
Remove ColumnarScalarSubquery and the code where it is used, not rele…
PHILO-HE Feb 18, 2022
6e6bc88
Fix compatibility issues for ColumnarShuffleManager, etc.
PHILO-HE Feb 21, 2022
dfe32ef
Fix compatibility issues for ColumnarArrowPythonRunner etc.
PHILO-HE Feb 22, 2022
3fb5f5c
Set different scala version for spark 3.1/3.2
PHILO-HE Feb 22, 2022
f121a2c
Fix compatibility issues for ColumnarCustomShuffleReaderExec.scala
PHILO-HE Feb 22, 2022
f621b7a
Fix compatibility issues for ShuffledColumnarBatchRDD
PHILO-HE Feb 23, 2022
659b5f4
Refactor the code to refix compatibility issues for ColumnarCustomShu…
PHILO-HE Feb 23, 2022
e1908c5
Multiple fixes in match/case statements
PHILO-HE Feb 23, 2022
2c24d7a
Set jackson versions for different versions of spark
PHILO-HE Feb 23, 2022
7e62299
Fix compatibility issues caused by renaming CustomShuffleReaderExec t…
PHILO-HE Feb 24, 2022
3440ccb
Small fixes
PHILO-HE Feb 24, 2022
71036c8
Fix compatibility issues for ReaderIterator
PHILO-HE Feb 25, 2022
9249de6
Move AdaptiveSparkPlanExec & MemoryStore under shim layer
PHILO-HE Feb 25, 2022
eddf4cc
Fix compatibility issues for ShufflePartitionUtils
PHILO-HE Feb 25, 2022
ca83b1f
Fix issues found in building
PHILO-HE Feb 25, 2022
6e26c4e
Fix cyclic dependency and import missing dependencies
PHILO-HE Feb 28, 2022
7cde900
Fix accessibility issues about IndexShuffleBlockResolver, BaseShuffle…
PHILO-HE Feb 28, 2022
c4157fa
Fix dependency issue for ColumnarBatchScanExec
PHILO-HE Feb 28, 2022
fd0d30c
Fix compile issues in spark311 module
PHILO-HE Mar 1, 2022
725f0fd
Fix compatibility issues for ReaderIterator
PHILO-HE Mar 1, 2022
52a2c62
Move Utils.doFetchFile to specific package and do some code refactor
PHILO-HE Mar 1, 2022
c9dddd7
Make ColumnarBatchScanExec abstract to break the cyclic dependency
PHILO-HE Mar 2, 2022
6e075e9
Use a more concise way to fix compatibility issue for OutputWriter
PHILO-HE Mar 2, 2022
46f0009
Add 3.1 or 3.2 shim layer dependency for NSE core module according to…
PHILO-HE Mar 2, 2022
e868227
Convert some child plan to expected type
PHILO-HE Mar 2, 2022
54863e5
Fix compile issues on spark 3.2.0
PHILO-HE Mar 3, 2022
540bf32
Change the extension from ShuffledJoin to ColumnarShuffledJoin for Co…
PHILO-HE Mar 3, 2022
7b326a2
Remove useless code
PHILO-HE Mar 3, 2022
bee7260
Change spark version to 3.2.1 for profile spark-3.2
PHILO-HE Mar 3, 2022
e36e3d9
Fix compatibility issues from spark 3.2.0 to 3.2.1
PHILO-HE Mar 4, 2022
9a72e3f
Fix dependency issue for github action test
PHILO-HE Mar 8, 2022
6dad071
Put fully-qualified class name under resources/META-INF/services for …
PHILO-HE Mar 8, 2022
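Taken together, these commits implement a straightforward shim-layer split: the spark-sql-columnar-shims-common module defines a version-agnostic interface plus a loader, and the spark311/spark320 modules each provide one concrete implementation that is picked up at run time (the META-INF/services commit points to java.util.ServiceLoader-style registration). The sketch below illustrates that shape; SparkShimLoader.getSparkShims is the real entry point used in the diffs that follow, but the trait members and discovery details shown here are assumptions, not the actual com.intel.oap.sql.shims code.

package com.intel.oap.sql.shims

import java.util.ServiceLoader
import scala.collection.JavaConverters._

// Version-agnostic surface, implemented once per supported Spark release line.
// The real trait exposes methods such as getDatetimeRebaseMode, newParquetFilters
// and getBroadcastHashJoinOutputPartitioningExpandLimit (see the diffs below);
// only a minimal illustrative subset is declared here.
trait SparkShims {
  def getShimDescriptor: String
  def supports(sparkVersion: String): Boolean
}

// spark-sql-columnar-shims-spark311 / -spark320 would each register exactly one
// implementation under META-INF/services (service file name assumed).
object SparkShimLoader {
  private lazy val shims: SparkShims = {
    val sparkVersion = org.apache.spark.SPARK_VERSION
    ServiceLoader.load(classOf[SparkShims]).asScala
      .find(_.supports(sparkVersion))
      .getOrElse(throw new IllegalStateException(
        s"No SparkShims implementation found for Spark $sparkVersion"))
  }

  // All call sites in this PR go through this single entry point.
  def getSparkShims: SparkShims = shims
}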
@@ -310,4 +310,8 @@ case class RowToArrowColumnarExec(child: SparkPlan) extends UnaryExecNode {
}
}
}

// For spark 3.2.
protected def withNewChildInternal(newChild: SparkPlan): RowToArrowColumnarExec =
copy(child = newChild)
}
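The two added lines above are the first of many near-identical additions in this PR. Spark 3.2 reworked TreeNode so that every node supplies its own child-copy hooks (withNewChildInternal for unary nodes, withNewChildrenInternal otherwise), while Spark 3.1 has no such members; declaring the method without the override keyword therefore compiles on both, and it is simply unused on 3.1. The toy model below shows the mechanism with made-up node types; it is not the Spark API.

// Toy stand-in for Spark 3.2's child-copy contract (names and shape simplified).
trait ToyUnaryNode {
  def child: ToyUnaryNode
  // Spark 3.2 declares a hook like this and invokes it when a plan's children
  // are replaced; Spark 3.1 has no such member, which is why the PR's
  // implementations omit `override`.
  protected def withNewChildInternal(newChild: ToyUnaryNode): ToyUnaryNode
  final def withNewChildren(newChild: ToyUnaryNode): ToyUnaryNode =
    if (newChild eq child) this else withNewChildInternal(newChild)
}

case object ToyLeaf extends ToyUnaryNode {
  def child: ToyUnaryNode = this
  protected def withNewChildInternal(newChild: ToyUnaryNode): ToyUnaryNode = this
}

// Mirrors the pattern added throughout this PR: copy(child = newChild).
case class ToyCoalesce(child: ToyUnaryNode) extends ToyUnaryNode {
  protected def withNewChildInternal(newChild: ToyUnaryNode): ToyCoalesce =
    copy(child = newChild)
}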
6 changes: 6 additions & 0 deletions arrow-data-source/parquet/pom.xml
@@ -22,6 +22,12 @@
<artifactId>spark-arrow-datasource-standard</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

</project>
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import scala.util.{Failure, Try}

import com.intel.oap.spark.sql.execution.datasources.arrow.ArrowFileFormat
import com.intel.oap.sql.shims.SparkShimLoader
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{Job, JobID, OutputCommitter, TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
@@ -274,6 +275,7 @@ class ParquetFileFormat
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)

(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -292,11 +294,17 @@

lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData

val datetimeRebaseMode =
SparkShimLoader.getSparkShims.getDatetimeRebaseMode(footerFileMetaData, parquetOptions)

// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
val parquetFilters =
SparkShimLoader.getSparkShims.newParquetFilters(parquetSchema: MessageType,
pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStringStartWith,
pushDownInFilterThreshold, isCaseSensitive, datetimeRebaseMode)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -322,10 +330,6 @@
None
}

val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))

val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
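A note on this hunk: the removed DataSourceUtils.datetimeRebaseMode lookup and the direct ParquetFilters construction are the Spark 3.1 shapes of those APIs and presumably do not compile unchanged against 3.2, which is why both the rebase-mode resolution and the filter construction now go through SparkShimLoader. The rebase mode is also computed earlier than before simply because it has become an argument to newParquetFilters.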
2 changes: 1 addition & 1 deletion arrow-data-source/pom.xml
@@ -42,7 +42,7 @@
</pluginRepositories>

<dependencies>
<dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
6 changes: 6 additions & 0 deletions arrow-data-source/standard/pom.xml
@@ -18,6 +18,12 @@
<artifactId>spark-arrow-datasource-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
@@ -137,6 +137,10 @@ object ArrowWriteExtension {
private case class ColumnarToFakeRowLogicAdaptor(child: LogicalPlan)
extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output

// For spark 3.2.
protected def withNewChildInternal(newChild: LogicalPlan): ColumnarToFakeRowLogicAdaptor =
copy(child = newChild)
}

private case class ColumnarToFakeRowAdaptor(child: SparkPlan) extends ColumnarToRowTransition {
@@ -149,6 +153,10 @@ object ArrowWriteExtension {
}

override def output: Seq[Attribute] = child.output

// For spark 3.2.
protected def withNewChildInternal(newChild: SparkPlan): ColumnarToFakeRowAdaptor =
copy(child = newChild)
}

case class SimpleStrategy() extends Strategy {
@@ -94,6 +94,11 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab
override def close(): Unit = {
writeQueue.close()
}

// Do NOT add the override keyword, for compatibility with Spark 3.1 (OutputWriter declares path() only in Spark 3.2).
def path(): String = {
path
}
}
}
}
35 changes: 31 additions & 4 deletions native-sql-engine/core/pom.xml
@@ -44,6 +44,33 @@
<nativesql.build_protobuf>${build_protobuf}</nativesql.build_protobuf>
<nativesql.build_jemalloc>${build_jemalloc}</nativesql.build_jemalloc>
</properties>

<profiles>
<profile>
<id>spark-3.1.1</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-spark311</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-3.2.0</id>
<dependencies>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-spark320</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>

<dependencies>
<!-- Prevent our dummy JAR from being included in Spark distributions or uploaded to YARN -->
<dependency>
@@ -166,19 +193,19 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.0</version>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.10.0</version>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.0</version>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -299,7 +326,7 @@
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-common</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
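A note on the core pom changes above: the spark-3.1.1 profile is marked activeByDefault, so a plain build resolves the spark311 shim module, while a Spark 3.2 build presumably activates the other profile explicitly (for example mvn clean package -Pspark-3.2.0, alongside whatever other flags the project's build docs call for). The hard-coded Jackson 2.10.0 test dependencies now reference ${jackson.version}, which is assumed to be set per Spark profile elsewhere (the "Set jackson versions for different versions of spark" commit), and spark-sql-columnar-shims-common moves to provided scope, presumably because the selected shim module or the final assembly supplies it at run time.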
@@ -147,6 +147,10 @@ case class CoalesceBatchesExec(child: SparkPlan) extends UnaryExecNode {
new CloseableColumnBatchIterator(res)
}
}

// For spark 3.2.
protected def withNewChildInternal(newChild: SparkPlan): CoalesceBatchesExec =
copy(child = newChild)
}

object CoalesceBatchesExec {
@@ -100,8 +100,11 @@ case class ColumnarConditionProjectExec(
}
}

def isNullIntolerant(expr: Expression): Boolean = expr match {
case e: NullIntolerant => e.children.forall(isNullIntolerant)
// In Spark 3.2, PredicateHelper already defines isNullIntolerant with exactly the same
// body. Reusing that name would require the override keyword on 3.2, but on Spark 3.1
// there is nothing to override, so an independent method name is used instead.
def isNullIntolerantInternal(expr: Expression): Boolean = expr match {
case e: NullIntolerant => e.children.forall(isNullIntolerantInternal)
case _ => false
}

@@ -110,7 +113,7 @@

val notNullAttributes = if (condition != null) {
val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
case IsNotNull(a) => isNullIntolerant(a) && a.references.subsetOf(child.outputSet)
case IsNotNull(a) => isNullIntolerantInternal(a) && a.references.subsetOf(child.outputSet)
case _ => false
}
notNullPreds.flatMap(_.references).distinct.map(_.exprId)
@@ -267,6 +270,9 @@
}
}

// For spark 3.2.
protected def withNewChildInternal(newChild: SparkPlan): ColumnarConditionProjectExec =
copy(child = newChild)
}

case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
@@ -308,6 +314,10 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}

// For spark 3.2.
protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): ColumnarUnionExec =
copy(children = newChildren)
}

//TODO(): consolidate locallimit and globallimit
@@ -380,6 +390,10 @@ case class ColumnarLocalLimitExec(limit: Int, child: SparkPlan) extends LimitExe
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}

protected def withNewChildInternal(newChild: SparkPlan):
ColumnarLocalLimitExec =
copy(child = newChild)

}

case class ColumnarGlobalLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
@@ -451,4 +465,8 @@ case class ColumnarGlobalLimitExec(limit: Int, child: SparkPlan) extends LimitEx
: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}

protected def withNewChildInternal(newChild: SparkPlan):
ColumnarGlobalLimitExec =
copy(child = newChild)
}
@@ -21,9 +21,11 @@ import com.google.common.collect.Lists
import com.intel.oap.GazellePluginConfig
import com.intel.oap.expression._
import com.intel.oap.vectorized.{ExpressionEvaluator, _}
import com.intel.oap.sql.shims.SparkShimLoader
import org.apache.arrow.gandiva.expression._
import org.apache.arrow.vector.types.pojo.{ArrowType, Field}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
@@ -35,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.util.{ExecutorManager, UserAddedJarUtils}
@@ -59,7 +62,7 @@
nullAware: Boolean = false)
extends BaseJoinExec
with ColumnarCodegenSupport
with ShuffledJoin {
with ColumnarShuffledJoin {

val sparkConf = sparkContext.getConf
val numaBindingInfo = GazellePluginConfig.getConf.numaBindingInfo
@@ -89,6 +92,9 @@
}
buildCheck()

// A method declared in ShuffledJoin in Spark 3.2.
def isSkewJoin: Boolean = false

def buildCheck(): Unit = {
joinType match {
case _: InnerLike =>
@@ -145,13 +151,13 @@
throw new UnsupportedOperationException(
s"ColumnarBroadcastHashJoinExec doesn't support doExecute")
}

val isNullAwareAntiJoin : Boolean = nullAware

val broadcastHashJoinOutputPartitioningExpandLimit: Int = sqlContext.getConf(
"spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit").trim().toInt

override lazy val outputPartitioning: Partitioning = {
val broadcastHashJoinOutputPartitioningExpandLimit: Int =
SparkShimLoader
.getSparkShims
.getBroadcastHashJoinOutputPartitioningExpandLimit(this: SparkPlan)
joinType match {
case _: InnerLike if broadcastHashJoinOutputPartitioningExpandLimit > 0 =>
streamedPlan.outputPartitioning match {
@@ -193,7 +199,10 @@
// Seq("a", "b", "c"), Seq("a", "b", "y"), Seq("a", "x", "c"), Seq("a", "x", "y").
// The expanded expressions are returned as PartitioningCollection.
private def expandOutputPartitioning(partitioning: HashPartitioning): PartitioningCollection = {
val maxNumCombinations = broadcastHashJoinOutputPartitioningExpandLimit
val maxNumCombinations =
SparkShimLoader
.getSparkShims
.getBroadcastHashJoinOutputPartitioningExpandLimit(this: SparkPlan)
var currentNumCombinations = 0

def generateExprCombinations(current: Seq[Expression],
@@ -640,4 +649,9 @@
}

}

// For spark 3.2.
protected def withNewChildrenInternal(newLeft: SparkPlan, newRight: SparkPlan):
ColumnarBroadcastHashJoinExec =
copy(left = newLeft, right = newRight)
}
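For context on the two shim calls above: the lookup they replace read spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit straight from sqlContext, so the Spark 3.1 side of the shim is presumably little more than that lookup moved behind the interface. A sketch under that assumption (trait and class names are invented; only the conf key and the trim/toInt handling come from the removed line):

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf

// Hypothetical fragment, not the actual com.intel.oap.sql.shims code.
trait JoinShims {
  def getBroadcastHashJoinOutputPartitioningExpandLimit(plan: SparkPlan): Int
}

class Spark311JoinShims extends JoinShims {
  // Reads the same conf key as the removed lookup; going through SQLConf.get
  // rather than the plan's SQLContext is a simplification of this sketch.
  override def getBroadcastHashJoinOutputPartitioningExpandLimit(plan: SparkPlan): Int =
    SQLConf.get
      .getConfString("spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit")
      .trim.toInt
}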
@@ -69,6 +69,10 @@ case class ColumnarCoalesceExec(numPartitions: Int, child: SparkPlan) extends Un
child.executeColumnar().coalesce(numPartitions, shuffle = false)
}
}

// For spark 3.2.
protected def withNewChildInternal(newChild: SparkPlan): ColumnarCoalesceExec =
copy(child = newChild)
}

object ColumnarCoalesceExec {
@@ -133,4 +133,8 @@ case class ColumnarExpandExec(
new CloseableColumnBatchIterator(res)
}
}

// For spark 3.2.
protected def withNewChildInternal(newChild: SparkPlan): ColumnarExpandExec =
copy(child = newChild)
}