From 0d2fadf3409146c4a4fdc7dee6613a6a90eef563 Mon Sep 17 00:00:00 2001 From: Mike Eltsufin Date: Fri, 2 Dec 2022 15:13:33 -0500 Subject: [PATCH 1/2] fix: Watchdog controls lifecycle of the future, not executor (#1890) --- .../java/com/google/api/gax/rpc/Watchdog.java | 16 +++++++++++++--- .../com/google/api/gax/rpc/WatchdogProvider.java | 1 + 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java index 83d729933..8ca97876c 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java +++ b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java @@ -36,9 +36,11 @@ import java.util.Map.Entry; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nonnull; @@ -61,6 +63,7 @@ * */ public final class Watchdog implements Runnable, BackgroundResource { + private static final Logger LOG = Logger.getLogger(Watchdog.class.getName()); // Dummy value to convert the ConcurrentHashMap into a Set @@ -138,12 +141,12 @@ public void shutdown() { @Override public boolean isShutdown() { - return executor.isShutdown(); + return future.isCancelled(); } @Override public boolean isTerminated() { - return executor.isTerminated(); + return future.isDone(); } @Override @@ -153,7 +156,14 @@ public void shutdownNow() { @Override public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { - return executor.awaitTermination(duration, unit); + try { + future.get(duration, unit); + return true; + } catch (ExecutionException | CancellationException e) { + return true; + } catch (TimeoutException e) { + return false; + } } @Override diff --git a/gax/src/main/java/com/google/api/gax/rpc/WatchdogProvider.java b/gax/src/main/java/com/google/api/gax/rpc/WatchdogProvider.java index e01ed3c20..db3fb20bb 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/WatchdogProvider.java +++ b/gax/src/main/java/com/google/api/gax/rpc/WatchdogProvider.java @@ -49,5 +49,6 @@ public interface WatchdogProvider { Watchdog getWatchdog(); + /** Return true if the watchdog should be automatically unscheduled. */ boolean shouldAutoClose(); } From e6cc7eebeb3d75631b9fcce162b0c59bcbc829e6 Mon Sep 17 00:00:00 2001 From: Blake Li Date: Fri, 2 Dec 2022 21:58:07 +0000 Subject: [PATCH 2/2] test: Add unit tests for Watchdog (#1919) --- .../com/google/api/gax/rpc/WatchdogTest.java | 81 +++++++++++++++++-- 1 file changed, 73 insertions(+), 8 deletions(-) diff --git a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java index 0b161018f..e20218452 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java @@ -44,6 +44,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -195,14 +196,7 @@ public void testMultiple() throws Exception { @SuppressWarnings("unchecked") public void testWatchdogBeingClosed() { ScheduledFuture future = Mockito.mock(ScheduledFuture.class); - ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class); - Mockito.when( - mockExecutor.scheduleAtFixedRate( - Mockito.any(Watchdog.class), - Mockito.anyLong(), - Mockito.anyLong(), - Mockito.any(TimeUnit.class))) - .thenReturn(future); + ScheduledExecutorService mockExecutor = getMockExecutorService(future); Watchdog underTest = Watchdog.create(clock, checkInterval, mockExecutor); assertThat(underTest).isInstanceOf(BackgroundResource.class); @@ -219,6 +213,77 @@ public void testWatchdogBeingClosed() { Mockito.verifyNoMoreInteractions(mockExecutor); } + @Test + public void awaitTermination_shouldReturnTrueIfFutureIsDone() throws Exception { + int duration = 1000; + TimeUnit timeUnit = TimeUnit.MILLISECONDS; + ScheduledFuture future = Mockito.mock(ScheduledFuture.class); + ScheduledExecutorService mockExecutor = getMockExecutorService(future); + Watchdog watchdog = Watchdog.create(clock, checkInterval, mockExecutor); + watchdog.shutdown(); + + boolean actual = watchdog.awaitTermination(duration, timeUnit); + + assertThat(actual).isTrue(); + } + + @Test + public void awaitTermination_shouldReturnFalseIfGettingFutureTimedOut() throws Exception { + int duration = 1000; + TimeUnit timeUnit = TimeUnit.MILLISECONDS; + ScheduledFuture future = Mockito.mock(ScheduledFuture.class); + Mockito.doThrow(new TimeoutException()).when(future).get(duration, timeUnit); + ScheduledExecutorService mockExecutor = getMockExecutorService(future); + Watchdog watchdog = Watchdog.create(clock, checkInterval, mockExecutor); + + boolean actual = watchdog.awaitTermination(duration, timeUnit); + + assertThat(actual).isFalse(); + } + + @Test + public void awaitTermination_shouldReturnTrueIfFutureIsAlreadyCancelled() throws Exception { + int duration = 1000; + TimeUnit timeUnit = TimeUnit.MILLISECONDS; + ScheduledFuture future = Mockito.mock(ScheduledFuture.class); + Mockito.doThrow(new CancellationException()).when(future).get(duration, timeUnit); + ScheduledExecutorService mockExecutor = getMockExecutorService(future); + Watchdog watchdog = Watchdog.create(clock, checkInterval, mockExecutor); + + boolean actual = watchdog.awaitTermination(duration, timeUnit); + + assertThat(actual).isTrue(); + } + + @Test + public void awaitTermination_shouldReturnFalseIfGettingFutureThrowsExecutionException() + throws Exception { + int duration = 1000; + TimeUnit timeUnit = TimeUnit.MILLISECONDS; + ScheduledFuture future = Mockito.mock(ScheduledFuture.class); + Mockito.doThrow(new ExecutionException(new RuntimeException())) + .when(future) + .get(duration, timeUnit); + ScheduledExecutorService mockExecutor = getMockExecutorService(future); + Watchdog watchdog = Watchdog.create(clock, checkInterval, mockExecutor); + + boolean actual = watchdog.awaitTermination(duration, timeUnit); + + assertThat(actual).isTrue(); + } + + private ScheduledExecutorService getMockExecutorService(ScheduledFuture future) { + ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class); + Mockito.when( + mockExecutor.scheduleAtFixedRate( + Mockito.any(Watchdog.class), + Mockito.anyLong(), + Mockito.anyLong(), + Mockito.any(TimeUnit.class))) + .thenReturn(future); + return mockExecutor; + } + static class AccumulatingObserver implements ResponseObserver { SettableApiFuture controller = SettableApiFuture.create(); Queue responses = Queues.newLinkedBlockingDeque();