Skip to content

Commit

Permalink
Improve cancellation of read/write inactivity
Browse files Browse the repository at this point in the history
The cancellation of read and write inactivity tasks was done via
WebSocketHandler#afterConnectionClosed, relying on the WebSocket
library to always invoke the callback.

This change moves the cancellation to the `close` method instead
that in turn is called from DefaultStompSession#resetConnection,
in effect making the cancellation more proactive and aligned with
connection cleanup in DefaultStompSession vs relying on a
subsequent call from the WebSocket library after the connection
is closed.

Closes gh-32195
  • Loading branch information
rstoyanchev committed Feb 13, 2024
1 parent 6be0432 commit 2dd22f6
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -383,7 +382,11 @@ private class WebSocketTcpConnectionHandlerAdapter implements BiConsumer<WebSock

private volatile long lastWriteTime = -1;

private final List<ScheduledFuture<?>> inactivityTasks = new ArrayList<>(2);
@Nullable
private ScheduledFuture<?> readInactivityFuture;

@Nullable
private ScheduledFuture<?> writeInactivityFuture;

public WebSocketTcpConnectionHandlerAdapter(TcpConnectionHandler<byte[]> stompSession) {
Assert.notNull(stompSession, "TcpConnectionHandler must not be null");
Expand Down Expand Up @@ -430,24 +433,9 @@ public void handleTransportError(WebSocketSession session, Throwable ex) {

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
cancelInactivityTasks();
this.stompSession.afterConnectionClosed();
}

private void cancelInactivityTasks() {
for (ScheduledFuture<?> task : this.inactivityTasks) {
try {
task.cancel(true);
}
catch (Throwable ex) {
// Ignore
}
}
this.lastReadTime = -1;
this.lastWriteTime = -1;
this.inactivityTasks.clear();
}

@Override
public boolean supportsPartialMessages() {
return false;
Expand Down Expand Up @@ -486,7 +474,7 @@ public void onReadInactivity(final Runnable runnable, final long duration) {
Assert.state(getTaskScheduler() != null, "No TaskScheduler configured");
this.lastReadTime = System.currentTimeMillis();
Duration delay = Duration.ofMillis(duration / 2);
this.inactivityTasks.add(getTaskScheduler().scheduleWithFixedDelay(() -> {
this.readInactivityFuture = getTaskScheduler().scheduleWithFixedDelay(() -> {
if (System.currentTimeMillis() - this.lastReadTime > duration) {
try {
runnable.run();
Expand All @@ -497,15 +485,15 @@ public void onReadInactivity(final Runnable runnable, final long duration) {
}
}
}
}, delay));
}, delay);
}

@Override
public void onWriteInactivity(final Runnable runnable, final long duration) {
Assert.state(getTaskScheduler() != null, "No TaskScheduler configured");
this.lastWriteTime = System.currentTimeMillis();
Duration delay = Duration.ofMillis(duration / 2);
this.inactivityTasks.add(getTaskScheduler().scheduleWithFixedDelay(() -> {
this.writeInactivityFuture = getTaskScheduler().scheduleWithFixedDelay(() -> {
if (System.currentTimeMillis() - this.lastWriteTime > duration) {
try {
runnable.run();
Expand All @@ -516,11 +504,12 @@ public void onWriteInactivity(final Runnable runnable, final long duration) {
}
}
}
}, delay));
}, delay);
}

@Override
public void close() {
cancelInactivityTasks();
WebSocketSession session = this.session;
if (session != null) {
try {
Expand All @@ -533,6 +522,31 @@ public void close() {
}
}
}

private void cancelInactivityTasks() {
ScheduledFuture<?> readFuture = this.readInactivityFuture;
this.readInactivityFuture = null;
cancelFuture(readFuture);

ScheduledFuture<?> writeFuture = this.writeInactivityFuture;
this.writeInactivityFuture = null;
cancelFuture(writeFuture);

this.lastReadTime = -1;
this.lastWriteTime = -1;
}

private static void cancelFuture(@Nullable ScheduledFuture<?> future) {
if (future != null) {
try {
future.cancel(true);
}
catch (Throwable ex) {
// Ignore
}
}
}

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,9 @@ void cancelInactivityTasks() throws Exception {
tcpConnection.onReadInactivity(mock(), 2L);
tcpConnection.onWriteInactivity(mock(), 2L);

this.webSocketHandlerCaptor.getValue().afterConnectionClosed(this.webSocketSession, CloseStatus.NORMAL);
WebSocketHandler handler = this.webSocketHandlerCaptor.getValue();
TcpConnection<?> connection = (TcpConnection<?>) WebSocketHandlerDecorator.unwrap(handler);
connection.close();

verify(future, times(2)).cancel(true);
verifyNoMoreInteractions(future);
Expand Down

0 comments on commit 2dd22f6

Please sign in to comment.