Skip to content

Commit

Permalink
Allow Service to transition from STARTING to TERMINATED. Add a new "d…
Browse files Browse the repository at this point in the history
…oCancelStart" method that is called when a service is stopped while still starting. This gives implementations a chance to abort code which may be preventing the service from starting.

RELNOTES=Added `doCancelStart` protected method to `AbstractService`

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=207746343
  • Loading branch information
herbyderby authored and ronshapiro committed Aug 9, 2018
1 parent 0b3eb0f commit 27bfe41
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,53 @@ public void failure(Service service) {
manager.awaitStopped(10, TimeUnit.MILLISECONDS);
}

public void testDoCancelStart() throws TimeoutException {
Service a =
new AbstractService() {
@Override
protected void doStart() {
// Never starts!
}

@Override
protected void doCancelStart() {
assertThat(state()).isEqualTo(Service.State.STOPPING);
notifyStopped();
}

@Override
protected void doStop() {
throw new AssertionError(); // Should not be called.
}
};

final ServiceManager manager = new ServiceManager(asList(a));
manager.startAsync();
manager.stopAsync();
manager.awaitStopped(10, TimeUnit.MILLISECONDS);
assertThat(manager.servicesByState().keySet()).containsExactly(Service.State.TERMINATED);
}

public void testNotifyStoppedAfterFailure() throws TimeoutException {
Service a =
new AbstractService() {
@Override
protected void doStart() {
notifyFailed(new IllegalStateException("start failure"));
notifyStopped(); // This will be a no-op.
}

@Override
protected void doStop() {
notifyStopped();
}
};
final ServiceManager manager = new ServiceManager(asList(a));
manager.startAsync();
manager.awaitStopped(10, TimeUnit.MILLISECONDS);
assertThat(manager.servicesByState().keySet()).containsExactly(Service.State.FAILED);
}

