Skip to content

Commit

Permalink
Polishing in WebSocketStompClient
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev committed Feb 13, 2024
1 parent b415361 commit 6a5953d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -109,8 +109,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {

private final Map<String, ReceiptHandler> receiptHandlers = new ConcurrentHashMap<>(4);

/* Whether the client is willfully closing the connection */
private volatile boolean closing;
private volatile boolean clientSideClose;


/**
Expand Down Expand Up @@ -368,7 +367,7 @@ public void disconnect() {

@Override
public void disconnect(@Nullable StompHeaders headers) {
this.closing = true;
this.clientSideClose = true;
try {
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.DISCONNECT);
if (headers != null) {
Expand Down Expand Up @@ -519,7 +518,7 @@ public void afterConnectionClosed() {
if (logger.isDebugEnabled()) {
logger.debug("Connection closed in session id=" + this.sessionId);
}
if (!this.closing) {
if (!this.clientSideClose) {
resetConnection();
handleFailure(new ConnectionLostException("Connection closed"));
}
Expand Down Expand Up @@ -729,11 +728,11 @@ private class ReadInactivityTask implements Runnable {

@Override
public void run() {
closing = true;
String error = "Server has gone quiet. Closing connection in session id=" + sessionId + ".";
String error = "Read inactivity. Closing connection in session id=" + sessionId + ".";
if (logger.isDebugEnabled()) {
logger.debug(error);
}
clientSideClose = true;
resetConnection();
handleFailure(new IllegalStateException(error));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -372,7 +372,7 @@ protected StompHeaders processConnectHeaders(@Nullable StompHeaders connectHeade
private class WebSocketTcpConnectionHandlerAdapter implements BiConsumer<WebSocketSession, Throwable>,
WebSocketHandler, TcpConnection<byte[]> {

private final TcpConnectionHandler<byte[]> connectionHandler;
private final TcpConnectionHandler<byte[]> stompSession;

private final StompWebSocketMessageCodec codec = new StompWebSocketMessageCodec(getInboundMessageSizeLimit());

Expand All @@ -385,17 +385,17 @@ private class WebSocketTcpConnectionHandlerAdapter implements BiConsumer<WebSock

private final List<ScheduledFuture<?>> inactivityTasks = new ArrayList<>(2);

public WebSocketTcpConnectionHandlerAdapter(TcpConnectionHandler<byte[]> connectionHandler) {
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
this.connectionHandler = connectionHandler;
public WebSocketTcpConnectionHandlerAdapter(TcpConnectionHandler<byte[]> stompSession) {
Assert.notNull(stompSession, "TcpConnectionHandler must not be null");
this.stompSession = stompSession;
}

// CompletableFuture callback implementation: handshake outcome

@Override
public void accept(@Nullable WebSocketSession webSocketSession, @Nullable Throwable throwable) {
if (throwable != null) {
this.connectionHandler.afterConnectFailure(throwable);
this.stompSession.afterConnectFailure(throwable);
}
}

Expand All @@ -404,7 +404,7 @@ public void accept(@Nullable WebSocketSession webSocketSession, @Nullable Throwa
@Override
public void afterConnectionEstablished(WebSocketSession session) {
this.session = session;
this.connectionHandler.afterConnected(this);
this.stompSession.afterConnected(this);
}

@Override
Expand All @@ -415,23 +415,23 @@ public void handleMessage(WebSocketSession session, WebSocketMessage<?> webSocke
messages = this.codec.decode(webSocketMessage);
}
catch (Throwable ex) {
this.connectionHandler.handleFailure(ex);
this.stompSession.handleFailure(ex);
return;
}
for (Message<byte[]> message : messages) {
this.connectionHandler.handleMessage(message);
this.stompSession.handleMessage(message);
}
}

@Override
public void handleTransportError(WebSocketSession session, Throwable ex) throws Exception {
this.connectionHandler.handleFailure(ex);
public void handleTransportError(WebSocketSession session, Throwable ex) {
this.stompSession.handleFailure(ex);
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
cancelInactivityTasks();
this.connectionHandler.afterConnectionClosed();
this.stompSession.afterConnectionClosed();
}

private void cancelInactivityTasks() {
Expand Down

0 comments on commit 6a5953d

Please sign in to comment.