This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-359] [NSE-273] Introduce shim layer to fix compatibility issues for gazelle on spark 3.1 & 3.2 #742

Merged
39 commits merged on Mar 9, 2022
Commits
b454397
Initial commit
PHILO-HE Feb 14, 2022
bcfdda8
Add withNewChildInternal & withNewChildrenInternal for spark 3.2 comp…
PHILO-HE Feb 15, 2022
a678961
Fix compatibility issues for ParquetFileFormat, etc.
PHILO-HE Feb 16, 2022
6f10324
Fix compatibility issue for ColumnarBatchScanExec
PHILO-HE Feb 17, 2022
1f62997
Fix compatibility issues for ColumnarBroadcastHashJoinExec
PHILO-HE Feb 18, 2022
fbd0818
Fix compatibility issues for ColumnarHashAggregateExec
PHILO-HE Feb 18, 2022
1d85f39
Fix compatibility issues for ColumnarShuffledHashJoinExec, etc
PHILO-HE Feb 18, 2022
62a411c
Remove ColumnarScalarSubquery and the code where it is used, not rele…
PHILO-HE Feb 18, 2022
6e6bc88
Fix compatibility issues for ColumnarShuffleManager, etc.
PHILO-HE Feb 21, 2022
dfe32ef
Fix compatibility issues for ColumnarArrowPythonRunner etc.
PHILO-HE Feb 22, 2022
3fb5f5c
Set different scala version for spark 3.1/3.2
PHILO-HE Feb 22, 2022
f121a2c
Fix compatibility issues for ColumnarCustomShuffleReaderExec.scala
PHILO-HE Feb 22, 2022
f621b7a
Fix compatibility issues for ShuffledColumnarBatchRDD
PHILO-HE Feb 23, 2022
659b5f4
Refactor the code to refix compatibility issues for ColumnarCustomShu…
PHILO-HE Feb 23, 2022
e1908c5
Multiple fixes in match/case statements
PHILO-HE Feb 23, 2022
2c24d7a
Set jackson versions for different versions of spark
PHILO-HE Feb 23, 2022
7e62299
Fix compatibility issues caused by renaming CustomShuffleReaderExec t…
PHILO-HE Feb 24, 2022
3440ccb
Small fixes
PHILO-HE Feb 24, 2022
71036c8
Fix compatibility issues for ReaderIterator
PHILO-HE Feb 25, 2022
9249de6
Move AdaptiveSparkPlanExec & MemoryStore under shim layer
PHILO-HE Feb 25, 2022
eddf4cc
Fix compatibility issues for ShufflePartitionUtils
PHILO-HE Feb 25, 2022
ca83b1f
Fix issues found in building
PHILO-HE Feb 25, 2022
6e26c4e
Fix cyclic dependency and import missing dependencies
PHILO-HE Feb 28, 2022
7cde900
Fix accessibility issues about IndexShuffleBlockResolver, BaseShuffle…
PHILO-HE Feb 28, 2022
c4157fa
Fix dependency issue for ColumnarBatchScanExec
PHILO-HE Feb 28, 2022
fd0d30c
Fix compile issues in spark311 module
PHILO-HE Mar 1, 2022
725f0fd
Fix compatibility issues for ReaderIterator
PHILO-HE Mar 1, 2022
52a2c62
Move Utils.doFetchFile to specific package and do some code refactor
PHILO-HE Mar 1, 2022
c9dddd7
Make ColumnarBatchScanExec abstract to break the cyclic dependency
PHILO-HE Mar 2, 2022
6e075e9
Use a more concise way to fix compatibility issue for OutputWriter
PHILO-HE Mar 2, 2022
46f0009
Add 3.1 or 3.2 shim layer dependency for NSE core module according to…
PHILO-HE Mar 2, 2022
e868227
Convert some child plan to expected type
PHILO-HE Mar 2, 2022
54863e5
Fix compile issues on spark 3.2.0
PHILO-HE Mar 3, 2022
540bf32
Change the extension from ShuffledJoin to ColumnarShuffledJoin for Co…
PHILO-HE Mar 3, 2022
7b326a2
Remove useless code
PHILO-HE Mar 3, 2022
bee7260
Change spark version to 3.2.1 for profile spark-3.2
PHILO-HE Mar 3, 2022
e36e3d9
Fix compatibility issues from spark 3.2.0 to 3.2.1
PHILO-HE Mar 4, 2022
9a72e3f
Fix dependency issue for github action test
PHILO-HE Mar 8, 2022
6dad071
Put fully-qualified class name under resources/META-INF/services for …
PHILO-HE Mar 8, 2022
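
The shim calls that appear throughout the diffs below (`SparkShimLoader.getSparkShims.…`) go through a common trait declared in the spark-sql-columnar-shims-common module and implemented by the spark311/spark321 modules. A minimal sketch of that trait, with signatures inferred from the call sites in this PR; the real shim layer is broader, also covering the shuffle, AQE and Python-runner compatibility points listed in the commits above:

```scala
package com.intel.oap.sql.shims

import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

// Sketch only: signatures are inferred from how ParquetFileFormat calls the shim below;
// the actual trait in spark-sql-columnar-shims-common may declare them differently.
trait SparkShims {
  // Spark 3.1 and 3.2 derive the Parquet datetime rebase mode differently (the 3.1 code
  // path is visible in the lines removed from ParquetFileFormat below), so each
  // version-specific shim hides that difference behind one method.
  def getDatetimeRebaseMode(
      fileMetaData: FileMetaData,
      parquetOptions: ParquetOptions): LegacyBehaviorPolicy.Value

  // Factory methods such as newParquetFilters, newVectorizedParquetRecordReader and
  // newParquetReadSupport (used in the ParquetFileFormat diff) live here as well; their
  // exact parameter lists are omitted in this sketch.
}
```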
8 changes: 2 additions & 6 deletions .github/workflows/unittests.yml
@@ -97,9 +97,7 @@ jobs:
- name: Run unit tests
run: |
mvn clean install -N
-cd arrow-data-source
-mvn clean install -DskipTests -Dbuild_arrow=OFF
-cd ..
+mvn clean install -DskipTests -Dbuild_arrow=OFF -pl arrow-data-source
mvn clean package -P full-scala-compiler -Phadoop-2.7.4 -am -pl native-sql-engine/core -DskipTests -Dbuild_arrow=OFF
mvn test -P full-scala-compiler -DmembersOnlySuites=org.apache.spark.sql.nativesql -am -DfailIfNoTests=false -Dexec.skip=true -DargLine="-Dspark.test.home=/tmp/spark-3.1.1-bin-hadoop2.7" &> log-file.log
echo '#!/bin/bash' > grep.sh
@@ -144,9 +142,7 @@ jobs:
- name: Run unit tests
run: |
mvn clean install -N
-cd arrow-data-source
-mvn clean install -DskipTests -Dbuild_arrow=OFF
-cd ..
+mvn clean install -DskipTests -Dbuild_arrow=OFF -pl arrow-data-source
mvn clean package -P full-scala-compiler -Phadoop-3.2 -am -pl native-sql-engine/core -DskipTests -Dbuild_arrow=OFF
mvn test -P full-scala-compiler -DmembersOnlySuites=org.apache.spark.sql.nativesql -am -DfailIfNoTests=false -Dexec.skip=true -DargLine="-Dspark.test.home=/tmp/spark-3.1.1-bin-hadoop3.2" &> log-file.log
echo '#!/bin/bash' > grep.sh
@@ -310,4 +310,8 @@ case class RowToArrowColumnarExec(child: SparkPlan) extends UnaryExecNode {
}
}
}

// For spark 3.2.
protected def withNewChildInternal(newChild: SparkPlan): RowToArrowColumnarExec =
copy(child = newChild)
}
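
The `withNewChildInternal` / `withNewChildrenInternal` definitions added above (and repeated for most operators in this PR) rely on one cross-version trick: Spark 3.2 declares these methods as abstract members of its tree-node API, while Spark 3.1 has no such methods, so they are written without the `override` keyword and compile against both releases (the later `path()` addition in ArrowFileFormat uses the same trick). A minimal sketch with a hypothetical operator, not taken from the PR:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// Hypothetical single-child operator illustrating the pattern used by the additions above.
case class MyColumnarExec(child: SparkPlan) extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output

  override protected def doExecute(): RDD[InternalRow] =
    throw new UnsupportedOperationException("This operator doesn't support doExecute().")

  // No `override` keyword: on Spark 3.1 this is just an unused extra method, while on
  // Spark 3.2 it implements the abstract withNewChildInternal used for child replacement.
  protected def withNewChildInternal(newChild: SparkPlan): MyColumnarExec =
    copy(child = newChild)
}
```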
6 changes: 6 additions & 0 deletions arrow-data-source/parquet/pom.xml
@@ -22,6 +22,12 @@
<artifactId>spark-arrow-datasource-standard</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

</project>
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import scala.util.{Failure, Try}

import com.intel.oap.spark.sql.execution.datasources.arrow.ArrowFileFormat
import com.intel.oap.sql.shims.SparkShimLoader
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{Job, JobID, OutputCommitter, TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
@@ -274,6 +275,7 @@ class ParquetFileFormat
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)

(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -292,11 +294,17 @@

lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData

val datetimeRebaseMode =
SparkShimLoader.getSparkShims.getDatetimeRebaseMode(footerFileMetaData, parquetOptions)

// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
-val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+val parquetFilters =
+SparkShimLoader.getSparkShims.newParquetFilters(parquetSchema: MessageType,
+pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStringStartWith,
+pushDownInFilterThreshold, isCaseSensitive, footerFileMetaData, parquetOptions)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -322,10 +330,6 @@
None
}

-val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-footerFileMetaData.getKeyValueMetaData.get,
-SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))

val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
@@ -337,12 +341,14 @@
}
val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
-val vectorizedReader = new VectorizedParquetRecordReader(
-convertTz.orNull,
-datetimeRebaseMode.toString,
-"",
-enableOffHeapColumnVector && taskContext.isDefined,
-capacity)
+val vectorizedReader = SparkShimLoader.getSparkShims
+.newVectorizedParquetRecordReader(
+convertTz.orNull,
+footerFileMetaData,
+parquetOptions,
+enableOffHeapColumnVector && taskContext.isDefined,
+capacity)

val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
@@ -358,8 +364,8 @@
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns InternalRow
-val readSupport = new ParquetReadSupport(
-convertTz, enableVectorizedReader = false, datetimeRebaseMode, SQLConf.LegacyBehaviorPolicy.LEGACY)
+val readSupport = SparkShimLoader.getSparkShims.newParquetReadSupport(
+convertTz, false, footerFileMetaData, parquetOptions)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
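
For the Spark 3.1 side, `getDatetimeRebaseMode` can simply keep the logic that the hunk above removes from ParquetFileFormat. A hedged sketch of such a shim implementation (class and parameter names are assumptions; the body is the pre-existing 3.1 code path, and it presumes the Spark helpers involved are visible from the shim package):

```scala
package com.intel.oap.sql.shims.spark311

import com.intel.oap.sql.shims.SparkShims
import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

// Sketch of a Spark 3.1 shim; only the method discussed above is shown.
class Spark311Shims extends SparkShims {
  // Reuses the logic ParquetFileFormat called directly before this PR: read the rebase
  // mode from the Parquet footer metadata, falling back to the SQLConf setting.
  // parquetOptions is unused on 3.1; it only matters for the 3.2 implementation.
  override def getDatetimeRebaseMode(
      fileMetaData: FileMetaData,
      parquetOptions: ParquetOptions): LegacyBehaviorPolicy.Value = {
    DataSourceUtils.datetimeRebaseMode(
      fileMetaData.getKeyValueMetaData.get,
      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
  }
}
```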
2 changes: 1 addition & 1 deletion arrow-data-source/pom.xml
@@ -42,7 +42,7 @@
</pluginRepositories>

<dependencies>
<dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
6 changes: 6 additions & 0 deletions arrow-data-source/standard/pom.xml
@@ -18,6 +18,12 @@
<artifactId>spark-arrow-datasource-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
@@ -137,6 +137,10 @@ object ArrowWriteExtension {
private case class ColumnarToFakeRowLogicAdaptor(child: LogicalPlan)
extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output

// For spark 3.2.
protected def withNewChildInternal(newChild: LogicalPlan): ColumnarToFakeRowLogicAdaptor =
copy(child = newChild)
}

private case class ColumnarToFakeRowAdaptor(child: SparkPlan) extends ColumnarToRowTransition {
@@ -149,6 +153,10 @@ object ArrowWriteExtension {
}

override def output: Seq[Attribute] = child.output

// For spark 3.2.
protected def withNewChildInternal(newChild: SparkPlan): ColumnarToFakeRowAdaptor =
copy(child = newChild)
}

case class SimpleStrategy() extends Strategy {
@@ -94,6 +94,11 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab
override def close(): Unit = {
writeQueue.close()
}

// Do NOT add override keyword for compatibility on spark 3.1.
def path(): String = {
path
}
}
}
}
35 changes: 31 additions & 4 deletions native-sql-engine/core/pom.xml
@@ -44,6 +44,33 @@
<nativesql.build_protobuf>${build_protobuf}</nativesql.build_protobuf>
<nativesql.build_jemalloc>${build_jemalloc}</nativesql.build_jemalloc>
</properties>

<profiles>
<profile>
<id>spark-3.1.1</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-spark311</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-3.2</id>
<dependencies>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-spark321</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>

<dependencies>
<!-- Prevent our dummy JAR from being included in Spark distributions or uploaded to YARN -->
<dependency>
@@ -166,19 +193,19 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
-<version>2.10.0</version>
+<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
-<version>2.10.0</version>
+<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
-<version>2.10.0</version>
+<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -299,7 +326,7 @@
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-common</artifactId>
<version>${project.version}</version>
-<scope>compile</scope>
+<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
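
The `provided` scope above, combined with the per-profile `spark-sql-columnar-shims-spark311` / `spark-sql-columnar-shims-spark321` dependencies, means exactly one concrete shim implementation ends up on the runtime classpath. Judging from the commit that registers a fully-qualified class name under `resources/META-INF/services`, the loader presumably discovers that implementation through `java.util.ServiceLoader`; the sketch below assumes that mechanism and invents the provider name (`SparkShimProvider`), while `SparkShimLoader` and `getSparkShims` are the names used in the diffs:

```scala
package com.intel.oap.sql.shims

import java.util.ServiceLoader

import scala.collection.JavaConverters._

// Hypothetical provider interface whose implementations would be registered under
// META-INF/services in the spark311/spark321 modules.
trait SparkShimProvider {
  def createShim: SparkShims
}

object SparkShimLoader {
  // Resolve the single shim implementation present on the classpath exactly once.
  private lazy val shims: SparkShims = {
    val providers = ServiceLoader.load(classOf[SparkShimProvider]).asScala.toList
    providers match {
      case provider :: Nil => provider.createShim
      case Nil =>
        throw new IllegalStateException("No Spark shim provider found on the classpath")
      case many =>
        throw new IllegalStateException(
          s"Expected exactly one Spark shim provider, found ${many.size}")
    }
  }

  def getSparkShims: SparkShims = shims
}
```

Selecting the implementation at build time then reduces to choosing a Maven profile, e.g. `-Pspark-3.2` instead of the default `spark-3.1.1` profile declared above.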
@@ -24,7 +24,7 @@
import java.util.*;
import java.util.stream.Collectors;

-import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader;
+import org.apache.spark.sql.execution.datasources.VectorizedParquetRecordReaderChild;
import com.intel.oap.datasource.parquet.ParquetReader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
@@ -46,7 +46,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public class VectorizedParquetArrowReader extends VectorizedParquetRecordReader {
+public class VectorizedParquetArrowReader extends VectorizedParquetRecordReaderChild {
private static final Logger LOG =
LoggerFactory.getLogger(VectorizedParquetArrowReader.class);
private ParquetReader reader = null;
@@ -70,7 +70,8 @@ public class VectorizedParquetArrowReader extends VectorizedParquetRecordReader

public VectorizedParquetArrowReader(String path, ZoneId convertTz, boolean useOffHeap,
int capacity, StructType sourceSchema, StructType readDataSchema, String tmp_dir) {
-super(convertTz, "CORRECTED", "LEGACY", useOffHeap, capacity);
+// TODO: datetimeRebaseTz & int96RebaseTz are set to "", needs to check the impact.
+super(convertTz, "CORRECTED", "", "LEGACY", "", useOffHeap, capacity);
this.capacity = capacity;
this.path = path;
this.tmp_dir = tmp_dir;
@@ -147,6 +147,10 @@ case class CoalesceBatchesExec(child: SparkPlan) extends UnaryExecNode {
new CloseableColumnBatchIterator(res)
}
}

// For spark 3.2.
protected def withNewChildInternal(newChild: SparkPlan): CoalesceBatchesExec =
copy(child = newChild)
}

object CoalesceBatchesExec {
@@ -100,8 +100,11 @@ case class ColumnarConditionProjectExec(
}
}

-def isNullIntolerant(expr: Expression): Boolean = expr match {
-case e: NullIntolerant => e.children.forall(isNullIntolerant)
+// In spark 3.2, PredicateHelper has already introduced isNullIntolerant with completely same
+// code. If we use the same method name, override keyword is required. But in spark3.1, no
+// method is overridden. So we use an independent method name.
+def isNullIntolerantInternal(expr: Expression): Boolean = expr match {
+case e: NullIntolerant => e.children.forall(isNullIntolerantInternal)
case _ => false
}

@@ -110,7 +113,7 @@

val notNullAttributes = if (condition != null) {
val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
-case IsNotNull(a) => isNullIntolerant(a) && a.references.subsetOf(child.outputSet)
+case IsNotNull(a) => isNullIntolerantInternal(a) && a.references.subsetOf(child.outputSet)
case _ => false
}
notNullPreds.flatMap(_.references).distinct.map(_.exprId)
@@ -267,6 +270,9 @@
}
}

// For spark 3.2.
protected def withNewChildInternal(newChild: SparkPlan): ColumnarConditionProjectExec =
copy(child = newChild)
}

case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
@@ -308,6 +314,10 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}

// For spark 3.2.
protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): ColumnarUnionExec =
copy(children = newChildren)
}

//TODO(): consolidate locallimit and globallimit
@@ -380,6 +390,10 @@ case class ColumnarLocalLimitExec(limit: Int, child: SparkPlan) extends LimitExe
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}

protected def withNewChildInternal(newChild: SparkPlan):
ColumnarLocalLimitExec =
copy(child = newChild)

}

case class ColumnarGlobalLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
@@ -451,4 +465,8 @@ case class ColumnarGlobalLimitExec(limit: Int, child: SparkPlan) extends LimitEx
: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}

protected def withNewChildInternal(newChild: SparkPlan):
ColumnarGlobalLimitExec =
copy(child = newChild)
}