Skip to content

Commit

Permalink
Add isOpen to WebSocketSession in WebFlux
Browse files Browse the repository at this point in the history
Closes gh-26043
  • Loading branch information
rstoyanchev committed Nov 9, 2020
1 parent c73cff8 commit 6bb3ad7
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ public interface WebSocketSession {
*/
Mono<Void> send(Publisher<WebSocketMessage> messages);

/**
* Whether the underlying connection is open.
* @since 5.3.1
*/
boolean isOpen();

/**
* Close the WebSocket session with {@link CloseStatus#NORMAL}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
return true;
}

@Override
public boolean isOpen() {
return getDelegate().isOpen();
}

@Override
public Mono<Void> close(CloseStatus status) {
getDelegate().close(status.getCode(), status.getReason());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
*/
package org.springframework.web.reactive.socket.adapter;

import java.util.function.Consumer;

import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;
import reactor.netty.http.websocket.WebsocketInbound;
Expand Down Expand Up @@ -93,6 +96,13 @@ public Mono<Void> send(Publisher<WebSocketMessage> messages) {
.then();
}

@Override
public boolean isOpen() {
DisposedCallback callback = new DisposedCallback();
getDelegate().getInbound().withConnection(callback);
return callback.isDisposed();
}

@Override
public Mono<Void> close(CloseStatus status) {
// this will notify WebSocketInbound.receiveCloseStatus()
Expand Down Expand Up @@ -129,4 +139,19 @@ public WebsocketOutbound getOutbound() {
}
}


private static class DisposedCallback implements Consumer<Connection> {

private boolean disposed;

public boolean isDisposed() {
return this.disposed;
}

@Override
public void accept(Connection connection) {
this.disposed = connection.isDisposed();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
return true;
}

@Override
public boolean isOpen() {
return getDelegate().isOpen();
}

@Override
public Mono<Void> close(CloseStatus status) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
return true;
}

@Override
public boolean isOpen() {
return getDelegate().isOpen();
}

@Override
public Mono<Void> close(CloseStatus status) {
CloseMessage cm = new CloseMessage(status.getCode(), status.getReason());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public interface WebSocketSession extends Closeable {
void sendMessage(WebSocketMessage<?> message) throws IOException;

/**
* Return whether the connection is still open.
* Whether the underlying connection is open.
*/
boolean isOpen();

Expand Down

0 comments on commit 6bb3ad7

Please sign in to comment.