From 2a25eb457010dfd30018b5e63e9a47ef50defd0f Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 13 Sep 2023 11:32:54 -0400 Subject: [PATCH] Revert "Fix flaky test StreamingDataflowWorkerTest (#28173)" This reverts commit 505f94213874471ae4ec5fa810c73a11dc5be1a8. --- .../worker/util/BoundedQueueExecutor.java | 24 ++------- .../worker/StreamingDataflowWorkerTest.java | 49 +++++++------------ 2 files changed, 21 insertions(+), 52 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index 1905cf3ac479..05b752f91c0c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.dataflow.worker.util; -import java.time.Clock; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -49,24 +48,6 @@ public BoundedQueueExecutor( int maximumElementsOutstanding, long maximumBytesOutstanding, ThreadFactory threadFactory) { - this( - maximumPoolSize, - keepAliveTime, - unit, - maximumElementsOutstanding, - maximumBytesOutstanding, - threadFactory, - Clock.systemUTC()); - } - - public BoundedQueueExecutor( - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - int maximumElementsOutstanding, - long maximumBytesOutstanding, - ThreadFactory threadFactory, - Clock clock) { executor = new ThreadPoolExecutor( maximumPoolSize, @@ -80,7 +61,7 @@ protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); synchronized (this) { if (activeCount.getAndIncrement() >= maximumPoolSize - 1) { - startTimeMaxActiveThreadsUsed = clock.millis(); + startTimeMaxActiveThreadsUsed = System.currentTimeMillis(); } } } @@ -90,7 +71,8 @@ protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); synchronized (this) { if (activeCount.getAndDecrement() == maximumPoolSize) { - totalTimeMaxActiveThreadsUsed += (clock.millis() - startTimeMaxActiveThreadsUsed); + totalTimeMaxActiveThreadsUsed += + (System.currentTimeMillis() - startTimeMaxActiveThreadsUsed); startTimeMaxActiveThreadsUsed = 0; } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 8dc7f6217cdc..95b3a43ebf49 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -33,7 +33,11 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.api.services.dataflow.model.CounterUpdate; import com.google.api.services.dataflow.model.InstructionInput; @@ -52,7 +56,6 @@ import com.google.api.services.dataflow.model.WriteInstruction; import java.io.IOException; import java.io.InputStream; -import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -167,6 +170,7 @@ import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ErrorCollector; @@ -2851,24 +2855,10 @@ public void testActiveWorkForShardedKeys() throws Exception { } @Test + @Ignore // Test is flaky on Jenkins (#27555) public void testMaxThreadMetric() throws Exception { int maxThreads = 2; int threadExpiration = 60; - - Clock mockClock = Mockito.mock(Clock.class); - CountDownLatch latch = new CountDownLatch(2); - doAnswer( - invocation -> { - latch.countDown(); - // Return 0 until we are called once (reach max thread count). - if (latch.getCount() == 1) { - return 0L; - } - return 1000L; - }) - .when(mockClock) - .millis(); - // setting up actual implementation of executor instead of mocking to keep track of // active thread count. BoundedQueueExecutor executor = @@ -2881,8 +2871,7 @@ public void testMaxThreadMetric() throws Exception { new ThreadFactoryBuilder() .setNameFormat("DataflowWorkUnits-%d") .setDaemon(true) - .build(), - mockClock); + .build()); StreamingDataflowWorker.ComputationState computationState = new StreamingDataflowWorker.ComputationState( @@ -2894,17 +2883,15 @@ public void testMaxThreadMetric() throws Exception { ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1); + // overriding definition of MockWork to add sleep, which will help us keep track of how + // long each work item takes to process and therefore let us manipulate how long the time + // at which we're at max threads is. MockWork m2 = new MockWork(2) { @Override public void run() { try { - // Make sure we don't finish before both MockWork are executed, thus afterExecute must - // be called after - // beforeExecute. - while (latch.getCount() > 1) { - Thread.sleep(50); - } + Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -2916,9 +2903,7 @@ public void run() { @Override public void run() { try { - while (latch.getCount() > 1) { - Thread.sleep(50); - } + Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -2928,11 +2913,13 @@ public void run() { assertTrue(computationState.activateWork(key1Shard1, m2)); assertTrue(computationState.activateWork(key1Shard1, m3)); executor.execute(m2, m2.getWorkItem().getSerializedSize()); + executor.execute(m3, m3.getWorkItem().getSerializedSize()); - // Wait until the afterExecute is called. - latch.await(); - assertEquals(1000L, executor.allThreadsActiveTime()); + // Will get close to 1000ms that both work items are processing (sleeping, really) + // give or take a few ms. + long i = 990L; + assertTrue(executor.allThreadsActiveTime() >= i); executor.shutdown(); }