From d014a98936eb95316050474c0714277ccc3fa232 Mon Sep 17 00:00:00 2001 From: Edward Cheng Date: Mon, 22 Jan 2024 14:18:34 -0500 Subject: [PATCH] Time at max threads fix (#30041) * change counterset * add logs * calculate delta for time at max threads * debugging which worker harness * use atomic long instead of counter to keep track of previous time * fix potential race condition --- .../dataflow/worker/StreamingDataflowWorker.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index c9a00ade6d8c..f2d7c02729c5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -228,6 +228,7 @@ public class StreamingDataflowWorker { private final Thread dispatchThread; private final Thread commitThread; private final AtomicLong activeCommitBytes = new AtomicLong(); + private final AtomicLong previousTimeAtMaxThreads = new AtomicLong(); private final AtomicBoolean running = new AtomicBoolean(); private final SideInputStateFetcher sideInputStateFetcher; private final StreamingDataflowWorkerOptions options; @@ -245,10 +246,10 @@ public class StreamingDataflowWorker { private final Counter windmillStateBytesRead; private final Counter windmillStateBytesWritten; private final Counter windmillQuotaThrottling; + private final Counter timeAtMaxActiveThreads; // Built-in cumulative counters. private final Counter javaHarnessUsedMemory; private final Counter javaHarnessMaxMemory; - private final Counter timeAtMaxActiveThreads; private final Counter activeThreads; private final Counter totalAllocatedThreads; private final Counter outstandingBytes; @@ -331,15 +332,15 @@ public class StreamingDataflowWorker { this.windmillQuotaThrottling = pendingDeltaCounters.longSum( StreamingSystemCounterNames.WINDMILL_QUOTA_THROTTLING.counterName()); + this.timeAtMaxActiveThreads = + pendingDeltaCounters.longSum( + StreamingSystemCounterNames.TIME_AT_MAX_ACTIVE_THREADS.counterName()); this.javaHarnessUsedMemory = pendingCumulativeCounters.longSum( StreamingSystemCounterNames.JAVA_HARNESS_USED_MEMORY.counterName()); this.javaHarnessMaxMemory = pendingCumulativeCounters.longSum( StreamingSystemCounterNames.JAVA_HARNESS_MAX_MEMORY.counterName()); - this.timeAtMaxActiveThreads = - pendingCumulativeCounters.longSum( - StreamingSystemCounterNames.TIME_AT_MAX_ACTIVE_THREADS.counterName()); this.activeThreads = pendingCumulativeCounters.intSum(StreamingSystemCounterNames.ACTIVE_THREADS.counterName()); this.outstandingBytes = @@ -1739,7 +1740,9 @@ private void updateVMMetrics() { private void updateThreadMetrics() { timeAtMaxActiveThreads.getAndReset(); - timeAtMaxActiveThreads.addValue(workUnitExecutor.allThreadsActiveTime()); + long allThreadsActiveTime = workUnitExecutor.allThreadsActiveTime(); + timeAtMaxActiveThreads.addValue(allThreadsActiveTime - previousTimeAtMaxThreads.get()); + previousTimeAtMaxThreads.set(allThreadsActiveTime); activeThreads.getAndReset(); activeThreads.addValue(workUnitExecutor.activeCount()); totalAllocatedThreads.getAndReset();