From 682f4715cf56fe727a5ee39645300534e5904a84 Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Tue, 16 Jan 2024 18:53:01 +0000 Subject: [PATCH] Polishing See gh-30393 --- .../AbstractListenerReadPublisher.java | 23 +++++++++++-------- .../AbstractListenerWebSocketSession.java | 10 +++++--- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 08a27792f5ce..a3753486c35a 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -72,8 +72,10 @@ public abstract class AbstractListenerReadPublisher implements Publisher { @Nullable private volatile Subscriber subscriber; + /** Flag to defer transition to COMPLETED briefly while SUBSCRIBING or READING. */ private volatile boolean completionPending; + /** Flag to defer transition to COMPLETED briefly while SUBSCRIBING or READING. */ @Nullable private volatile Throwable errorPending; @@ -123,8 +125,8 @@ public final void onDataAvailable() { } /** - * Subclasses can call this method to delegate a container notification when - * all data has been read. + * Subclasses can call this method to signal onComplete, delegating a + * notification from the container when all data has been read. */ public void onAllDataRead() { State state = this.state.get(); @@ -135,7 +137,8 @@ public void onAllDataRead() { } /** - * Subclasses can call this to delegate container error notifications. + * Subclasses can call this to signal onError, delegating a + * notification from the container for an error. */ public final void onError(Throwable ex) { State state = this.state.get(); @@ -183,10 +186,10 @@ public final void onError(Throwable ex) { // Private methods for use in State... /** - * Read and publish data one at a time until there is no more data, no more - * demand, or perhaps we completed meanwhile. - * @return {@code true} if there is more demand; {@code false} if there is - * no more demand or we have completed. + * Read and publish data one by one until there are no more items + * to read (i.e. input queue drained), or there is no more demand. + * @return {@code true} if there is demand but no more to read, or + * {@code false} if there is more to read but lack of demand. */ private boolean readAndPublish() throws IOException { long r; @@ -269,7 +272,7 @@ private final class ReadSubscription implements Subscription { @Override - public final void request(long n) { + public void request(long n) { if (rsReadLogger.isTraceEnabled()) { rsReadLogger.trace(getLogPrefix() + "request " + (n != Long.MAX_VALUE ? n : "Long.MAX_VALUE")); } @@ -277,7 +280,7 @@ public final void request(long n) { } @Override - public final void cancel() { + public void cancel() { State state = AbstractListenerReadPublisher.this.state.get(); if (rsReadLogger.isTraceEnabled()) { rsReadLogger.trace(getLogPrefix() + "cancel [" + state + "]"); @@ -288,7 +291,7 @@ public final void cancel() { /** - * Represents a state for the {@link Publisher} to be in. + * The states that a read {@link Publisher} transitions through. *

 	 *        UNSUBSCRIBED
 	 *             |
diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java
index 4c54f726e02f..53181c21fee9 100644
--- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java
+++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java
@@ -112,7 +112,8 @@ protected WebSocketSendProcessor getSendProcessor() {
 
 	@Override
 	public Flux receive() {
-		return (canSuspendReceiving() ? Flux.from(this.receivePublisher) :
+		return (canSuspendReceiving() ?
+				Flux.from(this.receivePublisher) :
 				Flux.from(this.receivePublisher).onBackpressureBuffer(RECEIVE_BUFFER_SIZE));
 	}
 
@@ -240,6 +241,9 @@ public void onComplete() {
 	}
 
 
+	/**
+	 * Read publisher for inbound WebSocket messages.
+	 */
 	private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher {
 
 		private volatile Queue pendingMessages = Queues.unbounded(Queues.SMALL_BUFFER_SIZE).get();
@@ -269,7 +273,7 @@ protected void readingPaused() {
 
 		@Override
 		@Nullable
-		protected WebSocketMessage read() throws IOException {
+		protected WebSocketMessage read() {
 			return (WebSocketMessage) this.pendingMessages.poll();
 		}
 
@@ -304,7 +308,7 @@ protected void discardData() {
 
 
 	/**
-	 * Processor to send web socket messages.
+	 * Write processor for outbound WebSocket messages.
 	 */
 	protected final class WebSocketSendProcessor extends AbstractListenerWriteProcessor {