Skip to content

Commit

Permalink
Remove the tailing c2r for DataWritingCommandExec
Browse files Browse the repository at this point in the history
Signed-off-by: Firestarman <[email protected]>
  • Loading branch information
firestarman committed Mar 15, 2024
1 parent 4ba695a commit ce4c729
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -109,6 +109,10 @@ object GpuDataWritingCommand {
}
}

/**
* GpuDataWritingCommandExec is usually the root operator, and so far it returns
* empty data, so it is safe to make it support row-based execution.
*/
case class GpuDataWritingCommandExec(cmd: GpuDataWritingCommand, child: SparkPlan)
extends ShimUnaryExecNode with GpuExec {
override lazy val allMetrics: Map[String, GpuMetric] = GpuMetric.wrap(cmd.metrics)
Expand All @@ -123,18 +127,41 @@ case class GpuDataWritingCommandExec(cmd: GpuDataWritingCommand, child: SparkPla
// override the default one, otherwise the `cmd.nodeName` will appear twice from simpleString
override def argString(maxFields: Int): String = cmd.argString(maxFields)

override def executeCollect(): Array[InternalRow] = throw new UnsupportedOperationException(
s"${getClass.getCanonicalName} does not support row-based execution")
override def executeCollect(): Array[InternalRow] = {
if (sideEffectResult.isEmpty) {
Array.empty[InternalRow]
} else {
throw new UnsupportedOperationException(
s"${getClass.getCanonicalName} does not support row-based execution")
}
}

override def executeToIterator: Iterator[InternalRow] = throw new UnsupportedOperationException(
s"${getClass.getCanonicalName} does not support row-based execution")
override def executeToIterator: Iterator[InternalRow] = {
if (sideEffectResult.isEmpty) {
Iterator.empty
} else {
throw new UnsupportedOperationException(
s"${getClass.getCanonicalName} does not support row-based execution")
}
}

override def executeTake(limit: Int): Array[InternalRow] =
throw new UnsupportedOperationException(
s"${getClass.getCanonicalName} does not support row-based execution")
override def executeTake(limit: Int): Array[InternalRow] = {
if (sideEffectResult.isEmpty) {
Array.empty[InternalRow]
} else {
throw new UnsupportedOperationException(
s"${getClass.getCanonicalName} does not support row-based execution")
}
}

protected override def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException(
s"${getClass.getCanonicalName} does not support row-based execution")
protected override def doExecute(): RDD[InternalRow] = {
if (sideEffectResult.isEmpty) {
sparkContext.parallelize(Seq.empty[InternalRow], 1)
} else {
throw new UnsupportedOperationException(
s"${getClass.getCanonicalName} does not support row-based execution")
}
}

override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = {
sparkContext.parallelize(sideEffectResult, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,15 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
}
}

/**
* So far (2024-03-15) the DataWritingCommandExec in Spark always returns empty data,
* so the following ColumnarToRow is useless. Safe to remove it.
*/
private def removeUselessColumnarToRow(plan: SparkPlan): SparkPlan = plan match {
case GpuColumnarToRowExec(child @ GpuDataWritingCommandExec(_, _), _) => child
case _ => plan
}

override def apply(sparkPlan: SparkPlan): SparkPlan = GpuOverrideUtil.tryOverride { plan =>
this.rapidsConf = new RapidsConf(plan.conf)
if (rapidsConf.isSqlEnabled && rapidsConf.isSqlExecuteOnGPU) {
Expand Down Expand Up @@ -829,7 +838,8 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
}

insertStageLevelMetrics(updatedPlan)
updatedPlan
// TODO Add a config to support disabling this rule
removeUselessColumnarToRow(updatedPlan)
}
} else {
plan
Expand Down

0 comments on commit ce4c729

Please sign in to comment.