Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

feat: implemented watchdog as BackgroundResource #838

Merged
merged 4 commits into from
Jan 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions gax/src/main/java/com/google/api/gax/rpc/ClientContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ public static ClientContext create(StubSettings settings) throws IOException {
if (executorProvider.shouldAutoClose()) {
backgroundResources.add(new ExecutorAsBackgroundResource(executor));
}
if (watchdogProvider != null && watchdogProvider.shouldAutoClose()) {
backgroundResources.add(watchdog);
}

return newBuilder()
.setBackgroundResources(backgroundResources.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,9 @@ public WatchdogProvider withExecutor(ScheduledExecutorService executor) {
public Watchdog getWatchdog() {
return watchdog;
}

@Override
public boolean shouldAutoClose() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.google.api.core.BetaApi;
import com.google.common.base.Preconditions;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
Expand Down Expand Up @@ -103,10 +102,11 @@ public Watchdog getWatchdog() {
return null;
}

Watchdog watchdog = new Watchdog(clock);
executor.scheduleAtFixedRate(
watchdog, checkInterval.toMillis(), checkInterval.toMillis(), TimeUnit.MILLISECONDS);
return Watchdog.create(clock, checkInterval, executor);
}

return watchdog;
@Override
public boolean shouldAutoClose() {
return true;
}
}
59 changes: 57 additions & 2 deletions gax/src/main/java/com/google/api/gax/rpc/Watchdog.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,15 @@

import com.google.api.core.ApiClock;
import com.google.api.core.InternalApi;
import com.google.api.gax.core.BackgroundResource;
import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Duration;
Expand All @@ -56,15 +60,34 @@
* </ul>
*/
@InternalApi
public class Watchdog implements Runnable {
public class Watchdog implements Runnable, BackgroundResource {
// Dummy value to convert the ConcurrentHashMap into a Set
private static Object PRESENT = new Object();
private final ConcurrentHashMap<WatchdogStream, Object> openStreams = new ConcurrentHashMap<>();

private final ApiClock clock;
private final Duration scheduleInterval;
private final ScheduledExecutorService executor;
private ScheduledFuture<?> future;

/** returns a Watchdog which is scheduled at the provided interval. */
public static Watchdog create(
ApiClock clock, Duration scheduleInterval, ScheduledExecutorService executor) {
Watchdog watchdog = new Watchdog(clock, scheduleInterval, executor);
watchdog.start();
return watchdog;
}

public Watchdog(ApiClock clock) {
private Watchdog(ApiClock clock, Duration scheduleInterval, ScheduledExecutorService executor) {
this.clock = Preconditions.checkNotNull(clock, "clock can't be null");
this.scheduleInterval = scheduleInterval;
this.executor = executor;
}

private void start() {
future =
executor.scheduleAtFixedRate(
this, scheduleInterval.toMillis(), scheduleInterval.toMillis(), TimeUnit.MILLISECONDS);
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}

/** Wraps the target observer with timing constraints. */
Expand Down Expand Up @@ -98,6 +121,38 @@ public void run() {
}
}

@Override
public void shutdown() {
future.cancel(false);
executor.shutdown();
}

@Override
public boolean isShutdown() {
return executor.isShutdown();
}

@Override
public boolean isTerminated() {
return executor.isTerminated();
}

@Override
public void shutdownNow() {
future.cancel(true);
executor.shutdownNow();
}

@Override
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
return executor.awaitTermination(duration, unit);
}

@Override
public void close() {
shutdown();
}

enum State {
/** Stream has been started, but doesn't have any outstanding requests. */
IDLE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,6 @@ public interface WatchdogProvider {
WatchdogProvider withExecutor(ScheduledExecutorService executor);

Watchdog getWatchdog();

boolean shouldAutoClose();
}
39 changes: 39 additions & 0 deletions gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
*/
package com.google.api.gax.rpc;

import static com.google.common.truth.Truth.assertThat;

import com.google.api.core.ApiClock;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.ExecutorProvider;
Expand Down Expand Up @@ -287,4 +289,41 @@ private void runTest(
Truth.assertThat(executor.shutdownCalled).isEqualTo(shouldAutoClose);
}
}

@Test
public void testWatchdogProvider() throws IOException {
FakeClientSettings.Builder builder = new FakeClientSettings.Builder();

InterceptingExecutor executor = new InterceptingExecutor(1);
FakeTransportChannel transportChannel = FakeTransportChannel.create(new FakeChannel());
FakeTransportProvider transportProvider =
new FakeTransportProvider(transportChannel, executor, true, null, null);
ApiClock clock = Mockito.mock(ApiClock.class);

builder.setClock(clock);
builder.setCredentialsProvider(
FixedCredentialsProvider.create(Mockito.mock(Credentials.class)));
builder.setExecutorProvider(new FakeExecutorProvider(executor, true));
builder.setTransportChannelProvider(transportProvider);

Duration watchdogCheckInterval = Duration.ofSeconds(11);
builder.setWatchdogProvider(
InstantiatingWatchdogProvider.create()
.withClock(clock)
.withCheckInterval(watchdogCheckInterval)
.withExecutor(executor));
builder.setWatchdogCheckInterval(watchdogCheckInterval);

HeaderProvider headerProvider = Mockito.mock(HeaderProvider.class);
Mockito.when(headerProvider.getHeaders()).thenReturn(ImmutableMap.of("k1", "v1"));
HeaderProvider internalHeaderProvider = Mockito.mock(HeaderProvider.class);

Mockito.when(internalHeaderProvider.getHeaders()).thenReturn(ImmutableMap.of("k2", "v2"));
builder.setHeaderProvider(headerProvider);
builder.setInternalHeaderProvider(internalHeaderProvider);

ClientContext context = ClientContext.create(builder.build());
List<BackgroundResource> resources = context.getBackgroundResources();
assertThat(resources.get(2)).isInstanceOf(Watchdog.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public void testNoModifications() {
assertThat(provider.needsCheckInterval()).isFalse();
assertThat(provider.needsClock()).isFalse();
assertThat(provider.needsExecutor()).isFalse();
assertThat(provider.shouldAutoClose()).isFalse();

Throwable actualError = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public void happyPath() {
assertThat(provider.needsCheckInterval()).isTrue();
provider = provider.withCheckInterval(checkInterval);

assertThat(provider.shouldAutoClose()).isTrue();

Watchdog watchdog = provider.getWatchdog();
Mockito.verify(executor)
.scheduleAtFixedRate(
Expand Down
64 changes: 51 additions & 13 deletions gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,34 @@
*/
package com.google.api.gax.rpc;

import static com.google.common.truth.Truth.assertThat;

import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.FakeApiClock;
import com.google.api.gax.rpc.testing.MockStreamingApi.MockServerStreamingCall;
import com.google.api.gax.rpc.testing.MockStreamingApi.MockServerStreamingCallable;
import com.google.common.collect.Queues;
import com.google.common.truth.Truth;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
public class WatchdogTest {
private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(1);

private FakeApiClock clock;
private final Duration checkInterval = Duration.ofMillis(1000);
private Duration waitTime = Duration.ofSeconds(10);
private Duration idleTime = Duration.ofMinutes(5);

Expand All @@ -59,7 +68,7 @@ public class WatchdogTest {
@Before
public void setUp() {
clock = new FakeApiClock(0);
watchdog = new Watchdog(clock);
watchdog = Watchdog.create(clock, checkInterval, EXECUTOR);

callable = new MockServerStreamingCallable<>();
innerObserver = new AccumulatingObserver<>();
Expand All @@ -70,7 +79,7 @@ public void setUp() {
@Test
public void testRequestPassthrough() throws Exception {
innerObserver.controller.get().request(1);
Truth.assertThat(call.getController().popLastPull()).isEqualTo(1);
assertThat(call.getController().popLastPull()).isEqualTo(1);
}

@Test
Expand All @@ -79,11 +88,11 @@ public void testWaitTimeout() throws Exception {

clock.incrementNanoTime(waitTime.toNanos() - 1);
watchdog.run();
Truth.assertThat(call.getController().isCancelled()).isFalse();
assertThat(call.getController().isCancelled()).isFalse();

clock.incrementNanoTime(1);
watchdog.run();
Truth.assertThat(call.getController().isCancelled()).isTrue();
assertThat(call.getController().isCancelled()).isTrue();
call.getController()
.getObserver()
.onError(new RuntimeException("Some upstream exception representing cancellation"));
Expand All @@ -94,18 +103,18 @@ public void testWaitTimeout() throws Exception {
} catch (ExecutionException t) {
actualError = t.getCause();
}
Truth.assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
}

@Test
public void testIdleTimeout() throws InterruptedException {
clock.incrementNanoTime(idleTime.toNanos() - 1);
watchdog.run();
Truth.assertThat(call.getController().isCancelled()).isFalse();
assertThat(call.getController().isCancelled()).isFalse();

clock.incrementNanoTime(1);
watchdog.run();
Truth.assertThat(call.getController().isCancelled()).isTrue();
assertThat(call.getController().isCancelled()).isTrue();
call.getController()
.getObserver()
.onError(new RuntimeException("Some upstream exception representing cancellation"));
Expand All @@ -116,7 +125,7 @@ public void testIdleTimeout() throws InterruptedException {
} catch (ExecutionException t) {
actualError = t.getCause();
}
Truth.assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
}

@Test
Expand All @@ -141,20 +150,49 @@ public void testMultiple() throws Exception {
watchdog.run();

// Call1 should be ok
Truth.assertThat(call1.getController().isCancelled()).isFalse();
assertThat(call1.getController().isCancelled()).isFalse();
// Should not throw
Truth.assertThat(downstreamObserver1.done.isDone()).isFalse();
assertThat(downstreamObserver1.done.isDone()).isFalse();

// Call2 should be timed out
Truth.assertThat(call2.getController().isCancelled()).isTrue();
assertThat(call2.getController().isCancelled()).isTrue();
call2.getController().getObserver().onError(new CancellationException("User cancelled"));
Throwable error = null;
try {
downstreamObserver2.done.get();
} catch (ExecutionException t) {
error = t.getCause();
}
Truth.assertThat(error).isInstanceOf(WatchdogTimeoutException.class);
assertThat(error).isInstanceOf(WatchdogTimeoutException.class);
}

@Test
@SuppressWarnings("unchecked")
public void testWatchdogBeingClosed() {
ScheduledFuture future = Mockito.mock(ScheduledFuture.class);
ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class);
Mockito.when(
mockExecutor.scheduleAtFixedRate(
Mockito.any(Watchdog.class),
Mockito.anyLong(),
Mockito.anyLong(),
Mockito.any(TimeUnit.class)))
.thenReturn(future);
Watchdog underTest = Watchdog.create(clock, checkInterval, mockExecutor);
assertThat(underTest).isInstanceOf(BackgroundResource.class);

underTest.close();
underTest.shutdown();

Mockito.verify(mockExecutor)
.scheduleAtFixedRate(
underTest, checkInterval.toMillis(), checkInterval.toMillis(), TimeUnit.MILLISECONDS);
Mockito.verify(future, Mockito.times(2)).cancel(false);
Mockito.verify(mockExecutor, Mockito.times(2)).shutdown();

underTest.shutdownNow();
Mockito.verify(future).cancel(true);
Mockito.verify(mockExecutor).shutdownNow();
}

static class AccumulatingObserver<T> implements ResponseObserver<T> {
Expand Down