Skip to content

Commit

Permalink
improves dispose impl
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
OlegDokuka committed Feb 26, 2021
1 parent 0265b79 commit 3b53f52
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.rsocket.internal.jctools.queues.MpscUnboundedArrayQueue;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -162,7 +163,6 @@ void drainFused(Subscriber<? super T> a) {
for (; ; ) {

if (cancelled) {
this.clear();
hasDownstream = false;
return;
}
Expand All @@ -176,6 +176,7 @@ void drainFused(Subscriber<? super T> a) {

Throwable ex = error;
if (ex != null) {
System.out.println("Send Error");
a.onError(ex);
} else {
a.onComplete();
Expand Down Expand Up @@ -352,12 +353,10 @@ public void cancel() {
}
cancelled = true;

if (outputFused) {
return;
}

if (WIP.getAndIncrement(this) == 0) {
this.clear();
if (!outputFused) {
this.clear();
}
hasDownstream = false;
}
}
Expand Down Expand Up @@ -422,11 +421,43 @@ public int requestFusion(int requestedMode) {

@Override
public void dispose() {
try {
super.dispose();
} catch (Throwable ignored) {
if (cancelled) {
return;
}

error = new CancellationException("Disposed");
done = true;

boolean once = true;
if (WIP.getAndIncrement(this) == 0) {
cancelled = true;
int m = 1;
for (; ; ) {
final CoreSubscriber<? super T> a = this.actual;

if (!outputFused) {
clear();
}

if (a != null && once) {
try {
a.onError(error);
} catch (Throwable ignored) {
}
}

cancelled = true;
once = false;

int wip = this.wip;
if (wip == m) {
break;
}
m = wip;
}

hasDownstream = false;
}
cancel();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ void drain() {
}

T t;
int m = 0;
int m = 1;
for (; ; ) {
if (isCancelled()) {
qs.clear();
Expand Down

0 comments on commit 3b53f52

Please sign in to comment.