Skip to content

Commit

Permalink
feat: implemented watchdog as BackgroundResource
Browse files Browse the repository at this point in the history
This change updates watchdog to BackgroundResource, Also, watchdog unschedules itself explicitly when shutdown.

closes googleapis#828
  • Loading branch information
rahulKQL committed Jan 3, 2020
1 parent bbefe87 commit 50bf647
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 28 deletions.
20 changes: 12 additions & 8 deletions gax/src/main/java/com/google/api/gax/rpc/ClientContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ public static ClientContext create(StubSettings settings) throws IOException {
defaultCallContext = defaultCallContext.withCredentials(credentials);
}

ImmutableList.Builder<BackgroundResource> backgroundResources = ImmutableList.builder();

if (transportChannelProvider.shouldAutoClose()) {
backgroundResources.add(transportChannel);
}
if (executorProvider.shouldAutoClose()) {
backgroundResources.add(new ExecutorAsBackgroundResource(executor));
}

WatchdogProvider watchdogProvider = settings.getStreamWatchdogProvider();
@Nullable Watchdog watchdog = null;

Expand All @@ -175,15 +184,10 @@ public static ClientContext create(StubSettings settings) throws IOException {
watchdogProvider = watchdogProvider.withExecutor(executor);
}
watchdog = watchdogProvider.getWatchdog();
}

ImmutableList.Builder<BackgroundResource> backgroundResources = ImmutableList.builder();

if (transportChannelProvider.shouldAutoClose()) {
backgroundResources.add(transportChannel);
}
if (executorProvider.shouldAutoClose()) {
backgroundResources.add(new ExecutorAsBackgroundResource(executor));
if (watchdogProvider.shouldAutoClose()) {
backgroundResources.add(watchdog);
}
}

return newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,9 @@ public WatchdogProvider withExecutor(ScheduledExecutorService executor) {
public Watchdog getWatchdog() {
return watchdog;
}

@Override
public boolean shouldAutoClose() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.google.api.core.BetaApi;
import com.google.common.base.Preconditions;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
Expand Down Expand Up @@ -103,10 +102,11 @@ public Watchdog getWatchdog() {
return null;
}

Watchdog watchdog = new Watchdog(clock);
executor.scheduleAtFixedRate(
watchdog, checkInterval.toMillis(), checkInterval.toMillis(), TimeUnit.MILLISECONDS);
return new Watchdog(clock, checkInterval, executor);
}

return watchdog;
@Override
public boolean shouldAutoClose() {
return true;
}
}
46 changes: 44 additions & 2 deletions gax/src/main/java/com/google/api/gax/rpc/Watchdog.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,15 @@

