Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Nov 19, 2024
1 parent 27459df commit dde2884
Showing 1 changed file with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1895,25 +1895,26 @@ public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg)

@Test
public void testGracefulShutdownIssue3509() throws Exception {
CountDownLatch latch1 = new CountDownLatch(2);
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);
CountDownLatch connectionOpenedLatch = new CountDownLatch(2);
CountDownLatch connectionClosedLatch = new CountDownLatch(1);
CountDownLatch responseReceivedLatch = new CountDownLatch(1);
LoopResources loop = LoopResources.create("testGracefulShutdownIssue3509");
AtomicBoolean stop = new AtomicBoolean(false);
Sinks.One<String> delay = Sinks.one();
this.disposableServer =
createServer().runOn(loop)
.doOnConnection(c -> {
c.onDispose().subscribe(null, null, latch2::countDown);
latch1.countDown();
c.onDispose().subscribe(null, null, connectionClosedLatch::countDown);
connectionOpenedLatch.countDown();
})
// Register a channel group, when invoking disposeNow()
// the implementation will wait for the active requests to finish
.channelGroup(new DefaultChannelGroup(new DefaultEventExecutor()))
.route(r ->
r.get("/delay500", (req, res) -> res.sendString(Mono.just("delay500").delayElement(Duration.ofMillis(500))))
r.get("/delay", (req, res) -> res.sendString(delay.asMono()))
.get("/cpuIntensive", (req, res) -> {
// Simulate some long-running CPU-intensive work
boolean stop = false;
while (!stop) {
while (!stop.get()) {
// this is deliberate
}
return res.sendString(Mono.just("cpuIntensive"));
Expand All @@ -1923,17 +1924,20 @@ public void testGracefulShutdownIssue3509() throws Exception {
HttpClient client = createClient(this.disposableServer::address);

AtomicReference<String> result = new AtomicReference<>();
Flux.just("/delay500", "/cpuIntensive")
Flux.just("/delay", "/cpuIntensive")
.flatMap(s -> client.get().uri(s).responseContent().aggregate().asString())
.subscribe(s -> {
result.set(s);
latch3.countDown();
responseReceivedLatch.countDown();
});

assertThat(latch1.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(connectionOpenedLatch.await(30, TimeUnit.SECONDS)).isTrue();

// Stop accepting incoming requests, wait at most 1s for the active requests to finish
try {
this.disposableServer.channel()
.closeFuture()
.addListener(f -> delay.tryEmitValue("delay"));
// Reduce the timeout for testing purposes, by default it is 3s
this.disposableServer.disposeNow(Duration.ofSeconds(1));
fail("Expectation is that the socket cannot be closed");
Expand All @@ -1950,9 +1954,10 @@ public void testGracefulShutdownIssue3509() throws Exception {
.expectError(IllegalStateException.class)
.verify(Duration.ofSeconds(5));

assertThat(latch2.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(latch3.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(result.get()).isNotNull().isEqualTo("delay500");
assertThat(connectionClosedLatch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(responseReceivedLatch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(result.get()).isNotNull().isEqualTo("delay");
stop.set(true);
}

@Test
Expand Down

0 comments on commit dde2884

Please sign in to comment.