Skip to content

Commit

Permalink
Polishing
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev committed Jan 17, 2024
1 parent c4a34fa commit 682f471
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Nullable
private volatile Subscriber<? super T> subscriber;

/** Flag to defer transition to COMPLETED briefly while SUBSCRIBING or READING. */
private volatile boolean completionPending;

/** Flag to defer transition to COMPLETED briefly while SUBSCRIBING or READING. */
@Nullable
private volatile Throwable errorPending;

Expand Down Expand Up @@ -123,8 +125,8 @@ public final void onDataAvailable() {
}

/**
* Subclasses can call this method to delegate a container notification when
* all data has been read.
* Subclasses can call this method to signal onComplete, delegating a
* notification from the container when all data has been read.
*/
public void onAllDataRead() {
State state = this.state.get();
Expand All @@ -135,7 +137,8 @@ public void onAllDataRead() {
}

/**
* Subclasses can call this to delegate container error notifications.
* Subclasses can call this to signal onError, delegating a
* notification from the container for an error.
*/
public final void onError(Throwable ex) {
State state = this.state.get();
Expand Down Expand Up @@ -183,10 +186,10 @@ public final void onError(Throwable ex) {
// Private methods for use in State...

/**
* Read and publish data one at a time until there is no more data, no more
* demand, or perhaps we completed meanwhile.
* @return {@code true} if there is more demand; {@code false} if there is
* no more demand or we have completed.
* Read and publish data one by one until there are no more items
* to read (i.e. input queue drained), or there is no more demand.
* @return {@code true} if there is demand but no more to read, or
* {@code false} if there is more to read but lack of demand.
*/
private boolean readAndPublish() throws IOException {
long r;
Expand Down Expand Up @@ -269,15 +272,15 @@ private final class ReadSubscription implements Subscription {


@Override
public final void request(long n) {
public void request(long n) {
if (rsReadLogger.isTraceEnabled()) {
rsReadLogger.trace(getLogPrefix() + "request " + (n != Long.MAX_VALUE ? n : "Long.MAX_VALUE"));
}
state.get().request(AbstractListenerReadPublisher.this, n);
}

@Override
public final void cancel() {
public void cancel() {
State state = AbstractListenerReadPublisher.this.state.get();
if (rsReadLogger.isTraceEnabled()) {
rsReadLogger.trace(getLogPrefix() + "cancel [" + state + "]");
Expand All @@ -288,7 +291,7 @@ public final void cancel() {


/**
* Represents a state for the {@link Publisher} to be in.
* The states that a read {@link Publisher} transitions through.
* <p><pre>
* UNSUBSCRIBED
* |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ protected WebSocketSendProcessor getSendProcessor() {

@Override
public Flux<WebSocketMessage> receive() {
return (canSuspendReceiving() ? Flux.from(this.receivePublisher) :
return (canSuspendReceiving() ?
Flux.from(this.receivePublisher) :
Flux.from(this.receivePublisher).onBackpressureBuffer(RECEIVE_BUFFER_SIZE));
}

Expand Down Expand Up @@ -240,6 +241,9 @@ public void onComplete() {
}


/**
* Read publisher for inbound WebSocket messages.
*/
private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher<WebSocketMessage> {

private volatile Queue<Object> pendingMessages = Queues.unbounded(Queues.SMALL_BUFFER_SIZE).get();
Expand Down Expand Up @@ -269,7 +273,7 @@ protected void readingPaused() {

@Override
@Nullable
protected WebSocketMessage read() throws IOException {
protected WebSocketMessage read() {
return (WebSocketMessage) this.pendingMessages.poll();
}

Expand Down Expand Up @@ -304,7 +308,7 @@ protected void discardData() {


/**
* Processor to send web socket messages.
* Write processor for outbound WebSocket messages.
*/
protected final class WebSocketSendProcessor extends AbstractListenerWriteProcessor<WebSocketMessage> {

Expand Down

0 comments on commit 682f471

Please sign in to comment.