Skip to content

Commit

Permalink
Fix thread leak/invasion
Browse files Browse the repository at this point in the history
Jersey/Jetty, at least in the 3.1 version line, creates one thread for each HTTP request. This behavior was introduced with #5372 and seems not present in the 2.x or 3.x versions of Jersey.

From the javadoc of `java.util.Timer`:
```
Implementation note: All constructors start a timer thread.
...
After the last live reference to a Timer object goes away and all outstanding tasks have completed execution, the timer's task execution thread terminates gracefully (and becomes subject to garbage collection). However, this can take arbitrarily long to occur.
```
It is fair to assume that "arbitrarily long" may also mean _never_, in case GC never runs.

This change replaces the timer & thread per request with a `ScheduledExecutorService` instance per `JettyHttpContainer`.

Also changed the set-timeout mechanism to use `System.nanoTime()` instead of `System.currentTimeMillis()`, because the latter is prone to wall-clock drift and can result into wrong timeout values.

Fixes #5588

Signed-off-by: Robert Stupp <[email protected]>
  • Loading branch information
snazy authored and jansupol committed Apr 4, 2024
1 parent d49a602 commit de52713
Showing 1 changed file with 72 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import java.security.Principal;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -88,6 +91,8 @@ public final class JettyHttpContainer extends Handler.Abstract implements Contai
*/
private boolean configSetStatusOverSendError;

private final ScheduledThreadPoolExecutor timeoutScheduler;

/**
* Referencing factory for Jetty request.
*/
Expand Down Expand Up @@ -136,7 +141,7 @@ protected void configure() {
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception {

final ResponseWriter responseWriter = new ResponseWriter(request, response, callback, configSetStatusOverSendError);
final ResponseWriter responseWriter = new ResponseWriter(timeoutScheduler, request, response, callback, configSetStatusOverSendError);
try {
LOGGER.debugLog(LocalizationMessages.CONTAINER_STARTED());
final URI baseUri = getBaseUri(request);
Expand Down Expand Up @@ -254,32 +259,44 @@ private static final class ResponseWriter implements ContainerResponseWriter {
private final Response response;
private final Callback callback;
private final boolean configSetStatusOverSendError;
private final Timer timer = new Timer();
private final long asyncStartTimeMillis;
private final long asyncStartTimeNanos;
private final ScheduledExecutorService timeoutScheduler;
private final ConcurrentLinkedQueue<TimeoutHandler> timeoutHandlerQueue = new ConcurrentLinkedQueue<>();
private TimerTask currentTimerTask;
private ScheduledFuture<?> currentTimerTask;

ResponseWriter(final Request request, final Response response, final Callback callback,
final boolean configSetStatusOverSendError) {
ResponseWriter(final ScheduledExecutorService timeoutScheduler, final Request request, final Response response,
final Callback callback, final boolean configSetStatusOverSendError) {
this.timeoutScheduler = timeoutScheduler;
this.request = request;
this.response = response;
this.callback = callback;
this.asyncStartTimeMillis = System.currentTimeMillis();
this.asyncStartTimeNanos = System.nanoTime();
this.configSetStatusOverSendError = configSetStatusOverSendError;
}

private synchronized void setNewTimeout(long timeOut, TimeUnit timeUnit) {
long timeOutMillis = timeUnit.toMillis(timeOut);
long timeOutNanos = timeUnit.toNanos(timeOut);
if (currentTimerTask != null) {
currentTimerTask.cancel();
// Do not interrupt, see callTimeoutHandlers()
currentTimerTask.cancel(false);
}
currentTimerTask = new TimerTask() {
@Override
public void run() {
timeoutHandlerQueue.forEach(timeoutHandler -> timeoutHandler.onTimeout(ResponseWriter.this));
// Use System.nanoTime() as the clock source here, because the returned value is not prone to wall-clock
// drift - unlike System.currentTimeMillis().
long delayNanos = Math.max(asyncStartTimeNanos - System.nanoTime() + timeOutNanos, 0L);
currentTimerTask = timeoutScheduler.schedule(this::callTimeoutHandlers, delayNanos, TimeUnit.NANOSECONDS);
}

private void callTimeoutHandlers() {
// Note: Although it might not happen in practice, it is in theory possible that this function is
// called multiple times concurrently. To prevent any timeout handler being called twice, we poll()
// timeout handlers from the queue, instead of iterating over the queue.
while (true) {
TimeoutHandler handler = timeoutHandlerQueue.poll();
if (handler == null) {
break;
}
};
timer.schedule(currentTimerTask, Math.max(0, timeOutMillis + asyncStartTimeMillis - System.currentTimeMillis()));
handler.onTimeout(ResponseWriter.this);
}
}

@Override
Expand Down Expand Up @@ -419,6 +436,41 @@ public void doStop() throws Exception {
super.doStop();
appHandler.onShutdown(this);
appHandler = null;

timeoutScheduler.shutdown();
boolean needInterrupt = false;
while (true) {
try {
if (timeoutScheduler.awaitTermination(1L, TimeUnit.MINUTES)) {
break;
}
} catch (InterruptedException e) {
if (!needInterrupt) {
needInterrupt = true;
timeoutScheduler.shutdownNow();
}
}
}
if (needInterrupt) {
Thread.currentThread().interrupt();
}
}

private static final AtomicInteger TIMEOUT_HANDLER_ID_GEN = new AtomicInteger();

private static ScheduledThreadPoolExecutor createTimeoutScheduler() {
// Note: creating the thread-pool does not start the core-pool threads.
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> {
Thread t = new Thread(r, "JettyHttpContainer-Timeout-Handler #" + TIMEOUT_HANDLER_ID_GEN.incrementAndGet());
t.setDaemon(true);
return t;
});
// Limit the number of timeout handling threads to a quarter of the number of CPUs, at least 2.
executor.setMaximumPoolSize(Math.max(2, Runtime.getRuntime().availableProcessors() / 4));
executor.allowCoreThreadTimeOut(true);
// Don't Keep timeout handling threads around "forever".
executor.setKeepAliveTime(100, TimeUnit.MILLISECONDS);
return executor;
}

/**
Expand All @@ -428,6 +480,7 @@ public void doStop() throws Exception {
* @param parentContext DI provider specific context with application's registered bindings.
*/
JettyHttpContainer(final Application application, final Object parentContext) {
this.timeoutScheduler = createTimeoutScheduler();
this.appHandler = new ApplicationHandler(application, new JettyBinder(), parentContext);
}

Expand All @@ -437,6 +490,7 @@ public void doStop() throws Exception {
* @param application JAX-RS / Jersey application to be deployed on Jetty HTTP container.
*/
JettyHttpContainer(final Application application) {
this.timeoutScheduler = createTimeoutScheduler();
this.appHandler = new ApplicationHandler(application, new JettyBinder());

cacheConfigSetStatusOverSendError();
Expand All @@ -448,6 +502,7 @@ public void doStop() throws Exception {
* @param applicationClass JAX-RS / Jersey class of application to be deployed on Jetty HTTP container.
*/
JettyHttpContainer(final Class<? extends Application> applicationClass) {
this.timeoutScheduler = createTimeoutScheduler();
this.appHandler = new ApplicationHandler(applicationClass, new JettyBinder());

cacheConfigSetStatusOverSendError();
Expand Down

0 comments on commit de52713

Please sign in to comment.