From 3b53f52b075d1fa5d31385a05e0f7c67aa22ef01 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 26 Feb 2021 15:16:15 +0200 Subject: [PATCH] improves dispose impl Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- .../rsocket/internal/UnboundedProcessor.java | 51 +++++++++++++++---- .../internal/subscriber/AssertSubscriber.java | 2 +- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index cd1fc1ab3..9112c2288 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -20,6 +20,7 @@ import io.rsocket.internal.jctools.queues.MpscUnboundedArrayQueue; import java.util.Objects; import java.util.Queue; +import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.reactivestreams.Subscriber; @@ -162,7 +163,6 @@ void drainFused(Subscriber a) { for (; ; ) { if (cancelled) { - this.clear(); hasDownstream = false; return; } @@ -176,6 +176,7 @@ void drainFused(Subscriber a) { Throwable ex = error; if (ex != null) { + System.out.println("Send Error"); a.onError(ex); } else { a.onComplete(); @@ -352,12 +353,10 @@ public void cancel() { } cancelled = true; - if (outputFused) { - return; - } - if (WIP.getAndIncrement(this) == 0) { - this.clear(); + if (!outputFused) { + this.clear(); + } hasDownstream = false; } } @@ -422,11 +421,43 @@ public int requestFusion(int requestedMode) { @Override public void dispose() { - try { - super.dispose(); - } catch (Throwable ignored) { + if (cancelled) { + return; + } + + error = new CancellationException("Disposed"); + done = true; + + boolean once = true; + if (WIP.getAndIncrement(this) == 0) { + cancelled = true; + int m = 1; + for (; ; ) { + final CoreSubscriber a = this.actual; + + if (!outputFused) { + clear(); + } + + if (a != null && once) { + try { + a.onError(error); + } catch (Throwable ignored) { + } + } + + cancelled = true; + once = false; + + int wip = this.wip; + if (wip == m) { + break; + } + m = wip; + } + + hasDownstream = false; } - cancel(); } @Override diff --git a/rsocket-core/src/test/java/io/rsocket/internal/subscriber/AssertSubscriber.java b/rsocket-core/src/test/java/io/rsocket/internal/subscriber/AssertSubscriber.java index 13e48f607..83d420d90 100644 --- a/rsocket-core/src/test/java/io/rsocket/internal/subscriber/AssertSubscriber.java +++ b/rsocket-core/src/test/java/io/rsocket/internal/subscriber/AssertSubscriber.java @@ -933,7 +933,7 @@ void drain() { } T t; - int m = 0; + int m = 1; for (; ; ) { if (isCancelled()) { qs.clear();