Skip to content

Commit

Permalink
fix duplicate counted metrics like op time for GpuCoalesceBatches (#1…
Browse files Browse the repository at this point in the history
…1062)

* with call site print, not good because some test cases by design will dup

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* done

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* add file

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* fix compile

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* address review comments

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

---------

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
  • Loading branch information
binmahone authored Jun 25, 2024
1 parent 86a905a commit 7a8690f
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ abstract class AbstractGpuCoalesceIterator(
// If we have reached the cuDF limit once, proactively filter batches
// after that first limit is reached.
GpuFilter.filterAndClose(cbFromIter, inputFilterTier.get,
NoopMetric, NoopMetric, opTime)
NoopMetric, NoopMetric, NoopMetric)
} else {
Iterator(cbFromIter)
}
Expand Down Expand Up @@ -499,7 +499,7 @@ abstract class AbstractGpuCoalesceIterator(
var filteredBytes = 0L
if (hasAnyToConcat) {
val filteredDowIter = GpuFilter.filterAndClose(concatAllAndPutOnGPU(),
filterTier, NoopMetric, NoopMetric, opTime)
filterTier, NoopMetric, NoopMetric, NoopMetric)
while (filteredDowIter.hasNext) {
closeOnExcept(filteredDowIter.next()) { filteredDownCb =>
filteredNumRows += filteredDownCb.numRows()
Expand All @@ -512,7 +512,7 @@ abstract class AbstractGpuCoalesceIterator(
// filterAndClose takes ownership of CB so we should not close it on a failure
// anymore...
val filteredCbIter = GpuFilter.filterAndClose(cb.release, filterTier,
NoopMetric, NoopMetric, opTime)
NoopMetric, NoopMetric, NoopMetric)
while (filteredCbIter.hasNext) {
closeOnExcept(filteredCbIter.next()) { filteredCb =>
val filteredWouldBeRows = filteredNumRows + filteredCb.numRows()
Expand Down
30 changes: 26 additions & 4 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,34 @@ sealed abstract class GpuMetric extends Serializable {
def +=(v: Long): Unit
def add(v: Long): Unit

private var isTimerActive = false

/**
 * Attempts to claim this metric's timer.
 *
 * The timer is claimed only when it is not already active; in every case the
 * timer is active once this method returns.
 *
 * NOTE(review): the flag is a plain `var` with no synchronization visible here --
 * confirm that a metric instance is only timed from one thread at a time.
 *
 * @return `true` if this call activated the timer, `false` if it was already active
 */
final def tryActivateTimer(): Boolean = {
  val claimed = !isTimerActive
  // Writing `true` unconditionally is equivalent to the guarded write: the flag is
  // either newly set or was already set.
  isTimerActive = true
  claimed
}

/**
 * Releases this metric's timer and records the elapsed time.
 *
 * When the timer is active, the active flag is cleared and `duration` is passed to
 * `add`. When the timer is not active, the call does nothing and `duration` is
 * discarded.
 *
 * @param duration the amount to add to this metric (callers in this file pass a
 *                 difference of `System.nanoTime()` readings)
 */
final def deactivateTimer(duration: Long): Unit =
  if (isTimerActive) {
    // Clear the flag before recording, matching the original statement order.
    isTimerActive = false
    add(duration)
  }

final def ns[T](f: => T): T = {
val start = System.nanoTime()
try {
if (tryActivateTimer()) {
val start = System.nanoTime()
try {
f
} finally {
deactivateTimer(System.nanoTime() - start)
}
} else {
f
} finally {
add(System.nanoTime() - start)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,26 +32,32 @@ object NvtxWithMetrics {
* by the amount of time spent in the range
*/
class NvtxWithMetrics(name: String, color: NvtxColor, val metrics: GpuMetric*)
extends NvtxRange(name, color) {
extends NvtxRange(name, color) {

val needTracks = metrics.map(_.tryActivateTimer())
private val start = System.nanoTime()

override def close(): Unit = {
val time = System.nanoTime() - start
metrics.foreach { metric =>
metric += time
metrics.toSeq.zip(needTracks).foreach { pair =>
if (pair._2) {
pair._1.deactivateTimer(time)
}
}
super.close()
}
}

class MetricRange(val metrics: GpuMetric*) extends AutoCloseable {
val needTracks = metrics.map(_.tryActivateTimer())
private val start = System.nanoTime()

override def close(): Unit = {
val time = System.nanoTime() - start
metrics.foreach { metric =>
metric += time
metrics.toSeq.zip(needTracks).foreach { pair =>
if (pair._2) {
pair._1.deactivateTimer(time)
}
}
}
}
68 changes: 68 additions & 0 deletions tests/src/test/scala/com/nvidia/spark/rapids/MetricsSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import ai.rapids.cudf.NvtxColor
import com.nvidia.spark.rapids.Arm.withResource
import org.scalatest.funsuite.AnyFunSuite

/**
 * Checks that timing the same metric from nested scopes records the elapsed time
 * only once, for each of the three timing entry points: `GpuMetric.ns`,
 * `MetricRange` and `NvtxWithMetrics`.
 */
class MetricsSuite extends AnyFunSuite {

  /** How long each test body sleeps inside the innermost timed scope. */
  private val sleepMillis = 100L

  /** The sleep duration expressed in nanoseconds. */
  private val sleepNanos = 100000000L

  /**
   * Asserts that the metric holds roughly one sleep's worth of time.
   *
   * If the nested timing had been counted twice, the value would be around
   * 200,000,000 and the upper-bound check would fail.
   */
  private def assertCountedOnce(metric: LocalGpuMetric): Unit = {
    assert(metric.value < sleepNanos * 1.5)
    assert(metric.value > sleepNanos * 0.5)
  }

  test("GpuMetric.ns: duplicate timing on the same metrics") {
    val metric = new LocalGpuMetric()
    metric.ns {
      metric.ns {
        Thread.sleep(sleepMillis)
      }
    }
    assertCountedOnce(metric)
  }

  test("MetricRange: duplicate timing on the same metrics") {
    val first = new LocalGpuMetric()
    val second = new LocalGpuMetric()
    // The inner range lists the same metrics in the opposite order.
    withResource(new MetricRange(first, second)) { _ =>
      withResource(new MetricRange(second, first)) { _ =>
        Thread.sleep(sleepMillis)
      }
    }

    assertCountedOnce(first)
    assertCountedOnce(second)
  }

  test("NvtxWithMetrics: duplicate timing on the same metrics") {
    val first = new LocalGpuMetric()
    val second = new LocalGpuMetric()
    // The inner range lists the same metrics in the opposite order.
    withResource(new NvtxWithMetrics("a", NvtxColor.BLUE, first, second)) { _ =>
      withResource(new NvtxWithMetrics("b", NvtxColor.BLUE, second, first)) { _ =>
        Thread.sleep(sleepMillis)
      }
    }

    assertCountedOnce(first)
    assertCountedOnce(second)
  }
}

0 comments on commit 7a8690f

Please sign in to comment.