Skip to content

Commit

Permalink
WebSockets Next: broadcasting fixes
Browse files Browse the repository at this point in the history
- intentionally ignore 'WebSocket is closed' failures
- do not fail fast but collect all failures

(cherry picked from commit cf67cc1)
  • Loading branch information
mkouba authored and gsmet committed Jul 16, 2024
1 parent d86eaee commit 471e5b1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ private static boolean isSecurityFailure(Throwable throwable) {
|| throwable instanceof ForbiddenException;
}

private static boolean isWebSocketIsClosedFailure(Throwable throwable, WebSocketConnectionBase connection) {
static boolean isWebSocketIsClosedFailure(Throwable throwable, WebSocketConnectionBase connection) {
if (!connection.isClosed()) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,26 @@ public Uni<Void> sendPong(Buffer data) {
throw new UnsupportedOperationException();
}

private <M> Uni<Void> doSend(BiFunction<WebSocketConnection, M, Uni<Void>> function, M message) {
private <M> Uni<Void> doSend(BiFunction<WebSocketConnection, M, Uni<Void>> sendFunction, M message) {
Set<WebSocketConnection> connections = connectionManager.getConnections(generatedEndpointClass);
if (connections.isEmpty()) {
return Uni.createFrom().voidItem();
}
List<Uni<Void>> unis = new ArrayList<>(connections.size());
for (WebSocketConnection connection : connections) {
if (connection.isOpen() && (filter == null || filter.test(connection))) {
unis.add(function.apply(connection, message));
if (connection.isOpen()
&& (filter == null || filter.test(connection))) {
unis.add(sendFunction.apply(connection, message)
// Intentionally ignore 'WebSocket is closed' failures
// It might happen that the connection is closed in the mean time
.onFailure(t -> Endpoints.isWebSocketIsClosedFailure(t, (WebSocketConnectionBase) connection))
.recoverWithNull());
}
}
return unis.isEmpty() ? Uni.createFrom().voidItem() : Uni.join().all(unis).andFailFast().replaceWithVoid();
if (unis.isEmpty()) {
return Uni.createFrom().voidItem();
}
return Uni.join().all(unis).andCollectFailures().replaceWithVoid();
}

}
Expand Down

0 comments on commit 471e5b1

Please sign in to comment.