From a63b0e2a38073a6a53cbe7b0c9646e2826fcb451 Mon Sep 17 00:00:00 2001 From: Liangcai Li Date: Mon, 11 Mar 2024 17:23:53 +0800 Subject: [PATCH] Add log for GPU semaphore (#32) add log from semaphore Signed-off-by: Firestarman --- .../main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala index 503909eccd5..3cb348026ee 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala @@ -162,7 +162,7 @@ object GpuSemaphore { * this is considered to be okay as there are other mechanisms in place, and it should be rather * rare. */ -private final class SemaphoreTaskInfo() extends Logging { +private final class SemaphoreTaskInfo(taskAttemptId: Long) extends Logging { /** * This holds threads that are not on the GPU yet. Most of the time they are * blocked waiting for the semaphore to let them on, but it may hold one @@ -251,6 +251,7 @@ private final class SemaphoreTaskInfo() extends Logging { // We cannot be in a synchronized block and wait on the semaphore // so we have to release it and grab it again afterwards. semaphore.acquire(numPermits) + logInfo(s"==> Task $taskAttemptId acquiring GPU with $numPermits permits") synchronized { // We now own the semaphore so we need to wake up all of the other tasks that are // waiting. @@ -304,6 +305,7 @@ private final class SemaphoreTaskInfo() extends Logging { activeThreads.remove(t) if (hasSemaphore) { semaphore.release(numPermits) + logInfo(s"==> Task $taskAttemptId releasing GPU with $numPermits permits") hasSemaphore = false } // It should be impossible for the current thread to be blocked when releasing the semaphore @@ -327,7 +329,7 @@ private final class GpuSemaphore() extends Logging { val taskAttemptId = context.taskAttemptId() val taskInfo = tasks.computeIfAbsent(taskAttemptId, _ => { onTaskCompletion(context, completeTask) - new SemaphoreTaskInfo() + new SemaphoreTaskInfo(taskAttemptId) }) if (taskInfo.tryAcquire(semaphore)) { GpuDeviceManager.initializeFromTask() @@ -351,7 +353,7 @@ private final class GpuSemaphore() extends Logging { val taskAttemptId = context.taskAttemptId() val taskInfo = tasks.computeIfAbsent(taskAttemptId, _ => { onTaskCompletion(context, completeTask) - new SemaphoreTaskInfo() + new SemaphoreTaskInfo(taskAttemptId) }) taskInfo.blockUntilReady(semaphore) GpuDeviceManager.initializeFromTask()