Skip to content

Commit

Permalink
Properly handle completion while in READING state
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev authored and lxbzmy committed Mar 26, 2022
1 parent c90a2bf commit 28acfe8
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Nullable
private volatile Subscriber<? super T> subscriber;

private volatile boolean completionBeforeDemand;
private volatile boolean completionPending;

@Nullable
private volatile Throwable errorBeforeDemand;
private volatile Throwable errorPending;

private final String logPrefix;

Expand Down Expand Up @@ -228,21 +228,24 @@ private void changeToDemandState(State oldState) {
}
}

private void handleCompletionOrErrorBeforeDemand() {
private boolean handlePendingCompletionOrError() {
State state = this.state.get();
if (!state.equals(State.UNSUBSCRIBED) && !state.equals(State.SUBSCRIBING)) {
if (this.completionBeforeDemand) {
rsReadLogger.trace(getLogPrefix() + "Completed before demand");
if (state.equals(State.DEMAND) || state.equals(State.NO_DEMAND)) {
if (this.completionPending) {
rsReadLogger.trace(getLogPrefix() + "Processing pending completion");
this.state.get().onAllDataRead(this);
return true;
}
Throwable ex = this.errorBeforeDemand;
Throwable ex = this.errorPending;
if (ex != null) {
if (rsReadLogger.isTraceEnabled()) {
rsReadLogger.trace(getLogPrefix() + "Completed with error before demand: " + ex);
rsReadLogger.trace(getLogPrefix() + "Processing pending completion with error: " + ex);
}
this.state.get().onError(this, ex);
return true;
}
}
return false;
}

private Subscription createSubscription() {
Expand Down Expand Up @@ -305,7 +308,7 @@ <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? supe
publisher.subscriber = subscriber;
subscriber.onSubscribe(subscription);
publisher.changeState(SUBSCRIBING, NO_DEMAND);
publisher.handleCompletionOrErrorBeforeDemand();
publisher.handlePendingCompletionOrError();
}
else {
throw new IllegalStateException("Failed to transition to SUBSCRIBING, " +
Expand All @@ -315,14 +318,14 @@ <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? supe

@Override
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
publisher.completionBeforeDemand = true;
publisher.handleCompletionOrErrorBeforeDemand();
publisher.completionPending = true;
publisher.handlePendingCompletionOrError();
}

@Override
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
publisher.errorBeforeDemand = ex;
publisher.handleCompletionOrErrorBeforeDemand();
publisher.errorPending = ex;
publisher.handlePendingCompletionOrError();
}
},

Expand All @@ -341,14 +344,14 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {

@Override
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
publisher.completionBeforeDemand = true;
publisher.handleCompletionOrErrorBeforeDemand();
publisher.completionPending = true;
publisher.handlePendingCompletionOrError();
}

@Override
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
publisher.errorBeforeDemand = ex;
publisher.handleCompletionOrErrorBeforeDemand();
publisher.errorPending = ex;
publisher.handlePendingCompletionOrError();
}
},

Expand Down Expand Up @@ -379,14 +382,17 @@ <T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
boolean demandAvailable = publisher.readAndPublish();
if (demandAvailable) {
publisher.changeToDemandState(READING);
publisher.handlePendingCompletionOrError();
}
else {
publisher.readingPaused();
if (publisher.changeState(READING, NO_DEMAND)) {
// Demand may have arrived since readAndPublish returned
long r = publisher.demand;
if (r > 0) {
publisher.changeToDemandState(NO_DEMAND);
if (!publisher.handlePendingCompletionOrError()) {
// Demand may have arrived since readAndPublish returned
long r = publisher.demand;
if (r > 0) {
publisher.changeToDemandState(NO_DEMAND);
}
}
}
}
Expand All @@ -408,6 +414,18 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
publisher.changeToDemandState(NO_DEMAND);
}
}

@Override
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
publisher.completionPending = true;
publisher.handlePendingCompletionOrError();
}

@Override
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
publisher.errorPending = ex;
publisher.handlePendingCompletionOrError();
}
},

COMPLETED {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,11 @@ public final void onComplete() {
* container.
*/
public final void onWritePossible() {
State state = this.state.get();
if (rsWriteLogger.isTraceEnabled()) {
rsWriteLogger.trace(getLogPrefix() + "onWritePossible");
rsWriteLogger.trace(getLogPrefix() + "onWritePossible [" + state + "]");
}
this.state.get().onWritePossible(this);
state.onWritePossible(this);
}

/**
Expand Down

0 comments on commit 28acfe8

Please sign in to comment.