Skip to content

Commit

Permalink
Detecting timeouts when a service stops
Browse files Browse the repository at this point in the history
  • Loading branch information
galaxina committed Nov 4, 2024
1 parent c068611 commit 8206454
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import oap.util.Maps;
import org.slf4j.Logger;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.net.URL;
Expand Down Expand Up @@ -70,15 +69,8 @@ public KernelTest() {
}


@BeforeMethod
public void beforeMethod() {
Env.set( "APPLICATION_STOP_DETECT_TIMEOUT", String.valueOf( 1 ) );
}

@AfterMethod
public void afterMethod() {
Env.set( "APPLICATION_STOP_DETECT_TIMEOUT", null );

new ArrayList<>( System.getenv().keySet() )
.stream()
.filter( k -> k.startsWith( "CONFIG." ) )
Expand All @@ -95,7 +87,7 @@ public void testLifecycle() {
TestLifecycle delayScheduled;

try( var kernel = new Kernel( modules ) ) {
kernel.start( Map.of( "boot.main", "lifecycle" ) );
kernel.start( Map.of( "boot.main", "lifecycle", "shutdown.serviceTimeout", 1 ) );

service = kernel.<TestLifecycle>service( "lifecycle", "service" ).orElseThrow();
thread = kernel.<TestLifecycle>service( "lifecycle.thread" ).orElseThrow();
Expand Down Expand Up @@ -162,9 +154,9 @@ public void start() {

@Test
public void disabled() {
var modules = List.of( url( "disabled/disabled.conf" ) );
List<URL> modules = List.of( url( "disabled/disabled.conf" ) );

var kernel = new Kernel( modules );
Kernel kernel = new Kernel( modules );
try {
kernel.start( Map.of( "boot.main", "disabled" ) );

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
boot.main = m3

shutdown {
  # How long the supervisor waits for a single service to (pre)stop before it
  # reports a timeout. The Java side passes this value to Future.get with
  # TimeUnit.MILLISECONDS, so it is expressed in milliseconds.
  serviceTimeout = 1
  # Optional override: applied only when the environment variable is defined.
  serviceTimeout = ${?SHUTDOWN_SERVICE_TIMEOUT}

  # When false, the supervisor keeps waiting for a timed-out service to finish.
  serviceAsyncShutdownAfterTimeout = false
  # Optional override. This must target serviceAsyncShutdownAfterTimeout; it
  # previously assigned to serviceTimeout, which clobbered the timeout and left
  # this flag impossible to override from the environment.
  serviceAsyncShutdownAfterTimeout = ${?SHUTDOWN_SERVICE_ASYNC_SHUTDOWN_AFTER_TIMEOUT}
}

services {
m2.ServiceTwo.parameters.j = ${a.b}
m1.ServiceOneP1.parameters.i2 = ${one.i2}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import oap.io.Files;
import oap.io.content.ContentReader;
import oap.json.Binder;
import oap.util.Dates;
import oap.util.Lists;
import oap.util.Stream;

Expand All @@ -50,8 +51,9 @@ public final class ApplicationConfiguration {
public static final String PREFIX = "CONFIG.";
public final Map<String, ApplicationConfigurationModule> services = new LinkedHashMap<>();
public final ModuleBoot boot = new ModuleBoot();
public final ModuleShutdown shutdown = new ModuleShutdown();

private ApplicationConfiguration() {
ApplicationConfiguration() {
}

@SneakyThrows
Expand Down Expand Up @@ -143,4 +145,10 @@ public static class ModuleBoot {
public final LinkedHashSet<String> main = new LinkedHashSet<>();
public boolean allowActiveByDefault = false;
}

/**
 * Shutdown settings of the application, exposed as the {@code shutdown} field of
 * {@link ApplicationConfiguration} and handed to the supervisor when services are stopped.
 */
@ToString
public static class ModuleShutdown {
/**
 * Time the supervisor waits for a single service to (pre)stop before logging a timeout.
 * The supervisor passes this value to {@code Future.get} with {@code TimeUnit.MILLISECONDS};
 * it enables timeout detection only when the value is greater than zero.
 * Defaults to {@code Dates.s( 5 )} (presumably 5 seconds expressed in milliseconds - confirm in Dates).
 */
public long serviceTimeout = Dates.s( 5 );
/**
 * When {@code false} (the default), the supervisor blocks until a timed-out service
 * eventually finishes and then logs how long it took.
 * NOTE(review): the {@code true} branch is not visible from this view; presumably the
 * supervisor moves on without waiting - confirm in Supervisor.runAndDetectTimeout.
 */
public boolean serviceAsyncShutdownAfterTimeout = false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class Kernel implements Closeable, AutoCloseable {
final String name;
private final List<URL> moduleConfigurations;
private final Supervisor supervisor = new Supervisor();
private ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration();

public Kernel( String name, List<URL> moduleConfigurations ) {
this.name = name;
Expand Down Expand Up @@ -129,8 +130,7 @@ private static void configureModules( LinkedHashMap<String, ModuleWithLocation>
}
implementations.put( new Reference( module.name, serviceName ), ServiceKernelCommand.INSTANCE.reference( ref, null ) );
}
default ->
throw new ApplicationException( "Service " + module.name + "." + serviceName + " configuration must be of type Map<String,?> or it must be a reference to the implementation of an abstract service in the form of <modules.[module name].[service name]>" );
default -> throw new ApplicationException( "Service " + module.name + "." + serviceName + " configuration must be of type Map<String,?> or it must be a reference to the implementation of an abstract service in the form of <modules.[module name].[service name]>" );
}
}
}
Expand Down Expand Up @@ -159,7 +159,8 @@ private void linkListeners( ModuleItem.ServiceItem serviceItem, Object instance
}

public void start() throws ApplicationException {
start( ApplicationConfiguration.load() );
applicationConfiguration = ApplicationConfiguration.load();
start( applicationConfiguration );
}

public void start( String appConfigPath, String confd ) throws ApplicationException {
Expand Down Expand Up @@ -487,7 +488,7 @@ private void startService( Supervisor supervisor, ModuleItem.ServiceItem si ) {
}

if( service.supervision.thread ) {
supervisor.startThread( si.serviceName, instance );
supervisor.startThread( si.serviceName, instance, applicationConfiguration.shutdown );
} else {
if( service.supervision.schedule && service.supervision.cron != null )
supervisor.scheduleCron( si.serviceName, ( Runnable ) instance,
Expand All @@ -507,8 +508,8 @@ public void register( ModuleItem.ServiceItem serviceItem, String serviceName ) t

public void stop() {
log.debug( "stopping application kernel {}...", name );
supervisor.preStop();
supervisor.stop();
supervisor.preStop( applicationConfiguration.shutdown );
supervisor.stop( applicationConfiguration.shutdown );
services.clear();
log.debug( "application kernel stopped {}", name );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package oap.application.supervision;

import lombok.extern.slf4j.Slf4j;
import oap.application.ApplicationConfiguration;
import oap.application.KernelHelper;
import oap.concurrent.Executors;
import oap.util.BiStream;
Expand All @@ -34,7 +35,7 @@
import java.io.Closeable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand All @@ -49,20 +50,21 @@ public class Supervisor {
private boolean stopped = false;

private static void runAndDetectTimeout( String name, ShutdownConfiguration shutdownConfiguration, Runnable func ) {
if( shutdownConfiguration.timeoutMs > 0 ) {
long serviceTimeout = shutdownConfiguration.shutdown.serviceTimeout;
if( serviceTimeout > 0 ) {
long start = DateTimeUtils.currentTimeMillis();
Future<?> future = shutdownConfiguration.submit( func );
try {
future.get( shutdownConfiguration.timeoutMs, TimeUnit.MILLISECONDS );
future.get( serviceTimeout, TimeUnit.MILLISECONDS );
} catch( InterruptedException e ) {
log.trace( e.getMessage() );
} catch( ExecutionException e ) {
throw Throwables.propagate( e );
} catch( TimeoutException e ) {
log.warn( "APP_TIMEOUT_START service {} after {}", name, Dates.durationToString( shutdownConfiguration.timeoutMs ) );
log.warn( "APP_TIMEOUT_START service {} after {}", name, Dates.durationToString( serviceTimeout ) );

try {
if( !shutdownConfiguration.forceAsyncAfterTimeout ) {
if( !shutdownConfiguration.shutdown.serviceAsyncShutdownAfterTimeout ) {
future.get();

log.warn( "APP_TIMEOUT_END service {} done in {}", name, Dates.durationToString( DateTimeUtils.currentTimeMillis() - start ) );
Expand Down Expand Up @@ -90,8 +92,8 @@ public synchronized void startSupervised( String name, Object service,
// this.wrappers.put( name, new ThreadService( name, ( Runnable ) instance, this ) );
// }

public synchronized void startThread( String name, Object instance ) {
this.wrappers.put( name, new ThreadService( name, ( Runnable ) instance, this ) );
public synchronized void startThread( String name, Object instance, ApplicationConfiguration.ModuleShutdown shutdown ) {
this.wrappers.put( name, new ThreadService( name, ( Runnable ) instance, this, shutdown ) );
}

public synchronized void scheduleWithFixedDelay( String name, Runnable service, long delay, TimeUnit unit ) {
Expand Down Expand Up @@ -159,42 +161,51 @@ public synchronized void start() {
} );
}

public synchronized void preStop() {
public synchronized void preStop( ApplicationConfiguration.ModuleShutdown shutdown ) {
if( !stopped ) {
log.debug( "pre stopping..." );

BiStream.of( this.wrappers )
.reversed()
.forEach( ( name, service ) -> {
log.debug( "[{}] pre stopping {}...", service.type(), name );
KernelHelper.setThreadNameSuffix( name );
try {
service.preStop();
} finally {
KernelHelper.restoreThreadName();
}
log.debug( "[{}] pre stopping {}... Done.", service.type(), name );
} );

BiStream.of( this.supervised )
.reversed()
.forEach( ( name, service ) -> {
log.debug( "pre stopping {}...", name );
KernelHelper.setThreadNameSuffix( name );
try {
service.preStop();
} finally {
KernelHelper.restoreThreadName();
}
log.debug( "pre stopping {}... Done.", name );
} );
try( ShutdownConfiguration shutdownConfiguration = new ShutdownConfiguration( shutdown ) ) {
log.debug( "pre stopping..." );

BiStream.of( this.wrappers )
.reversed()
.forEach( ( name, service ) -> {
Runnable func = () -> {
log.debug( "[{}] pre stopping {}...", service.type(), name );
KernelHelper.setThreadNameSuffix( name );
try {
service.preStop();
} finally {
KernelHelper.restoreThreadName();
}
log.debug( "[{}] pre stopping {}... Done.", service.type(), name );
};

runAndDetectTimeout( name, shutdownConfiguration, func );
} );

BiStream.of( this.supervised )
.reversed()
.forEach( ( name, service ) -> {
Runnable func = () -> {
log.debug( "pre stopping {}...", name );
KernelHelper.setThreadNameSuffix( name );
try {
service.preStop();
} finally {
KernelHelper.restoreThreadName();
}
log.debug( "pre stopping {}... Done.", name );
};

runAndDetectTimeout( name, shutdownConfiguration, func );
} );
}
}
}

public synchronized void stop() {
public synchronized void stop( ApplicationConfiguration.ModuleShutdown shutdown ) {
if( !stopped ) {

try( ShutdownConfiguration shutdownConfiguration = new ShutdownConfiguration() ) {
try( ShutdownConfiguration shutdownConfiguration = new ShutdownConfiguration( shutdown ) ) {
log.debug( "stopping..." );
this.stopped = true;

Expand Down Expand Up @@ -237,12 +248,12 @@ public synchronized void stop() {
}
}

public synchronized void stop( String serviceName ) {
public synchronized void stop( String serviceName, ApplicationConfiguration.ModuleShutdown shutdown ) {
if( !stopped ) {
log.debug( "stopping..." );
this.stopped = true;

try( ShutdownConfiguration shutdownConfiguration = new ShutdownConfiguration() ) {
try( ShutdownConfiguration shutdownConfiguration = new ShutdownConfiguration( shutdown ) ) {
BiStream.of( this.wrappers )
.filter( ( name, _ ) -> name.equals( serviceName ) )
.forEach( ( name, service ) -> {
Expand Down Expand Up @@ -284,26 +295,29 @@ public synchronized void stop( String serviceName ) {
}

public static class ShutdownConfiguration implements Closeable {
private static final Set<String> on = Set.of( "on", "1", "true", "ON", "TRUE", "yes", "YES" );
public final long timeoutMs;
public final boolean forceAsyncAfterTimeout;
public final ExecutorService threadPoolExecutor = Executors.newCachedThreadPool();
public final ExecutorService threadPoolExecutor;
private final ApplicationConfiguration.ModuleShutdown shutdown;

public ShutdownConfiguration() {
String timeoutMsStr = System.getenv( "APPLICATION_STOP_DETECT_TIMEOUT" );
this.timeoutMs = timeoutMsStr != null ? Long.parseLong( timeoutMsStr ) : Dates.s( 5 );
public ShutdownConfiguration( ApplicationConfiguration.ModuleShutdown shutdown ) {
this.shutdown = shutdown;

String forceAsyncAfterTimeoutStr = System.getenv( "APPLICATION_FORCE_ASYNC_AFTER_TIMEOUT" );
this.forceAsyncAfterTimeout = forceAsyncAfterTimeoutStr != null && on.contains( forceAsyncAfterTimeoutStr );
threadPoolExecutor = shutdown.serviceTimeout > 0 ? Executors.newCachedThreadPool() : null;
}

@Override
public void close() {
threadPoolExecutor.shutdown();
if( threadPoolExecutor != null ) {
threadPoolExecutor.shutdown();
}
}

public Future<?> submit( Runnable func ) {
return threadPoolExecutor.submit( func );
if( threadPoolExecutor != null ) {
return threadPoolExecutor.submit( func );
} else {
func.run();
return CompletableFuture.completedFuture( null );
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package oap.application.supervision;

import lombok.extern.slf4j.Slf4j;
import oap.application.ApplicationConfiguration;
import oap.concurrent.SynchronizedRunnable;
import oap.concurrent.SynchronizedRunnableReadyListener;
import oap.concurrent.SynchronizedThread;
Expand All @@ -34,16 +35,18 @@
public class ThreadService extends SynchronizedRunnable implements WrapperService<Runnable>, SynchronizedRunnableReadyListener {

private final Supervisor supervisor;
private final ApplicationConfiguration.ModuleShutdown shutdown;
private final SynchronizedThread thread = new SynchronizedThread( this );
private final Runnable supervised;
protected SynchronizedRunnableReadyListener listener;
private AtomicInteger maxFailures = new AtomicInteger( 100 );
private volatile boolean done = false;

public ThreadService( final String name, Runnable supervisee, final Supervisor supervisor ) {
public ThreadService( final String name, Runnable supervisee, Supervisor supervisor, ApplicationConfiguration.ModuleShutdown shutdown ) {
this.supervised = supervisee;

this.supervisor = supervisor;
this.shutdown = shutdown;
this.thread.setName( name );
if( supervisee instanceof SynchronizedRunnable )
( ( SynchronizedRunnable ) supervisee ).readyListener( this );
Expand Down Expand Up @@ -71,8 +74,8 @@ public void run() {
if( maxFailures.get() <= 0 ) {
log.error( supervised + " constantly crushing. Requesting shutdown..." );
new Thread( () -> {
supervisor.preStop();
supervisor.stop();
supervisor.preStop( shutdown );
supervisor.stop( shutdown );
} ).run();
}
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
</distributionManagement>

<properties>
<oap.project.version>22.5.5</oap.project.version>
<oap.project.version>22.5.6</oap.project.version>

<oap.deps.config.version>21.0.0</oap.deps.config.version>
<oap.deps.oap-teamcity.version>21.0.1</oap.deps.oap-teamcity.version>
Expand Down

0 comments on commit 8206454

Please sign in to comment.