Skip to content

Commit

Permalink
Revert "Fix flaky test StreamingDataflowWorkerTest (#28173)"
Browse files Browse the repository at this point in the history
This reverts commit 505f942.
  • Loading branch information
Abacn authored Sep 13, 2023
1 parent 6ac4e82 commit 2a25eb4
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.dataflow.worker.util;

import java.time.Clock;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -49,24 +48,6 @@ public BoundedQueueExecutor(
int maximumElementsOutstanding,
long maximumBytesOutstanding,
ThreadFactory threadFactory) {
this(
maximumPoolSize,
keepAliveTime,
unit,
maximumElementsOutstanding,
maximumBytesOutstanding,
threadFactory,
Clock.systemUTC());
}

public BoundedQueueExecutor(
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
int maximumElementsOutstanding,
long maximumBytesOutstanding,
ThreadFactory threadFactory,
Clock clock) {
executor =
new ThreadPoolExecutor(
maximumPoolSize,
Expand All @@ -80,7 +61,7 @@ protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
synchronized (this) {
if (activeCount.getAndIncrement() >= maximumPoolSize - 1) {
startTimeMaxActiveThreadsUsed = clock.millis();
startTimeMaxActiveThreadsUsed = System.currentTimeMillis();
}
}
}
Expand All @@ -90,7 +71,8 @@ protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
synchronized (this) {
if (activeCount.getAndDecrement() == maximumPoolSize) {
totalTimeMaxActiveThreadsUsed += (clock.millis() - startTimeMaxActiveThreadsUsed);
totalTimeMaxActiveThreadsUsed +=
(System.currentTimeMillis() - startTimeMaxActiveThreadsUsed);
startTimeMaxActiveThreadsUsed = 0;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.InstructionInput;
Expand All @@ -52,7 +56,6 @@
import com.google.api.services.dataflow.model.WriteInstruction;
import java.io.IOException;
import java.io.InputStream;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -167,6 +170,7 @@
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ErrorCollector;
Expand Down Expand Up @@ -2851,24 +2855,10 @@ public void testActiveWorkForShardedKeys() throws Exception {
}

@Test
@Ignore // Test is flaky on Jenkins (#27555)
public void testMaxThreadMetric() throws Exception {
int maxThreads = 2;
int threadExpiration = 60;

Clock mockClock = Mockito.mock(Clock.class);
CountDownLatch latch = new CountDownLatch(2);
doAnswer(
invocation -> {
latch.countDown();
// Return 0 until we are called once (reach max thread count).
if (latch.getCount() == 1) {
return 0L;
}
return 1000L;
})
.when(mockClock)
.millis();

// setting up actual implementation of executor instead of mocking to keep track of
// active thread count.
BoundedQueueExecutor executor =
Expand All @@ -2881,8 +2871,7 @@ public void testMaxThreadMetric() throws Exception {
new ThreadFactoryBuilder()
.setNameFormat("DataflowWorkUnits-%d")
.setDaemon(true)
.build(),
mockClock);
.build());

StreamingDataflowWorker.ComputationState computationState =
new StreamingDataflowWorker.ComputationState(
Expand All @@ -2894,17 +2883,15 @@ public void testMaxThreadMetric() throws Exception {

ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1);

// overriding definition of MockWork to add sleep, which will help us keep track of how
// long each work item takes to process and therefore let us manipulate how long the time
// at which we're at max threads is.
MockWork m2 =
new MockWork(2) {
@Override
public void run() {
try {
// Make sure we don't finish before both MockWork are executed, thus afterExecute must
// be called after
// beforeExecute.
while (latch.getCount() > 1) {
Thread.sleep(50);
}
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand All @@ -2916,9 +2903,7 @@ public void run() {
@Override
public void run() {
try {
while (latch.getCount() > 1) {
Thread.sleep(50);
}
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand All @@ -2928,11 +2913,13 @@ public void run() {
assertTrue(computationState.activateWork(key1Shard1, m2));
assertTrue(computationState.activateWork(key1Shard1, m3));
executor.execute(m2, m2.getWorkItem().getSerializedSize());

executor.execute(m3, m3.getWorkItem().getSerializedSize());
// Wait until the afterExecute is called.
latch.await();

assertEquals(1000L, executor.allThreadsActiveTime());
// Will get close to 1000ms that both work items are processing (sleeping, really)
// give or take a few ms.
long i = 990L;
assertTrue(executor.allThreadsActiveTime() >= i);
executor.shutdown();
}

Expand Down

0 comments on commit 2a25eb4

Please sign in to comment.