diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultLoopResources.java b/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultLoopResources.java index 10743f0766..ae54f39d83 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultLoopResources.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultLoopResources.java @@ -129,7 +129,8 @@ public Mono disposeLater(Duration quietPeriod, Duration timeout) { } } - return Mono.when(clMono, sslMono, slMono, cnclMono, cnslMono, cnsrvlMono); + return Mono.when(clMono, sslMono, slMono, cnclMono, cnslMono, cnsrvlMono) + .timeout(timeout, Mono.error(new IllegalStateException("LoopResources couldn't be disposed within " + timeout.toMillis() + "ms"))); }); } diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java index 45c22f9158..86548f7123 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java @@ -1896,6 +1896,73 @@ public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg) .isEqualTo("delay500delay1000"); } + @Test + public void testGracefulShutdownIssue3509() throws Exception { + CountDownLatch connectionOpenedLatch = new CountDownLatch(2); + CountDownLatch connectionClosedLatch = new CountDownLatch(1); + CountDownLatch responseReceivedLatch = new CountDownLatch(1); + LoopResources loop = LoopResources.create("testGracefulShutdownIssue3509"); + AtomicBoolean stop = new AtomicBoolean(false); + Sinks.One delay = Sinks.one(); + this.disposableServer = + createServer().runOn(loop) + .doOnConnection(c -> { + c.onDispose().subscribe(null, null, connectionClosedLatch::countDown); + connectionOpenedLatch.countDown(); + }) + // Register a channel group, when invoking disposeNow() + // the implementation will wait for the active requests to finish + .channelGroup(new DefaultChannelGroup(new SingleThreadEventExecutor())) + .route(r -> + r.get("/delay", (req, res) -> res.sendString(delay.asMono())) + .get("/cpuIntensive", (req, res) -> { + // Simulate some long-running CPU-intensive work + while (!stop.get()) { + // this is deliberate + } + return res.sendString(Mono.just("cpuIntensive")); + })) + .bindNow(Duration.ofSeconds(30)); + + HttpClient client = createClient(this.disposableServer::address); + + AtomicReference result = new AtomicReference<>(); + Flux.just("/delay", "/cpuIntensive") + .flatMap(s -> client.get().uri(s).responseContent().aggregate().asString()) + .subscribe(s -> { + result.set(s); + responseReceivedLatch.countDown(); + }); + + assertThat(connectionOpenedLatch.await(30, TimeUnit.SECONDS)).isTrue(); + + // Stop accepting incoming requests, wait at most 1s for the active requests to finish + try { + this.disposableServer.channel() + .closeFuture() + .addListener(f -> delay.tryEmitValue("delay")); + // Reduce the timeout for testing purposes, by default it is 3s + this.disposableServer.disposeNow(Duration.ofSeconds(1)); + fail("Expectation is that the socket cannot be closed"); + } + catch (IllegalStateException e) { + // The socket couldn't be stopped, continue with shutdown + this.disposableServer = null; + } + + // Reduce the quiet period and the timeout for testing purposes + // by default the quiet period is 2s and the timeout is 15s + loop.disposeLater(Duration.ofMillis(100), Duration.ofSeconds(1)) + .as(StepVerifier::create) + .expectError(IllegalStateException.class) + .verify(Duration.ofSeconds(5)); + + assertThat(connectionClosedLatch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(responseReceivedLatch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(result.get()).isNotNull().isEqualTo("delay"); + stop.set(true); + } + @Test void testHttpServerWithDomainSocketsWithHost() { assertThatExceptionOfType(IllegalArgumentException.class)