Skip to content

Commit

Permalink
Address comments on SurroundingPublisher
Browse files Browse the repository at this point in the history
  • Loading branch information
injae-kim committed May 15, 2023
1 parent c1bc56e commit ab6abc7
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ static final class SurroundingSubscriber<T> implements Subscriber<T>, Subscripti
AtomicLongFieldUpdater.newUpdater(SurroundingSubscriber.class, "needToPublish");

enum State {
SENDING_HEAD,
SENDING_BODY,
SENDING_TAIL,
SENDING_COMPLETE,
REQUIRE_HEAD,
REQUIRE_BODY,
REQUIRE_TAIL,
REQUIRE_COMPLETE,
DONE,
}

private volatile State state = State.SENDING_HEAD;
private volatile State state;

@Nullable
private final T head;
Expand All @@ -85,12 +85,14 @@ enum State {

private volatile long requested;
private volatile long needToPublish;
private volatile boolean subscribed;
private volatile boolean cancelled;

SurroundingSubscriber(@Nullable T head, Publisher<? extends T> publisher, @Nullable T tail,
Subscriber<? super T> downstream) {
requireNonNull(publisher, "publisher");
requireNonNull(downstream, "downstream");
state = head != null ? State.REQUIRE_HEAD : State.REQUIRE_BODY;
this.head = head;
this.publisher = publisher;
this.tail = tail;
Expand All @@ -117,53 +119,65 @@ public void request(long n) {
}
}

switch (state) {
case SENDING_HEAD: {
setState(State.SENDING_HEAD, State.SENDING_BODY);
if (head != null) {
downstream.onNext(head);
requestedUpdater.decrementAndGet(this);
}
publisher.subscribe(this);
return;
}
case SENDING_BODY: {
requestUpstream(upstream);
publish();
}

private void publish() {
for (;;) {
if (requested <= 0) {
return;
}
case SENDING_TAIL: {
sendTail();
if (n > 1) {
switch (state) {
case REQUIRE_HEAD: {
sendHead();
continue;
}
case REQUIRE_BODY: {
if (!subscribed) {
subscribed = true;
publisher.subscribe(this);
return;
}
if (upstream != null) {
requestUpstream(upstream);
return;
}
}
case REQUIRE_TAIL: {
sendTail();
continue;
}
case REQUIRE_COMPLETE: {
sendComplete();
return;
}
case DONE: {
upstream.cancel();
return;
}
return;
}
case SENDING_COMPLETE: {
sendComplete();
return;
}
case DONE: {
upstream.cancel();
return;
}
}
}

private void sendHead() {
setState(State.REQUIRE_HEAD, State.REQUIRE_BODY);
requestedUpdater.decrementAndGet(this);
downstream.onNext(head);
}

private void sendTail() {
setState(State.SENDING_TAIL, State.SENDING_COMPLETE);
setState(State.REQUIRE_TAIL, State.REQUIRE_COMPLETE);
requestedUpdater.decrementAndGet(this);
if (tail != null) {
requestedUpdater.decrementAndGet(this);
downstream.onNext(tail);
} else {
sendComplete();
}
}

private void sendComplete() {
if (state == State.DONE) {
return;
}
setState(State.SENDING_COMPLETE, State.DONE);
setState(State.REQUIRE_COMPLETE, State.DONE);
requestedUpdater.decrementAndGet(this);
downstream.onComplete();
}

Expand All @@ -173,18 +187,13 @@ private void requestUpstream(Subscription subscription) {
if (requested == 0) {
return;
}
if (state == State.SENDING_TAIL) {
sendTail();
if (requested > 1) {
sendComplete();
}
return;
}
if (requestedUpdater.compareAndSet(this, requested, 0)) {
if (needToPublishUpdater.get(this) > Long.MAX_VALUE - requested) {
needToPublishUpdater.set(this, Long.MAX_VALUE);
} else {
needToPublishUpdater.addAndGet(this, requested);
for (;;) {
final long oldNeedToPublish = needToPublish;
final long newNeedToPublish = LongMath.saturatedAdd(oldNeedToPublish, requested);
if (needToPublishUpdater.compareAndSet(this, oldNeedToPublish, newNeedToPublish)) {
break;
}
}
subscription.request(requested);
return;
Expand Down Expand Up @@ -222,15 +231,10 @@ public void onError(Throwable cause) {

@Override
public void onComplete() {
final long needToPublish = needToPublishUpdater.get(this);
setState(State.SENDING_BODY, State.SENDING_TAIL);
if (needToPublish == 0) {
return;
}

sendTail();
if (needToPublish > 1) {
sendComplete();
setState(State.REQUIRE_BODY, State.REQUIRE_TAIL);
if (needToPublish > 0) {
requestedUpdater.addAndGet(this, needToPublish);
publish();
}
}

Expand All @@ -248,7 +252,7 @@ public void cancel() {
}

private boolean setState(State oldState, State newState) {
assert newState != State.SENDING_HEAD : "oldState: " + oldState + ", newState: " + newState;
assert newState != State.REQUIRE_HEAD : "oldState: " + oldState + ", newState: " + newState;
return stateUpdater.compareAndSet(this, oldState, newState);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public Publisher<Object> createPublisher(long elements) {
return new SurroundingPublisher<>("head", Mono.empty(), null);
}
if (elements == 2) {
return new SurroundingPublisher<>("head", Mono.empty(), "tail");
return new SurroundingPublisher<>(null, Mono.just(1), "tail");
}
return new SurroundingPublisher<>("head",
Flux.fromStream(LongStream.range(0, elements - 2).boxed()),
Expand Down

0 comments on commit ab6abc7

Please sign in to comment.