diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index c9a00ade6d8c..f2d7c02729c5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -228,6 +228,7 @@ public class StreamingDataflowWorker { private final Thread dispatchThread; private final Thread commitThread; private final AtomicLong activeCommitBytes = new AtomicLong(); + private final AtomicLong previousTimeAtMaxThreads = new AtomicLong(); private final AtomicBoolean running = new AtomicBoolean(); private final SideInputStateFetcher sideInputStateFetcher; private final StreamingDataflowWorkerOptions options; @@ -245,10 +246,10 @@ public class StreamingDataflowWorker { private final Counter windmillStateBytesRead; private final Counter windmillStateBytesWritten; private final Counter windmillQuotaThrottling; + private final Counter timeAtMaxActiveThreads; // Built-in cumulative counters. private final Counter javaHarnessUsedMemory; private final Counter javaHarnessMaxMemory; - private final Counter timeAtMaxActiveThreads; private final Counter activeThreads; private final Counter totalAllocatedThreads; private final Counter outstandingBytes; @@ -331,15 +332,15 @@ public class StreamingDataflowWorker { this.windmillQuotaThrottling = pendingDeltaCounters.longSum( StreamingSystemCounterNames.WINDMILL_QUOTA_THROTTLING.counterName()); + this.timeAtMaxActiveThreads = + pendingDeltaCounters.longSum( + StreamingSystemCounterNames.TIME_AT_MAX_ACTIVE_THREADS.counterName()); this.javaHarnessUsedMemory = pendingCumulativeCounters.longSum( StreamingSystemCounterNames.JAVA_HARNESS_USED_MEMORY.counterName()); this.javaHarnessMaxMemory = pendingCumulativeCounters.longSum( StreamingSystemCounterNames.JAVA_HARNESS_MAX_MEMORY.counterName()); - this.timeAtMaxActiveThreads = - pendingCumulativeCounters.longSum( - StreamingSystemCounterNames.TIME_AT_MAX_ACTIVE_THREADS.counterName()); this.activeThreads = pendingCumulativeCounters.intSum(StreamingSystemCounterNames.ACTIVE_THREADS.counterName()); this.outstandingBytes = @@ -1739,7 +1740,9 @@ private void updateVMMetrics() { private void updateThreadMetrics() { timeAtMaxActiveThreads.getAndReset(); - timeAtMaxActiveThreads.addValue(workUnitExecutor.allThreadsActiveTime()); + long allThreadsActiveTime = workUnitExecutor.allThreadsActiveTime(); + timeAtMaxActiveThreads.addValue(allThreadsActiveTime - previousTimeAtMaxThreads.get()); + previousTimeAtMaxThreads.set(allThreadsActiveTime); activeThreads.getAndReset(); activeThreads.addValue(workUnitExecutor.activeCount()); totalAllocatedThreads.getAndReset();