Skip to content

Commit

Permalink
Time at max threads fix (#30041)
Browse files Browse the repository at this point in the history
* change counterset

* add logs

* calculate delta for time at max threads

* debugging which worker harness

* use atomic long instead of counter to keep track of previous time

* fix potential race condition
  • Loading branch information
edman124 authored Jan 22, 2024
1 parent 5eeffc7 commit d014a98
Showing 1 changed file with 8 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ public class StreamingDataflowWorker {
private final Thread dispatchThread;
private final Thread commitThread;
private final AtomicLong activeCommitBytes = new AtomicLong();
private final AtomicLong previousTimeAtMaxThreads = new AtomicLong();
private final AtomicBoolean running = new AtomicBoolean();
private final SideInputStateFetcher sideInputStateFetcher;
private final StreamingDataflowWorkerOptions options;
Expand All @@ -245,10 +246,10 @@ public class StreamingDataflowWorker {
private final Counter<Long, Long> windmillStateBytesRead;
private final Counter<Long, Long> windmillStateBytesWritten;
private final Counter<Long, Long> windmillQuotaThrottling;
private final Counter<Long, Long> timeAtMaxActiveThreads;
// Built-in cumulative counters.
private final Counter<Long, Long> javaHarnessUsedMemory;
private final Counter<Long, Long> javaHarnessMaxMemory;
private final Counter<Long, Long> timeAtMaxActiveThreads;
private final Counter<Integer, Integer> activeThreads;
private final Counter<Integer, Integer> totalAllocatedThreads;
private final Counter<Long, Long> outstandingBytes;
Expand Down Expand Up @@ -331,15 +332,15 @@ public class StreamingDataflowWorker {
this.windmillQuotaThrottling =
pendingDeltaCounters.longSum(
StreamingSystemCounterNames.WINDMILL_QUOTA_THROTTLING.counterName());
this.timeAtMaxActiveThreads =
pendingDeltaCounters.longSum(
StreamingSystemCounterNames.TIME_AT_MAX_ACTIVE_THREADS.counterName());
this.javaHarnessUsedMemory =
pendingCumulativeCounters.longSum(
StreamingSystemCounterNames.JAVA_HARNESS_USED_MEMORY.counterName());
this.javaHarnessMaxMemory =
pendingCumulativeCounters.longSum(
StreamingSystemCounterNames.JAVA_HARNESS_MAX_MEMORY.counterName());
this.timeAtMaxActiveThreads =
pendingCumulativeCounters.longSum(
StreamingSystemCounterNames.TIME_AT_MAX_ACTIVE_THREADS.counterName());
this.activeThreads =
pendingCumulativeCounters.intSum(StreamingSystemCounterNames.ACTIVE_THREADS.counterName());
this.outstandingBytes =
Expand Down Expand Up @@ -1739,7 +1740,9 @@ private void updateVMMetrics() {

private void updateThreadMetrics() {
timeAtMaxActiveThreads.getAndReset();
timeAtMaxActiveThreads.addValue(workUnitExecutor.allThreadsActiveTime());
long allThreadsActiveTime = workUnitExecutor.allThreadsActiveTime();
timeAtMaxActiveThreads.addValue(allThreadsActiveTime - previousTimeAtMaxThreads.get());
previousTimeAtMaxThreads.set(allThreadsActiveTime);
activeThreads.getAndReset();
activeThreads.addValue(workUnitExecutor.activeCount());
totalAllocatedThreads.getAndReset();
Expand Down

0 comments on commit d014a98

Please sign in to comment.