import com.google.api.core.ApiClock;
import com.google.api.core.InternalApi;
import com.google.api.gax.core.BackgroundResource;
import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Duration;
Expand All @@ -56,15 +60,21 @@
* </ul>
*/
@InternalApi
public class Watchdog implements Runnable {
public class Watchdog implements Runnable, BackgroundResource {
// Dummy value to convert the ConcurrentHashMap into a Set
private static Object PRESENT = new Object();
private final ConcurrentHashMap<WatchdogStream, Object> openStreams = new ConcurrentHashMap<>();

private final ApiClock clock;
private final ScheduledExecutorService executor;
private final ScheduledFuture<?> future;

public Watchdog(ApiClock clock) {
public Watchdog(ApiClock clock, Duration interval, ScheduledExecutorService executor) {
this.clock = Preconditions.checkNotNull(clock, "clock can't be null");
this.executor = executor;
this.future =
executor.scheduleAtFixedRate(
this, interval.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
}

/** Wraps the target observer with timing constraints. */
Expand Down Expand Up @@ -98,6 +108,38 @@ public void run() {
}
}

@Override
public void shutdown() {
future.cancel(true);
executor.shutdown();
}

@Override
public boolean isShutdown() {
return executor.isShutdown();
}

@Override
public boolean isTerminated() {
return executor.isTerminated();
}

@Override
public void shutdownNow() {
future.cancel(true);
executor.shutdownNow();
}

@Override
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
return executor.awaitTermination(duration, unit);
}

@Override
public void close() throws Exception {
shutdown();
}

enum State {
/** Stream has been started, but doesn't have any outstanding requests. */
IDLE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,6 @@ public interface WatchdogProvider {
WatchdogProvider withExecutor(ScheduledExecutorService executor);

Watchdog getWatchdog();

boolean shouldAutoClose();
}
37 changes: 37 additions & 0 deletions gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,41 @@ private void runTest(
Truth.assertThat(executor.shutdownCalled).isEqualTo(shouldAutoClose);
}
}

@Test
public void testWatchdogProvider() throws IOException {
FakeClientSettings.Builder builder = new FakeClientSettings.Builder();

InterceptingExecutor executor = new InterceptingExecutor(1);
FakeTransportChannel transportChannel = FakeTransportChannel.create(new FakeChannel());
FakeTransportProvider transportProvider =
new FakeTransportProvider(transportChannel, executor, true, null, null);
ApiClock clock = Mockito.mock(ApiClock.class);

builder.setClock(clock);
builder.setCredentialsProvider(
FixedCredentialsProvider.create(Mockito.mock(Credentials.class)));
builder.setExecutorProvider(new FakeExecutorProvider(executor, true));
builder.setTransportChannelProvider(transportProvider);

Duration watchdogCheckInterval = Duration.ofSeconds(11);
builder.setWatchdogProvider(
InstantiatingWatchdogProvider.create()
.withClock(clock)
.withCheckInterval(watchdogCheckInterval)
.withExecutor(executor));
builder.setWatchdogCheckInterval(watchdogCheckInterval);

HeaderProvider headerProvider = Mockito.mock(HeaderProvider.class);
Mockito.when(headerProvider.getHeaders()).thenReturn(ImmutableMap.of("k1", "v1"));
HeaderProvider internalHeaderProvider = Mockito.mock(HeaderProvider.class);

Mockito.when(internalHeaderProvider.getHeaders()).thenReturn(ImmutableMap.of("k2", "v2"));
builder.setHeaderProvider(headerProvider);
builder.setInternalHeaderProvider(internalHeaderProvider);

ClientContext context = ClientContext.create(builder.build());
List<BackgroundResource> resources = context.getBackgroundResources();
Truth.assertThat(resources.get(2)).isInstanceOf(Watchdog.class);
}
}
58 changes: 45 additions & 13 deletions gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,34 @@
*/
package com.google.api.gax.rpc;

import static com.google.common.truth.Truth.assertThat;

import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.FakeApiClock;
import com.google.api.gax.rpc.testing.MockStreamingApi.MockServerStreamingCall;
import com.google.api.gax.rpc.testing.MockStreamingApi.MockServerStreamingCallable;
import com.google.common.collect.Queues;
import com.google.common.truth.Truth;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
public class WatchdogTest {
private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(1);

private FakeApiClock clock;
private final Duration checkInterval = Duration.ofMillis(1000);
private Duration waitTime = Duration.ofSeconds(10);
private Duration idleTime = Duration.ofMinutes(5);

Expand All @@ -59,7 +68,7 @@ public class WatchdogTest {
@Before
public void setUp() {
clock = new FakeApiClock(0);
watchdog = new Watchdog(clock);
watchdog = new Watchdog(clock, checkInterval, EXECUTOR);

callable = new MockServerStreamingCallable<>();
innerObserver = new AccumulatingObserver<>();
Expand All @@ -70,7 +79,7 @@ public void setUp() {
@Test
public void testRequestPassthrough() throws Exception {
innerObserver.controller.get().request(1);
Truth.assertThat(call.getController().popLastPull()).isEqualTo(1);
assertThat(call.getController().popLastPull()).isEqualTo(1);
}

@Test
Expand All @@ -79,11 +88,11 @@ public void testWaitTimeout() throws Exception {

clock.incrementNanoTime(waitTime.toNanos() - 1);
watchdog.run();
Truth.assertThat(call.getController().isCancelled()).isFalse();
assertThat(call.getController().isCancelled()).isFalse();

clock.incrementNanoTime(1);
watchdog.run();
Truth.assertThat(call.getController().isCancelled()).isTrue();
assertThat(call.getController().isCancelled()).isTrue();
call.getController()
.getObserver()
.onError(new RuntimeException("Some upstream exception representing cancellation"));
Expand All @@ -94,18 +103,18 @@ public void testWaitTimeout() throws Exception {
} catch (ExecutionException t) {
actualError = t.getCause();
}
Truth.assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
}

@Test
public void testIdleTimeout() throws InterruptedException {
clock.incrementNanoTime(idleTime.toNanos() - 1);
watchdog.run();
Truth.assertThat(call.getController().isCancelled()).isFalse();
assertThat(call.getController().isCancelled()).isFalse();

clock.incrementNanoTime(1);
watchdog.run();
Truth.assertThat(call.getController().isCancelled()).isTrue();
assertThat(call.getController().isCancelled()).isTrue();
call.getController()
.getObserver()
.onError(new RuntimeException("Some upstream exception representing cancellation"));
Expand All @@ -116,7 +125,7 @@ public void testIdleTimeout() throws InterruptedException {
} catch (ExecutionException t) {
actualError = t.getCause();
}
Truth.assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
}

@Test
Expand All @@ -141,20 +150,43 @@ public void testMultiple() throws Exception {
watchdog.run();

// Call1 should be ok
Truth.assertThat(call1.getController().isCancelled()).isFalse();
assertThat(call1.getController().isCancelled()).isFalse();
// Should not throw
Truth.assertThat(downstreamObserver1.done.isDone()).isFalse();
assertThat(downstreamObserver1.done.isDone()).isFalse();

// Call2 should be timed out
Truth.assertThat(call2.getController().isCancelled()).isTrue();
assertThat(call2.getController().isCancelled()).isTrue();
call2.getController().getObserver().onError(new CancellationException("User cancelled"));
Throwable error = null;
try {
downstreamObserver2.done.get();
} catch (ExecutionException t) {
error = t.getCause();
}
Truth.assertThat(error).isInstanceOf(WatchdogTimeoutException.class);
assertThat(error).isInstanceOf(WatchdogTimeoutException.class);
}

@Test
public void testWatchdogClosed() throws Exception {
ScheduledFuture future = Mockito.mock(ScheduledFuture.class);
ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class);
Mockito.when(
mockExecutor.scheduleAtFixedRate(
Mockito.any(Watchdog.class),
Mockito.anyLong(),
Mockito.anyLong(),
Mockito.any(TimeUnit.class)))
.thenReturn(future);
Watchdog underTest = new Watchdog(clock, checkInterval, mockExecutor);
assertThat(underTest).isInstanceOf(BackgroundResource.class);

underTest.close();
underTest.shutdown();

Mockito.verify(mockExecutor)
.scheduleAtFixedRate(
underTest, checkInterval.toMillis(), checkInterval.toMillis(), TimeUnit.MILLISECONDS);
Mockito.verify(mockExecutor, Mockito.times(2)).shutdown();
}

static class AccumulatingObserver<T> implements ResponseObserver<T> {
Expand Down

0 comments on commit 50bf647

Please sign in to comment.