This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[DNM] Support spark320 #669

Closed · wants to merge 3 commits
Conversation

zhouyuan
Collaborator

What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Signed-off-by: Yuan Zhou <[email protected]>
Signed-off-by: Yuan Zhou <[email protected]>
Signed-off-by: Yuan Zhou <[email protected]>
@github-actions

Thanks for opening a pull request!

Could you open an issue for this pull request on GitHub Issues?

https://github.com/oap-project/native-sql-engine/issues

Then could you also rename the commit message and pull request title to the following format?

[NSE-${ISSUES_ID}] ${detailed message}

See also:

@zhouyuan
Collaborator Author

@PHILO-HE

@PHILO-HE PHILO-HE self-assigned this Jan 19, 2022
@PHILO-HE
Collaborator

I will re-pick this work.
An initial thought: this patch will not be merged into master directly. Instead, we will separate the divergence between Spark 3.1 and 3.2 into modules, and the classes under these modules will be called through a shim layer that recognizes the Spark version.
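A minimal sketch of that shim-layer idea, with hypothetical names rather than Gazelle's actual API: each supported Spark version gets its own shim module implementing a common trait, and a loader selects one at runtime. Class-name matching is used here only so the sketch compiles against either Spark version; real shim modules would be compiled against their own Spark dependency and use plain type checks.

import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical shim interface; the real project may expose different methods.
trait SparkShims {
  def isAqeShuffleRead(plan: SparkPlan): Boolean
}

// Spark 3.1 shim: the AQE shuffle-read operator is CustomShuffleReaderExec.
class Spark31Shims extends SparkShims {
  def isAqeShuffleRead(plan: SparkPlan): Boolean =
    plan.getClass.getSimpleName == "CustomShuffleReaderExec"
}

// Spark 3.2 shim: the operator was renamed to AQEShuffleReadExec.
class Spark32Shims extends SparkShims {
  def isAqeShuffleRead(plan: SparkPlan): Boolean =
    plan.getClass.getSimpleName == "AQEShuffleReadExec"
}

object SparkShimLoader {
  // Pick the shim matching the running Spark version.
  lazy val shims: SparkShims = SPARK_VERSION match {
    case v if v.startsWith("3.1") => new Spark31Shims
    case v if v.startsWith("3.2") => new Spark32Shims
    case v => throw new UnsupportedOperationException(s"Unsupported Spark version: $v")
  }
}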

@@ -48,11 +48,11 @@ case class ColumnarCustomShuffleReaderExec(
partitionSpecs.map(_.asInstanceOf[PartialMapperPartitionSpec].mapIndex).toSet.size ==
partitionSpecs.length) {
child match {
-case ShuffleQueryStageExec(_, s: ColumnarShuffleExchangeAdaptor) =>
+case ShuffleQueryStageExec(_, s: ColumnarShuffleExchangeAdaptor, _) =>
Collaborator

Refactor the code to fix compatibility issues.
The shim-layer approach is also kept, but commented out.

@@ -77,7 +77,7 @@ class ShuffledColumnarBatchRDD(
override def getPreferredLocations(partition: Partition): Seq[String] = {
val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
partition.asInstanceOf[ShuffledColumnarBatchRDDPartition].spec match {
-case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
Collaborator

Refactor the code to fix compatibility issues.

@@ -36,12 +36,15 @@ import org.apache.spark.sql.execution.python.ArrowEvalPythonExec
import org.apache.spark.sql.execution.python.ColumnarArrowEvalPythonExec
import org.apache.spark.sql.execution.window.WindowExec

case class RowGuard(child: SparkPlan) extends SparkPlan {
Collaborator

Pending.

Collaborator

UnaryExecNode is a subclass of SparkPlan.

-def children: Seq[SparkPlan] = Seq(child)
+//def children: Seq[SparkPlan] = Seq(child)

+override protected def withNewChildInternal(newChild: SparkPlan): RowGuard =
Collaborator

Fixed.

@@ -70,7 +73,7 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
ColumnarArrowEvalPythonExec(plan.udfs, plan.resultAttrs, plan.child, plan.evalType)
case plan: BatchScanExec =>
if (!enableColumnarBatchScan) return false
-new ColumnarBatchScanExec(plan.output, plan.scan)
+new ColumnarBatchScanExec(plan.output, plan.scan, plan.runtimeFilters)
Collaborator

Fixed through shim layer.

@@ -133,9 +136,9 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
left match {
case exec: BroadcastExchangeExec =>
new ColumnarBroadcastExchangeExec(exec.mode, exec.child)
-case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec) =>
+case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec, _) =>
Collaborator

Fixed through code refactor.

@@ -147,9 +150,9 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
right match {
case exec: BroadcastExchangeExec =>
new ColumnarBroadcastExchangeExec(exec.mode, exec.child)
-case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec) =>
+case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec, _) =>
Collaborator

@PHILO-HE PHILO-HE Feb 23, 2022

Fixed through code refactor.

@@ -239,7 +242,7 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
case p if !supportCodegen(p) =>
// insert row guard them recursively
p.withNewChildren(p.children.map(insertRowGuardOrNot))
case p: CustomShuffleReaderExec =>
Collaborator

@PHILO-HE PHILO-HE Feb 23, 2022

Fixed through shim layer.

plan.child match {
case shuffle: ColumnarShuffleExchangeAdaptor =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
CoalesceBatchesExec(
ColumnarCustomShuffleReaderExec(plan.child, plan.partitionSpecs))
-case ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeAdaptor) =>
+case ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeAdaptor, _) =>
Collaborator

Refactor the code, similar to ColumnarCustomShuffleReaderExec.

BroadcastQueryStageExec(
curPlan.id,
BroadcastExchangeExec(
val newBroadcast = BroadcastExchangeExec(
Collaborator

Fixed through shim layer.

curPlan.id,
BroadcastExchangeExec(
curPlan.id, newBroadcast, newBroadcast.doCanonicalize)
case ReusedExchangeExec(_, originalBroadcastPlan: ColumnarBroadcastExchangeAdaptor) =>
Collaborator

Similar to the above.

@@ -25,8 +25,8 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

-class ColumnarBatchScanExec(output: Seq[AttributeReference], @transient scan: Scan)
-extends BatchScanExec(output, scan) {
+class ColumnarBatchScanExec(output: Seq[AttributeReference], @transient scan: Scan, runtimeFilters: Seq[Expression])
Collaborator

Move this class into the respective shim layers for Spark 3.1/3.2.
Code related to the Gazelle config is removed to get rid of the GazellePluginConfig dependency; otherwise, a cyclic dependency would need to be handled.
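For illustration, a sketch of what the Spark 3.2 shim copy could look like, assuming Spark 3.2's three-argument BatchScanExec constructor and omitting all Gazelle-specific members:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

// Spark 3.2 shim variant: BatchScanExec gained a runtimeFilters parameter in
// Spark 3.2, so this constructor only compiles there; the Spark 3.1 shim
// keeps the two-argument form.
class ColumnarBatchScanExec(
    output: Seq[AttributeReference],
    @transient scan: Scan,
    runtimeFilters: Seq[Expression])
  extends BatchScanExec(output, scan, runtimeFilters) {
  // Columnar overrides omitted in this sketch.
}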

@@ -254,12 +254,6 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
Collaborator

Pending. Must this be deleted?

@@ -31,6 +31,27 @@
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
<repository>
Collaborator

@PHILO-HE PHILO-HE Feb 23, 2022

Only applicable for the Spark snapshot. It is not needed for the release version.

@@ -241,10 +241,12 @@ object RowToColumnConverter {
* populate with [[RowToColumnConverter]], but the performance requirements are different and it
* would only be to reduce code.
*/
-case class RowToArrowColumnarExec(child: SparkPlan) extends UnaryExecNode {
+trait RowToArrowColumnarTransition extends UnaryExecNode
Collaborator

@PHILO-HE PHILO-HE Feb 23, 2022

This looks unnecessary. Keep the following extends relation unchanged.

// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive,
Collaborator

Fixed through shim layer.

@@ -137,6 +137,9 @@ object ArrowWriteExtension {
private case class ColumnarToFakeRowLogicAdaptor(child: LogicalPlan)
extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output
+override protected def withNewChildInternal(newChild: LogicalPlan): ColumnarToFakeRowLogicAdaptor =
Collaborator

This method is added for both Spark 3.1 & 3.2; the override keyword is intentionally omitted for compatibility reasons.
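A sketch of the pattern this comment describes, with the override keyword left out so the same source compiles on both versions:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OrderPreservingUnaryNode}

private case class ColumnarToFakeRowLogicAdaptor(child: LogicalPlan)
  extends OrderPreservingUnaryNode {
  override def output: Seq[Attribute] = child.output

  // `override` is deliberately omitted: on Spark 3.2 this implements the
  // abstract withNewChildInternal (Scala allows omitting `override` when
  // implementing an abstract member), while on Spark 3.1 no such method
  // exists in the parent, where `override` would fail to compile.
  protected def withNewChildInternal(newChild: LogicalPlan): ColumnarToFakeRowLogicAdaptor =
    copy(child = newChild)
}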

@@ -94,6 +94,10 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab
override def close(): Unit = {
writeQueue.close()
}

override def path(): String = {
Collaborator

Fixed through shim layer.
TODO: it seems we can put the new interface method here and omit the override keyword.

@@ -99,7 +99,7 @@ case class ColumnarConditionProjectExec(
}
}

-def isNullIntolerant(expr: Expression): Boolean = expr match {
+override def isNullIntolerant(expr: Expression): Boolean = expr match {
Collaborator

This implementation is the same as the one in its parent class, PredicateHelper, in Spark 3.2. To keep the code workable for Spark 3.1, we just changed the method name. For Spark 3.1, isNullIntolerant is not an interface method or abstract method.

@@ -55,6 +55,57 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide
import org.apache.spark.sql.execution.joins.{HashJoin,ShuffledJoin,BaseJoinExec}
import org.apache.spark.sql.execution.joins.HashedRelationInfo

+trait ColumnarShuffledJoin extends BaseJoinExec {
Collaborator

@PHILO-HE PHILO-HE Feb 23, 2022

This looks the same as ShuffledJoin in Spark 3.2. Let's not add this trait.

@@ -59,7 +59,7 @@ case class ColumnarBroadcastHashJoinExec(
nullAware: Boolean = false)
extends BaseJoinExec
with ColumnarCodegenSupport
-with ShuffledJoin {
+with ColumnarShuffledJoin {
Collaborator

I think we can still let it extend ShuffledJoin. This needs to be verified in compilation and tests.


override lazy val outputPartitioning: Partitioning = {
joinType match {
-case _: InnerLike if broadcastHashJoinOutputPartitioningExpandLimit > 0 =>
+case _: InnerLike if conf.broadcastHashJoinOutputPartitioningExpandLimit > 0 =>
Collaborator

Fixed through shim layer.

@@ -307,7 +306,7 @@ case class ColumnarHashAggregateExec(
val aggregateFunc = exp.aggregateFunction
val out_res = aggregateFunc.children.head.asInstanceOf[Literal].value
aggregateFunc match {
-case Sum(_) =>
+case Sum(_, _) =>
Collaborator

Just use type matching.
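For example, a sketch assuming Spark 3.2's Sum gained a second constructor parameter (failOnError):

import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, Sum}

// Matching on the type rather than on constructor arity compiles against
// both Spark 3.1 (Sum(child)) and Spark 3.2 (Sum(child, failOnError)).
def isSum(func: AggregateFunction): Boolean = func match {
  case _: Sum => true
  case _      => false
}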

@@ -271,11 +271,7 @@ object ColumnarExpressionConverter extends Logging {
columnarDivide,
expr)
}
-case oaps: com.intel.oap.expression.ColumnarScalarSubquery =>
Collaborator

ColumnarScalarSubquery is useless, so we removed the code that uses it. This is not relevant to Spark 3.1/3.2 compatibility.

@@ -1,120 +0,0 @@
/*
Collaborator

@PHILO-HE PHILO-HE Feb 23, 2022

Deleted

@@ -69,7 +69,7 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
ColumnarArrowEvalPythonExec(plan.udfs, plan.resultAttrs, columnarChild, plan.evalType)
case plan: BatchScanExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
-new ColumnarBatchScanExec(plan.output, plan.scan)
+new ColumnarBatchScanExec(plan.output, plan.scan, plan.runtimeFilters)
Collaborator

Fixed through shim layer.

@@ -44,7 +44,7 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
var isSupportAdaptive: Boolean = true

def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match {
-case RowGuard(child: CustomShuffleReaderExec) =>
+case RowGuard(child: AQEShuffleReadExec) =>
Collaborator

@PHILO-HE PHILO-HE Feb 23, 2022

In Spark 3.2, CustomShuffleReaderExec is renamed to AQEShuffleReadExec.
Part of ColumnarCustomShuffleReaderExec's code is ported from CustomShuffleReaderExec/AQEShuffleReadExec.

Collaborator

ShufflePartitionUtils.scala also imports CustomShuffleReaderExec, so it needs to be fixed as well.



@transient
private lazy val maxBroadcastRows = mode match {
Collaborator

Fixed through shim layer.
TODO: check whether the update for Spark 3.2 is applicable to 3.1.

@@ -44,7 +44,7 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
var isSupportAdaptive: Boolean = true

def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match {
-case RowGuard(child: CustomShuffleReaderExec) =>
+case RowGuard(child: AQEShuffleReadExec) =>
Collaborator

Add guard logic to check the type in the shim layer.

@@ -204,17 +204,17 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
plan

-case plan: CustomShuffleReaderExec if columnarConf.enableColumnarShuffle =>
+case plan: AQEShuffleReadExec if columnarConf.enableColumnarShuffle =>
Collaborator

Add guard logic to check the type in the shim layer.
plan.child and plan.partitionSpecs should be obtained through the shim layer.
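A hypothetical sketch of such shim accessors (names illustrative, not Gazelle's actual API):

import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec

// The rule would call these instead of naming the version-specific class.
trait AqeShuffleReadShims {
  def isAqeShuffleRead(plan: SparkPlan): Boolean
  def getChild(plan: SparkPlan): SparkPlan
  def getPartitionSpecs(plan: SparkPlan): Seq[ShufflePartitionSpec]
}

// Spark 3.2 implementation; the 3.1 shim would do the same with
// CustomShuffleReaderExec.
class Spark32AqeShuffleReadShims extends AqeShuffleReadShims {
  def isAqeShuffleRead(plan: SparkPlan): Boolean =
    plan.isInstanceOf[AQEShuffleReadExec]
  def getChild(plan: SparkPlan): SparkPlan =
    plan.asInstanceOf[AQEShuffleReadExec].child
  def getPartitionSpecs(plan: SparkPlan): Seq[ShufflePartitionSpec] =
    plan.asInstanceOf[AQEShuffleReadExec].partitionSpecs
}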

def output: Seq[Attribute] = child.output
protected def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException
}
def children: Seq[SparkPlan] = Seq(child)
Collaborator

@PHILO-HE PHILO-HE Feb 24, 2022

For Spark 3.2, a parent class, UnaryLike, has already defined a val called children.
For Spark 3.1, UnaryExecNode already contains this children member as well. So if we let this class extend UnaryExecNode for both Spark 3.1 & 3.2, the children method here can be deleted.
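A sketch of the result (class name hypothetical): with UnaryExecNode as the parent on both versions, no local children definition is needed, since Spark 3.2's UnaryLike supplies it as a final lazy val and Spark 3.1's UnaryExecNode provides it as well.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

case class FakeRowAdaptorSketch(child: SparkPlan) extends UnaryExecNode {
  def output: Seq[Attribute] = child.output

  protected def doExecute(): RDD[InternalRow] =
    throw new UnsupportedOperationException

  // No `children` definition here: UnaryExecNode supplies it on both versions.
  // `override` is omitted on withNewChildInternal for the same cross-version
  // reason discussed above (the method only exists on Spark 3.2).
  protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)
}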

@@ -84,7 +84,7 @@ class ColumnarShuffleWriter[K, V](
override def write(records: Iterator[Product2[K, V]]): Unit = {
if (!records.hasNext) {
partitionLengths = new Array[Long](dep.partitioner.numPartitions)
-shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, null)
+shuffleBlockResolver.writeMetadataFileAndCommit(dep.shuffleId, mapId, partitionLengths, null, null)
Collaborator

Fixed through shim layer.

@@ -108,7 +108,6 @@ class ColumnarShuffleManager(conf: SparkConf) extends ShuffleManager with Loggin
shuffleExecutorComponents)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(
-shuffleBlockResolver,
Collaborator

Fixed through shim layer.

@transient
private[sql] lazy val relationFuture: java.util.concurrent.Future[broadcast.Broadcast[Any]] = {
SQLExecution.withThreadLocalCaptured[broadcast.Broadcast[Any]](
-sqlContext.sparkSession,
+session,
Collaborator

Fixed through shim layer.
Both are accessible in the parent class, SparkPlan: for Spark 3.1 it is sqlContext, but for Spark 3.2 it is session.

releasedOrClosed: AtomicBoolean,
context: TaskContext): Iterator[ColumnarBatch] = {

-new ReaderIterator(stream, writerThread, startTime, env, worker, releasedOrClosed, context) {
+new ReaderIterator(stream, writerThread, startTime, env, worker, pid, releasedOrClosed, context) {
Collaborator

Fixed by introducing an abstract child class in the shim layer.

@@ -1,729 +0,0 @@
/*
Collaborator

Moved into the Spark 3.1 shim layer. For Spark 3.2, use the Spark 3.2 dependency.

@@ -1,933 +0,0 @@
/*
Collaborator

Moved into the Spark 3.1 shim layer. For Spark 3.2, use the Spark 3.2 dependency.

@zhouyuan
Collaborator Author

zhouyuan commented Mar 1, 2022

replaced by #742

@zhouyuan zhouyuan closed this Mar 1, 2022