private static void assertState(
ServiceManager manager, Service.State state, Service... services) {
Collection<Service> managerServices = manager.servicesByState().get(state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public String toString() {

private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT =
terminatedEvent(NEW);
private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT =
terminatedEvent(STARTING);
private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT =
terminatedEvent(RUNNING);
private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT =
Expand Down Expand Up @@ -211,10 +213,31 @@ protected AbstractService() {}
* <p>This method should return promptly; prefer to do work on a different thread where it is
* convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
* called multiple times.
*
* <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not
* invoked immediately. Instead, it will be deferred until after the service is {@link
* State#RUNNING}. Services that need to cancel startup work can override {#link #doCancelStart}.
*/
@ForOverride
protected abstract void doStop();

/**
* This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link
* #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the
* method to cancel pending work and then call {@link #notifyStopped} to stop the service.
*
* <p>This method should return promptly; prefer to do work on a different thread where it is
* convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
* called multiple times.
*
* <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which
* is the external state observable by the caller of {@link #stopAsync}.
*
* @since NEXT
*/
@ForOverride
protected void doCancelStart() {}

@CanIgnoreReturnValue
@Override
public final Service startAsync() {
Expand Down Expand Up @@ -249,6 +272,7 @@ public final Service stopAsync() {
case STARTING:
snapshot = new StateSnapshot(STARTING, true, null);
enqueueStoppingEvent(STARTING);
doCancelStart();
break;
case RUNNING:
snapshot = new StateSnapshot(STOPPING);
Expand All @@ -260,8 +284,6 @@ public final Service stopAsync() {
case FAILED:
// These cases are impossible due to the if statement above.
throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
default:
throw new AssertionError("Unexpected state: " + previous);
}
} catch (Throwable shutdownFailure) {
notifyFailed(shutdownFailure);
Expand Down Expand Up @@ -384,25 +406,28 @@ protected final void notifyStarted() {

/**
* Implementing classes should invoke this method once their service has stopped. It will cause
* the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
* the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link
* State#TERMINATED}.
*
* @throws IllegalStateException if the service is neither {@link State#STOPPING} nor {@link
* State#RUNNING}.
* @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link
* State#STARTING}, or {@link State#RUNNING}.
*/
protected final void notifyStopped() {
monitor.enter();
try {
// We check the internal state of the snapshot instead of state() directly so we don't allow
// notifyStopped() to be called while STARTING, even if stop() has already been called.
State previous = snapshot.state;
if (previous != STOPPING && previous != RUNNING) {
IllegalStateException failure =
new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
notifyFailed(failure);
throw failure;
State previous = state();
switch (previous) {
case NEW:
case TERMINATED:
case FAILED:
throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
case RUNNING:
case STARTING:
case STOPPING:
snapshot = new StateSnapshot(TERMINATED);
enqueueTerminatedEvent(previous);
break;
}
snapshot = new StateSnapshot(TERMINATED);
enqueueTerminatedEvent(previous);
} finally {
monitor.leave();
dispatchListenerEvents();
Expand Down Expand Up @@ -433,8 +458,6 @@ protected final void notifyFailed(Throwable cause) {
case FAILED:
// Do nothing
break;
default:
throw new AssertionError("Unexpected state: " + previous);
}
} finally {
monitor.leave();
Expand Down Expand Up @@ -502,16 +525,17 @@ private void enqueueTerminatedEvent(final State from) {
case NEW:
listeners.enqueue(TERMINATED_FROM_NEW_EVENT);
break;
case STARTING:
listeners.enqueue(TERMINATED_FROM_STARTING_EVENT);
break;
case RUNNING:
listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT);
break;
case STOPPING:
listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT);
break;
case STARTING:
case TERMINATED:
case FAILED:
default:
throw new AssertionError();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,9 @@ public void stopping(State from) {}
* diagram. Therefore, if this method is called, no other methods will be called on the {@link
* Listener}.
*
* @param from The previous state that is being transitioned from. The only valid values for
* this are {@linkplain State#NEW NEW}, {@linkplain State#RUNNING RUNNING} or {@linkplain
* State#STOPPING STOPPING}.
* @param from The previous state that is being transitioned from. Failure can occur in any
* state with the exception of {@linkplain State#FAILED FAILED} and {@linkplain
* State#TERMINATED TERMINATED}.
*/
public void terminated(State from) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,53 @@ public void failure(Service service) {
manager.awaitStopped(10, TimeUnit.MILLISECONDS);
}

public void testDoCancelStart() throws TimeoutException {
Service a =
new AbstractService() {
@Override
protected void doStart() {
// Never starts!
}

@Override
protected void doCancelStart() {
assertThat(state()).isEqualTo(Service.State.STOPPING);
notifyStopped();
}

@Override
protected void doStop() {
throw new AssertionError(); // Should not be called.
}
};

final ServiceManager manager = new ServiceManager(asList(a));
manager.startAsync();
manager.stopAsync();
manager.awaitStopped(10, TimeUnit.MILLISECONDS);
assertThat(manager.servicesByState().keySet()).containsExactly(Service.State.TERMINATED);
}

public void testNotifyStoppedAfterFailure() throws TimeoutException {
Service a =
new AbstractService() {
@Override
protected void doStart() {
notifyFailed(new IllegalStateException("start failure"));
notifyStopped(); // This will be a no-op.
}

@Override
protected void doStop() {
notifyStopped();
}
};
final ServiceManager manager = new ServiceManager(asList(a));
manager.startAsync();
manager.awaitStopped(10, TimeUnit.MILLISECONDS);
assertThat(manager.servicesByState().keySet()).containsExactly(Service.State.FAILED);
}

private static void assertState(
ServiceManager manager, Service.State state, Service... services) {
Collection<Service> managerServices = manager.servicesByState().get(state);
Expand Down
62 changes: 43 additions & 19 deletions guava/src/com/google/common/util/concurrent/AbstractService.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public String toString() {

private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT =
terminatedEvent(NEW);
private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT =
terminatedEvent(STARTING);
private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT =
terminatedEvent(RUNNING);
private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT =
Expand Down Expand Up @@ -211,10 +213,31 @@ protected AbstractService() {}
* <p>This method should return promptly; prefer to do work on a different thread where it is
* convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
* called multiple times.
*
* <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not
* invoked immediately. Instead, it will be deferred until after the service is {@link
* State#RUNNING}. Services that need to cancel startup work can override {#link #doCancelStart}.
*/
@ForOverride
protected abstract void doStop();

/**
* This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link
* #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the
* method to cancel pending work and then call {@link #notifyStopped} to stop the service.
*
* <p>This method should return promptly; prefer to do work on a different thread where it is
* convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
* called multiple times.
*
* <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which
* is the external state observable by the caller of {@link #stopAsync}.
*
* @since NEXT
*/
@ForOverride
protected void doCancelStart() {}

@CanIgnoreReturnValue
@Override
public final Service startAsync() {
Expand Down Expand Up @@ -249,6 +272,7 @@ public final Service stopAsync() {
case STARTING:
snapshot = new StateSnapshot(STARTING, true, null);
enqueueStoppingEvent(STARTING);
doCancelStart();
break;
case RUNNING:
snapshot = new StateSnapshot(STOPPING);
Expand All @@ -260,8 +284,6 @@ public final Service stopAsync() {
case FAILED:
// These cases are impossible due to the if statement above.
throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
default:
throw new AssertionError("Unexpected state: " + previous);
}
} catch (Throwable shutdownFailure) {
notifyFailed(shutdownFailure);
Expand Down Expand Up @@ -384,25 +406,28 @@ protected final void notifyStarted() {

/**
* Implementing classes should invoke this method once their service has stopped. It will cause
* the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
* the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link
* State#TERMINATED}.
*
* @throws IllegalStateException if the service is neither {@link State#STOPPING} nor {@link
* State#RUNNING}.
* @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link
* State#STARTING}, or {@link State#RUNNING}.
*/
protected final void notifyStopped() {
monitor.enter();
try {
// We check the internal state of the snapshot instead of state() directly so we don't allow
// notifyStopped() to be called while STARTING, even if stop() has already been called.
State previous = snapshot.state;
if (previous != STOPPING && previous != RUNNING) {
IllegalStateException failure =
new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
notifyFailed(failure);
throw failure;
State previous = state();
switch (previous) {
case NEW:
case TERMINATED:
case FAILED:
throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
case RUNNING:
case STARTING:
case STOPPING:
snapshot = new StateSnapshot(TERMINATED);
enqueueTerminatedEvent(previous);
break;
}
snapshot = new StateSnapshot(TERMINATED);
enqueueTerminatedEvent(previous);
} finally {
monitor.leave();
dispatchListenerEvents();
Expand Down Expand Up @@ -433,8 +458,6 @@ protected final void notifyFailed(Throwable cause) {
case FAILED:
// Do nothing
break;
default:
throw new AssertionError("Unexpected state: " + previous);
}
} finally {
monitor.leave();
Expand Down Expand Up @@ -502,16 +525,17 @@ private void enqueueTerminatedEvent(final State from) {
case NEW:
listeners.enqueue(TERMINATED_FROM_NEW_EVENT);
break;
case STARTING:
listeners.enqueue(TERMINATED_FROM_STARTING_EVENT);
break;
case RUNNING:
listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT);
break;
case STOPPING:
listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT);
break;
case STARTING:
case TERMINATED:
case FAILED:
default:
throw new AssertionError();
}
}
Expand Down
6 changes: 3 additions & 3 deletions guava/src/com/google/common/util/concurrent/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,9 @@ public void stopping(State from) {}
* diagram. Therefore, if this method is called, no other methods will be called on the {@link
* Listener}.
*
* @param from The previous state that is being transitioned from. The only valid values for
* this are {@linkplain State#NEW NEW}, {@linkplain State#RUNNING RUNNING} or {@linkplain
* State#STOPPING STOPPING}.
* @param from The previous state that is being transitioned from. Failure can occur in any
* state with the exception of {@linkplain State#FAILED FAILED} and {@linkplain
* State#TERMINATED TERMINATED}.
*/
public void terminated(State from) {}

Expand Down

0 comments on commit 27bfe41

Please sign in to comment.