From ced2c77a7eefa748667c81f7f0b4e1d15e23cbbc Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Mon, 10 Aug 2020 17:30:59 -0700 Subject: [PATCH 1/5] Add metrics to GpuUnion operator --- .../spark/rapids/basicPhysicalOperators.scala | 32 +++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala index 3f633d24a52..b1aa630e9ca 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala @@ -114,7 +114,7 @@ object GpuFilter { Seq(filtered, tbl, filterConditionCv, batch).safeClose() } - numOutputBatches += 1 + numOutputBatches += 10 numOutputRows += filteredBatch.numRows() filteredBatch } finally { @@ -296,6 +296,22 @@ case class GpuRangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range throw new IllegalStateException(s"Row-based execution should not occur for $this") } +object unionMetrics { + def apply( + batch: ColumnarBatch, + outputBatches: SQLMetric, + outputRows: SQLMetric, + totalTime: SQLMetric): ColumnarBatch = { + val nvtxRange = new NvtxWithMetrics("ProjectExec", NvtxColor.CYAN, totalTime) + try { + outputBatches += 1 + outputRows += batch.numRows() + batch + } finally { + nvtxRange.close() + } + } +} case class GpuUnionExec(children: Seq[SparkPlan]) extends SparkPlan with GpuExec { // updating nullability to make all the children consistent @@ -316,8 +332,18 @@ case class GpuUnionExec(children: Seq[SparkPlan]) extends SparkPlan with GpuExec override def doExecute(): RDD[InternalRow] = throw new IllegalStateException(s"Row-based execution should not occur for $this") - override def doExecuteColumnar(): RDD[ColumnarBatch] = - sparkContext.union(children.map(_.executeColumnar())) + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val numOutputRows = longMetric(NUM_OUTPUT_ROWS) + val numOutputBatches = longMetric(NUM_OUTPUT_BATCHES) + val totalTime = longMetric(TOTAL_TIME) + + sparkContext.union(children.map(child => { + val rdd = child.executeColumnar() + rdd.map { batch => + unionMetrics(batch, numOutputBatches, numOutputRows, totalTime) + } + })) + } } case class GpuCoalesceExec(numPartitions: Int, child: SparkPlan) From 10f03c97e57661da1eee5e53220f0ec0869eb843 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Mon, 10 Aug 2020 19:39:43 -0700 Subject: [PATCH 2/5] minor fix --- .../nvidia/spark/rapids/basicPhysicalOperators.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala index 9e2ed8773a9..de2a5a00b6a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala @@ -20,17 +20,16 @@ import ai.rapids.cudf import ai.rapids.cudf.{NvtxColor, Scalar, Table} import com.nvidia.spark.rapids.GpuMetricNames._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ - -import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, IsNotNull, NamedExpression, NullIntolerant, PredicateHelper, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, NamedExpression, NullIntolerant, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning, SinglePartition, UnknownPartitioning} -import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, UnaryExecNode} import org.apache.spark.sql.rapids.GpuPredicateHelper import org.apache.spark.sql.rapids.execution.TrampolineUtil -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} +import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} object GpuProjectExec { def projectAndClose[A <: Expression](cb: ColumnarBatch, boundExprs: Seq[A], @@ -115,7 +114,7 @@ object GpuFilter { Seq(filtered, tbl, filterConditionCv, batch).safeClose() } - numOutputBatches += 10 + numOutputBatches += 1 numOutputRows += filteredBatch.numRows() filteredBatch } finally { From adc853163b5ede1e67e9aeb934f2bb8a2f69a942 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Mon, 10 Aug 2020 19:47:26 -0700 Subject: [PATCH 3/5] fix imports --- .../com/nvidia/spark/rapids/basicPhysicalOperators.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala index de2a5a00b6a..ea5caca6e45 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala @@ -20,16 +20,17 @@ import ai.rapids.cudf import ai.rapids.cudf.{NvtxColor, Scalar, Table} import com.nvidia.spark.rapids.GpuMetricNames._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ + +import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, NamedExpression, NullIntolerant, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, IsNotNull, NamedExpression, NullIntolerant, PredicateHelper, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning, SinglePartition, UnknownPartitioning} -import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.rapids.GpuPredicateHelper import org.apache.spark.sql.rapids.execution.TrampolineUtil -import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} -import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} object GpuProjectExec { def projectAndClose[A <: Expression](cb: ColumnarBatch, boundExprs: Seq[A], From 13ebfe8c9fc50576c3ea2e9c712a299b9e778eab Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Tue, 11 Aug 2020 18:09:01 -0700 Subject: [PATCH 4/5] addressed review comments Signed-off-by: Niranjan Artal --- .../spark/rapids/basicPhysicalOperators.scala | 28 ++++--------------- 1 file changed, 6 insertions(+), 22 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala index ea5caca6e45..ad38adae908 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala @@ -297,23 +297,6 @@ case class GpuRangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range throw new IllegalStateException(s"Row-based execution should not occur for $this") } -object unionMetrics { - def apply( - batch: ColumnarBatch, - outputBatches: SQLMetric, - outputRows: SQLMetric, - totalTime: SQLMetric): ColumnarBatch = { - val nvtxRange = new NvtxWithMetrics("ProjectExec", NvtxColor.CYAN, totalTime) - try { - outputBatches += 1 - outputRows += batch.numRows() - batch - } finally { - nvtxRange.close() - } - } -} - case class GpuUnionExec(children: Seq[SparkPlan]) extends SparkPlan with GpuExec { // updating nullability to make all the children consistent override def output: Seq[Attribute] = { @@ -338,12 +321,13 @@ case class GpuUnionExec(children: Seq[SparkPlan]) extends SparkPlan with GpuExec val numOutputBatches = longMetric(NUM_OUTPUT_BATCHES) val totalTime = longMetric(TOTAL_TIME) - sparkContext.union(children.map(child => { - val rdd = child.executeColumnar() - rdd.map { batch => - unionMetrics(batch, numOutputBatches, numOutputRows, totalTime) + sparkContext.union(children.map(_.executeColumnar())).map { batch => + withResource(new NvtxWithMetrics("Union", NvtxColor.CYAN, totalTime)) { _ => + numOutputBatches += 1 + numOutputRows += batch.numRows + batch } - })) + } } } From 642b85cba6e37ee7163499fb2ab51bfc72212e01 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Tue, 11 Aug 2020 18:22:58 -0700 Subject: [PATCH 5/5] add line Signed-off-by: Niranjan Artal --- .../scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala index ad38adae908..ae4c9f46614 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala @@ -297,6 +297,7 @@ case class GpuRangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range throw new IllegalStateException(s"Row-based execution should not occur for $this") } + case class GpuUnionExec(children: Seq[SparkPlan]) extends SparkPlan with GpuExec { // updating nullability to make all the children consistent override def output: Seq[Attribute] = {