diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java index 04a7174bad9d..89315a714e34 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java @@ -19,7 +19,6 @@ import java.lang.reflect.Type; import java.time.Instant; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -28,6 +27,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import org.apache.commons.logging.Log; @@ -439,7 +439,7 @@ else if (logger.isDebugEnabled()) { String receiptId = headers.getReceiptId(); ReceiptHandler handler = this.receiptHandlers.get(receiptId); if (handler != null) { - handler.handleReceiptReceived(); + handler.handleReceiptReceived(headers); } else if (logger.isDebugEnabled()) { logger.debug("No matching receipt: " + accessor.getDetailedLogMessage(message.getPayload())); @@ -544,7 +544,7 @@ private class ReceiptHandler implements Receiptable { @Nullable private final String receiptId; - private final List receiptCallbacks = new ArrayList<>(2); + private final List> receiptCallbacks = new ArrayList<>(2); private final List receiptLostCallbacks = new ArrayList<>(2); @@ -554,6 +554,9 @@ private class ReceiptHandler implements Receiptable { @Nullable private Boolean result; + @Nullable + private StompHeaders receiptHeaders; + public ReceiptHandler(@Nullable String receiptId) { this.receiptId = receiptId; if (receiptId != null) { @@ -576,64 +579,80 @@ public String getReceiptId() { @Override public void addReceiptTask(Runnable task) { - addTask(task, true); + addReceiptTask(headers -> task.run()); } @Override - public void addReceiptLostTask(Runnable task) { - addTask(task, false); - } - - private void addTask(Runnable task, boolean successTask) { - Assert.notNull(this.receiptId, - "To track receipts, set autoReceiptEnabled=true or add 'receiptId' header"); + public void addReceiptTask(Consumer task) { + Assert.notNull(this.receiptId, "Set autoReceiptEnabled to track receipts or add a 'receiptId' header"); synchronized (this) { - if (this.result != null && this.result == successTask) { - invoke(Collections.singletonList(task)); + if (this.result != null) { + if (this.result) { + task.accept(this.receiptHeaders); + } } else { - if (successTask) { - this.receiptCallbacks.add(task); - } - else { - this.receiptLostCallbacks.add(task); - } + this.receiptCallbacks.add(task); } } } - private void invoke(List callbacks) { - for (Runnable runnable : callbacks) { - try { - runnable.run(); + @Override + public void addReceiptLostTask(Runnable task) { + synchronized (this) { + if (this.result != null) { + if (!this.result) { + task.run(); + } } - catch (Throwable ex) { - // ignore + else { + this.receiptLostCallbacks.add(task); } } } - public void handleReceiptReceived() { - handleInternal(true); + public void handleReceiptReceived(StompHeaders receiptHeaders) { + handleInternal(true, receiptHeaders); } public void handleReceiptNotReceived() { - handleInternal(false); + handleInternal(false, null); } - private void handleInternal(boolean result) { + private void handleInternal(boolean result, @Nullable StompHeaders receiptHeaders) { synchronized (this) { if (this.result != null) { return; } this.result = result; - invoke(result ? this.receiptCallbacks : this.receiptLostCallbacks); + this.receiptHeaders = receiptHeaders; + if (result) { + this.receiptCallbacks.forEach(consumer -> { + try { + consumer.accept(this.receiptHeaders); + } + catch (Throwable ex) { + // ignore + } + }); + } + else { + this.receiptLostCallbacks.forEach(task -> { + try { + task.run(); + } + catch (Throwable ex) { + // ignore + } + }); + } DefaultStompSession.this.receiptHandlers.remove(this.receiptId); if (this.future != null) { this.future.cancel(true); } } } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java index 01ad81b8d444..94875b6d0a97 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.messaging.simp.stomp; +import java.util.function.Consumer; + import org.springframework.lang.Nullable; /** @@ -139,16 +141,27 @@ interface Receiptable { /** * Task to invoke when a receipt is received. + * @param task the task to invoke + * @throws java.lang.IllegalArgumentException if the receiptId is {@code null} + */ + void addReceiptTask(Runnable task); + + /** + * Variant of {@link #addReceiptTask(Runnable)} with a {@link Consumer} + * of the headers from the {@code RECEIPT} frame. + * @param task the consumer to invoke * @throws java.lang.IllegalArgumentException if the receiptId is {@code null} + * @since 5.3.23 */ - void addReceiptTask(Runnable runnable); + void addReceiptTask(Consumer task); /** * Task to invoke when a receipt is not received in the configured time. + * @param task the task to invoke * @throws java.lang.IllegalArgumentException if the receiptId is {@code null} * @see org.springframework.messaging.simp.stomp.StompClientSupport#setReceiptTimeLimit(long) */ - void addReceiptLostTask(Runnable runnable); + void addReceiptLostTask(Runnable task); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java index 7d13c217ee54..d4b0dbc9e69d 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java @@ -575,22 +575,30 @@ public void receiptReceived() { this.session.setTaskScheduler(mock(TaskScheduler.class)); AtomicReference received = new AtomicReference<>(); + AtomicReference receivedHeaders = new AtomicReference<>(); StompHeaders headers = new StompHeaders(); headers.setDestination("/topic/foo"); headers.setReceipt("my-receipt"); Subscription subscription = this.session.subscribe(headers, mock(StompFrameHandler.class)); - subscription.addReceiptTask(() -> received.set(true)); + subscription.addReceiptTask(receiptHeaders -> { + received.set(true); + receivedHeaders.set(receiptHeaders); + }); assertThat((Object) received.get()).isNull(); StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.RECEIPT); accessor.setReceiptId("my-receipt"); + accessor.setNativeHeader("foo", "bar"); accessor.setLeaveMutable(true); this.session.handleMessage(MessageBuilder.createMessage(new byte[0], accessor.getMessageHeaders())); assertThat(received.get()).isNotNull(); assertThat(received.get()).isTrue(); + assertThat(receivedHeaders.get()).isNotNull(); + assertThat(receivedHeaders.get().get("foo").size()).isEqualTo(1); + assertThat(receivedHeaders.get().get("foo").get(0)).isEqualTo("bar"); } @Test @@ -599,6 +607,7 @@ public void receiptReceivedBeforeTaskAdded() { this.session.setTaskScheduler(mock(TaskScheduler.class)); AtomicReference received = new AtomicReference<>(); + AtomicReference receivedHeaders = new AtomicReference<>(); StompHeaders headers = new StompHeaders(); headers.setDestination("/topic/foo"); @@ -607,13 +616,20 @@ public void receiptReceivedBeforeTaskAdded() { StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.RECEIPT); accessor.setReceiptId("my-receipt"); + accessor.setNativeHeader("foo", "bar"); accessor.setLeaveMutable(true); this.session.handleMessage(MessageBuilder.createMessage(new byte[0], accessor.getMessageHeaders())); - subscription.addReceiptTask(() -> received.set(true)); + subscription.addReceiptTask(receiptHeaders -> { + received.set(true); + receivedHeaders.set(receiptHeaders); + }); assertThat(received.get()).isNotNull(); assertThat(received.get()).isTrue(); + assertThat(receivedHeaders.get()).isNotNull(); + assertThat(receivedHeaders.get().get("foo").size()).isEqualTo(1); + assertThat(receivedHeaders.get().get("foo").get(0)).isEqualTo("bar"); } @Test diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/client/ConnectionManagerSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/client/ConnectionManagerSupport.java index 03e816d58837..147938331c05 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/client/ConnectionManagerSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/client/ConnectionManagerSupport.java @@ -25,11 +25,11 @@ import org.springframework.web.util.UriComponentsBuilder; /** - * A base class for WebSocket connection managers. Provides a declarative style of - * connecting to a WebSocket server given a URI to connect to. The connection occurs when - * the Spring ApplicationContext is refreshed, if the {@link #autoStartup} property is set - * to {@code true}, or if set to {@code false}, the {@link #start()} and #stop methods can - * be invoked manually. + * Base class for a connection manager that automates the process of connecting + * to a WebSocket server with the Spring ApplicationContext lifecycle. Connects + * to a WebSocket server on {@link #start()} and disconnects on {@link #stop()}. + * If {@link #setAutoStartup(boolean)} is set to {@code true} this will be done + * automatically when the Spring {@code ApplicationContext} is refreshed. * * @author Rossen Stoyanchev * @since 4.0 @@ -163,11 +163,19 @@ public boolean isRunning() { return this.running; } + /** + * Whether the connection is open/{@code true} or closed/{@code false}. + */ + public abstract boolean isConnected(); + /** + * Subclasses implement this to actually establish the connection. + */ protected abstract void openConnection(); + /** + * Subclasses implement this to close the connection. + */ protected abstract void closeConnection() throws Exception; - protected abstract boolean isConnected(); - } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/client/WebSocketConnectionManager.java b/spring-websocket/src/main/java/org/springframework/web/socket/client/WebSocketConnectionManager.java index 37f159e75eef..326c693d6f53 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/client/WebSocketConnectionManager.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/client/WebSocketConnectionManager.java @@ -28,10 +28,9 @@ import org.springframework.web.socket.handler.LoggingWebSocketHandlerDecorator; /** - * A WebSocket connection manager that is given a URI, a {@link WebSocketClient}, and a - * {@link WebSocketHandler}, connects to a WebSocket server through {@link #start()} and - * {@link #stop()} methods. If {@link #setAutoStartup(boolean)} is set to {@code true} - * this will be done automatically when the Spring ApplicationContext is refreshed. + * WebSocket {@link ConnectionManagerSupport connection manager} that connects + * to the server via {@link WebSocketClient} and handles the session with a + * {@link WebSocketHandler}. * * @author Rossen Stoyanchev * @author Sam Brannen @@ -58,14 +57,6 @@ public WebSocketConnectionManager(WebSocketClient client, } - /** - * Decorate the WebSocketHandler provided to the class constructor. - *

By default {@link LoggingWebSocketHandlerDecorator} is added. - */ - protected WebSocketHandler decorateWebSocketHandler(WebSocketHandler handler) { - return new LoggingWebSocketHandlerDecorator(handler); - } - /** * Set the sub-protocols to use. If configured, specified sub-protocols will be * requested in the handshake through the {@code Sec-WebSocket-Protocol} header. The @@ -130,6 +121,11 @@ public void stopInternal() throws Exception { super.stopInternal(); } + @Override + public boolean isConnected() { + return (this.webSocketSession != null && this.webSocketSession.isOpen()); + } + @Override protected void openConnection() { if (logger.isInfoEnabled()) { @@ -157,9 +153,12 @@ protected void closeConnection() throws Exception { } } - @Override - protected boolean isConnected() { - return (this.webSocketSession != null && this.webSocketSession.isOpen()); + /** + * Decorate the WebSocketHandler provided to the class constructor. + *

By default {@link LoggingWebSocketHandlerDecorator} is added. + */ + protected WebSocketHandler decorateWebSocketHandler(WebSocketHandler handler) { + return new LoggingWebSocketHandlerDecorator(handler); } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/AnnotatedEndpointConnectionManager.java b/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/AnnotatedEndpointConnectionManager.java index 7347791e1ade..debc3ebecbe4 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/AnnotatedEndpointConnectionManager.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/AnnotatedEndpointConnectionManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,11 +31,9 @@ import org.springframework.web.socket.handler.BeanCreatingHandlerProvider; /** - * A WebSocket connection manager that is given a URI, a - * {@link jakarta.websocket.ClientEndpoint}-annotated endpoint, connects to a - * WebSocket server through the {@link #start()} and {@link #stop()} methods. - * If {@link #setAutoStartup(boolean)} is set to {@code true} this will be - * done automatically when the Spring ApplicationContext is refreshed. + * WebSocket {@link ConnectionManagerSupport connection manager} that connects + * to the server via {@link WebSocketContainer} and handles the session with an + * {@link javax.websocket.ClientEndpoint @ClientEndpoint} endpoint. * * @author Rossen Stoyanchev * @since 4.0 @@ -101,6 +99,12 @@ public TaskExecutor getTaskExecutor() { } + @Override + public boolean isConnected() { + Session session = this.session; + return (session != null && session.isOpen()); + } + @Override protected void openConnection() { this.taskExecutor.execute(() -> { @@ -135,10 +139,4 @@ protected void closeConnection() throws Exception { } } - @Override - protected boolean isConnected() { - Session session = this.session; - return (session != null && session.isOpen()); - } - } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/EndpointConnectionManager.java b/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/EndpointConnectionManager.java index c58fb951e971..0960f70159ab 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/EndpointConnectionManager.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/client/standard/EndpointConnectionManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,10 +39,9 @@ import org.springframework.web.socket.handler.BeanCreatingHandlerProvider; /** - * A WebSocket connection manager that is given a URI, an {@link Endpoint}, connects to a - * WebSocket server through the {@link #start()} and {@link #stop()} methods. If - * {@link #setAutoStartup(boolean)} is set to {@code true} this will be done automatically - * when the Spring ApplicationContext is refreshed. + * WebSocket {@link ConnectionManagerSupport connection manager} that connects + * to the server via {@link WebSocketContainer} and handles the session with an + * {@link Endpoint}. * * @author Rossen Stoyanchev * @since 4.0 @@ -133,6 +132,12 @@ public TaskExecutor getTaskExecutor() { } + @Override + public boolean isConnected() { + Session session = this.session; + return (session != null && session.isOpen()); + } + @Override protected void openConnection() { this.taskExecutor.execute(() -> { @@ -168,10 +173,4 @@ protected void closeConnection() throws Exception { } } - @Override - protected boolean isConnected() { - Session session = this.session; - return (session != null && session.isOpen()); - } - } diff --git a/src/docs/asciidoc/integration.adoc b/src/docs/asciidoc/integration.adoc index 866fc8f14903..f69d21f31257 100644 --- a/src/docs/asciidoc/integration.adoc +++ b/src/docs/asciidoc/integration.adoc @@ -325,7 +325,7 @@ to serialize only a subset of the object properties, as the following example sh ---- [[rest-template-multipart]] -===== Multipart +==== Multipart To send multipart data, you need to provide a `MultiValueMap` whose values may be an `Object` for part content, a `Resource` for a file part, or an `HttpEntity` for diff --git a/src/docs/asciidoc/web/websocket.adoc b/src/docs/asciidoc/web/websocket.adoc index a230337e1893..36f33c55722c 100644 --- a/src/docs/asciidoc/web/websocket.adoc +++ b/src/docs/asciidoc/web/websocket.adoc @@ -1347,7 +1347,7 @@ receipt if the server supports it (simple broker does not). For example, with th headers.setDestination("/topic/..."); headers.setReceipt("r1"); FrameHandler handler = ...; - stompSession.subscribe(headers, handler).addReceiptTask(() -> { + stompSession.subscribe(headers, handler).addReceiptTask(receiptHeaders -> { // Subscription ready... }); ----