From b07d742d95049f4b984d56e597c0a7f8af60b590 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Sun, 1 Mar 2020 10:30:30 +0700 Subject: [PATCH] bugfixes ReconnectingRSocket Signed-off-by: Oleh Dokuka --- .../io/rsocket/util/ReconnectingRSocket.java | 452 +++++++++++------- .../rsocket/util/ReconnectingRSocketTest.java | 173 ++++++- 2 files changed, 452 insertions(+), 173 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/util/ReconnectingRSocket.java b/rsocket-core/src/main/java/io/rsocket/util/ReconnectingRSocket.java index 22004de99..7be23639f 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/ReconnectingRSocket.java +++ b/rsocket-core/src/main/java/io/rsocket/util/ReconnectingRSocket.java @@ -13,14 +13,16 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; + +import io.rsocket.frame.FrameType; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; -import reactor.core.Exceptions; +import reactor.core.Disposable; import reactor.core.Scannable; +import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; @@ -31,23 +33,18 @@ import reactor.util.context.Context; @SuppressWarnings("unchecked") -public class ReconnectingRSocket implements CoreSubscriber, RSocket, Runnable { +public final class ReconnectingRSocket extends ReconnectingSubscriber implements RSocket { - private final long backoffMinInMillis; - private final long backoffMaxInMillis; private final Mono source; - private final Scheduler scheduler; private final Predicate errorPredicate; private final MonoProcessor onDispose; - RSocket value; volatile Consumer[] subscribers; - @SuppressWarnings("rawtypes") static final AtomicReferenceFieldUpdater SUBSCRIBERS = - AtomicReferenceFieldUpdater.newUpdater( - ReconnectingRSocket.class, Consumer[].class, "subscribers"); + AtomicReferenceFieldUpdater.newUpdater( + ReconnectingRSocket.class, Consumer[].class, "subscribers"); @SuppressWarnings("rawtypes") static final Consumer[] EMPTY_UNSUBSCRIBED = new Consumer[0]; @@ -55,10 +52,17 @@ public class ReconnectingRSocket implements CoreSubscriber, RSocket, Ru @SuppressWarnings("rawtypes") static final Consumer[] EMPTY_SUBSCRIBED = new Consumer[0]; + @SuppressWarnings("rawtypes") + static final Consumer[] READY = new Consumer[0]; + + static final int ADDED_STATE = 0; + static final int READY_STATE = 1; + static final int TERMINATED_STATE = 2; + @SuppressWarnings("rawtypes") static final Consumer[] TERMINATED = new Consumer[0]; - static final ClosedChannelException ON_CLOSE_EXCEPTION = new ClosedChannelException(); + OnCloseSubscriber onCloseSubscriber; public static Builder builder() { return new Builder(); @@ -71,132 +75,74 @@ public static Builder builder() { long backoffMinInMillis, long backoffMaxInMillis) { + super(backoffMinInMillis, backoffMaxInMillis, scheduler); + this.source = source; - this.backoffMinInMillis = backoffMinInMillis; - this.backoffMaxInMillis = backoffMaxInMillis; - this.scheduler = scheduler; this.errorPredicate = errorPredicate; this.onDispose = MonoProcessor.create(); - SUBSCRIBERS.lazySet(this, EMPTY_UNSUBSCRIBED); - } - - public void subscribe(Consumer actual) { - if (!add(actual)) { - actual.accept(value); - } - } - - @Override - public void onSubscribe(Subscription subscription) { - subscription.request(Long.MAX_VALUE); - } - - @Override - public void onComplete() { - final RSocket value = this.value; - - if (value == null) { - reconnect(); - } else { - Consumer[] array = SUBSCRIBERS.getAndSet(this, TERMINATED); - value.onClose().subscribe(null, null, () -> resubscribeWhen(ON_CLOSE_EXCEPTION)); - for (Consumer as : array) { - as.accept(value); - } - } - } - @Override - public void onError(Throwable t) { - reconnect(); - } - - @Override - @SuppressWarnings("unchecked") - public void onNext(@Nullable RSocket value) { - this.value = value; - } - - @Override - public void run() { - source.subscribe(this); - } - - private void reconnect() { - ThreadLocalRandom random = ThreadLocalRandom.current(); - long nextRandomDelay = random.nextLong(backoffMinInMillis, backoffMaxInMillis); - scheduler.schedule(this, nextRandomDelay, TimeUnit.MILLISECONDS); - } - private boolean resubscribeWhen(Throwable throwable) { - if (onDispose.isDisposed()) { - return false; - } - - if (errorPredicate.test(throwable)) { - final Consumer[] subscribers = this.subscribers; - final RSocket current = this.value; - if ((current == null || current.isDisposed()) - && subscribers == TERMINATED - && SUBSCRIBERS.compareAndSet(this, TERMINATED, EMPTY_SUBSCRIBED)) { - this.value = null; - reconnect(); - } - return true; - } - return false; + SUBSCRIBERS.lazySet(this, EMPTY_UNSUBSCRIBED); } @Override public Mono fireAndForget(Payload payload) { - return new FlatMapInner<>( - this, rsocket -> rsocket.fireAndForget(payload), this::resubscribeWhen); + return new FlatMapInner<>(this, payload, FrameType.REQUEST_FNF); } @Override public Mono requestResponse(Payload payload) { - return new FlatMapInner<>( - this, rsocket -> rsocket.requestResponse(payload), this::resubscribeWhen); + return new FlatMapInner<>(this, payload, FrameType.REQUEST_RESPONSE); } @Override public Flux requestStream(Payload payload) { - return new FlatMapManyInner<>( - this, rSocket -> rSocket.requestStream(payload), this::resubscribeWhen); + return new FlatMapManyInner<>(this, payload, FrameType.REQUEST_STREAM); } @Override public Flux requestChannel(Publisher payloads) { - return new FlatMapManyInner<>( - this, rSocket -> rSocket.requestChannel(payloads), this::resubscribeWhen); + return new FlatMapManyInner<>(this, payloads, FrameType.REQUEST_CHANNEL); } @Override public Mono metadataPush(Payload payload) { - return new FlatMapInner<>( - this, rsocket -> rsocket.metadataPush(payload), this::resubscribeWhen); + return new FlatMapInner<>(this, payload, FrameType.METADATA_PUSH); } @Override public double availability() { RSocket rsocket = this.value; - return rsocket != null ? rsocket.availability() : 0d; + return rsocket != null ? rsocket.availability() : 0.0d; } @Override public void dispose() { - onDispose.dispose(); + Consumer[] consumers = SUBSCRIBERS.getAndSet(this, TERMINATED); + RSocket value = this.value; + OnCloseSubscriber onCloseSubscriber = this.onCloseSubscriber; + this.value = null; + this.onCloseSubscriber = null; + this.onDispose.dispose(); + if (value != null) { + onCloseSubscriber.dispose(); value.dispose(); } + + if (consumers != TERMINATED && consumers != READY) { + for (Consumer consumer : consumers) { + consumer.accept(null); + } + } } @Override public boolean isDisposed() { - return onDispose.isDisposed(); + return this.onDispose.isDisposed(); } @Override @@ -204,12 +150,74 @@ public Mono onClose() { return onDispose; } - boolean add(Consumer ps) { - for (; ; ) { - Consumer[] a = subscribers; + void subscribe(Consumer actual) { + final int state = add(actual); + + if (state == READY_STATE) { + actual.accept(value); + } else if (state == TERMINATED_STATE) { + actual.accept(null); + } + } + + @Override + void complete(RSocket rSocket) { + onCloseSubscriber = new OnCloseSubscriber(this, rSocket); + + // happens-before write so all non-volatile writes are going to be available on this volatile field read + Consumer[] consumers = SUBSCRIBERS.getAndSet(this, READY); + + if (consumers == TERMINATED) { + this.dispose(); + return; + } + + rSocket.onClose().subscribe(onCloseSubscriber); + + for (Consumer as : consumers) { + as.accept(rSocket); + } + } + + boolean tryResubscribe(RSocket rSocket, Throwable throwable) { + + if (this.subscribers == TERMINATED) { + return false; + } + + if (this.errorPredicate.test(throwable)) { + final Consumer[] subscribers = this.subscribers; + final RSocket currentRSocket = this.value; + + if (currentRSocket == rSocket && subscribers == READY && SUBSCRIBERS.compareAndSet(this, READY, EMPTY_SUBSCRIBED)) { + // dispose listener to avoid double retry in case reconnection happens earlier that last RSocket is going to be disposed + this.onCloseSubscriber.dispose(); + this.onCloseSubscriber = null; + + // need to be disposed just in case a given error was considered as one to resubscribe + if (!currentRSocket.isDisposed()) { + currentRSocket.dispose(); + } + this.value = null; + + reconnect(); + } + + return true; + } + return false; + } + + int add(Consumer ps) { + for (;;) { + Consumer[] a = this.subscribers; if (a == TERMINATED) { - return false; + return TERMINATED_STATE; + } + + if (a == READY) { + return READY_STATE; } int n = a.length; @@ -220,24 +228,28 @@ boolean add(Consumer ps) { if (SUBSCRIBERS.compareAndSet(this, a, b)) { if (a == EMPTY_UNSUBSCRIBED) { - source.subscribe(this); + this.source.subscribe(this); } - return true; + return ADDED_STATE; } } } + @Override + public void run() { + this.source.subscribe(this); + } + static final class FlatMapInner extends Mono implements CoreSubscriber, Consumer, Subscription, Scannable { final ReconnectingRSocket parent; - final Function> mapper; - final Predicate errorPredicate; + final FrameType interactionType; + final Payload payload; boolean done; volatile int state; - @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(FlatMapInner.class, "state"); @@ -246,16 +258,17 @@ static final class FlatMapInner extends Mono static final int SUBSCRIBED = 1; static final int CANCELLED = 2; + RSocket rSocket; CoreSubscriber actual; Subscription s; FlatMapInner( - ReconnectingRSocket parent, - Function> mapper, - Predicate errorPredicate) { + ReconnectingRSocket parent, + Payload payload, + FrameType interactionType) { this.parent = parent; - this.mapper = mapper; - this.errorPredicate = errorPredicate; + this.payload = payload; + this.interactionType = interactionType; } @Override @@ -274,14 +287,25 @@ public void accept(RSocket rSocket) { Operators.error(actual, new CancellationException("Disposed")); } - Mono source; - try { - source = this.mapper.apply(rSocket); - source.subscribe(this); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - Operators.error(actual, e); + this.rSocket = rSocket; + + Mono source; + switch (interactionType) { + case REQUEST_FNF: + source = rSocket.fireAndForget(payload); + break; + case REQUEST_RESPONSE: + source = rSocket.requestResponse(payload); + break; + case METADATA_PUSH: + source = rSocket.metadataPush(payload); + break; + default: + Operators.error(this.actual, new IllegalStateException("Should never happen")); + return; + } + source.subscribe((CoreSubscriber) this); } @Override @@ -308,26 +332,28 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T payload) { - if (done) { - Operators.onNextDropped(payload, actual.currentContext()); + if (this.done) { + Operators.onNextDropped(payload, this.actual.currentContext()); return; } - done = true; - actual.onNext(payload); + + this.done = true; + this.actual.onNext(payload); + this.actual.onComplete(); } @Override public void onError(Throwable t) { - if (done) { - Operators.onErrorDropped(t, actual.currentContext()); + if (this.done) { + Operators.onErrorDropped(t, this.actual.currentContext()); return; } final CoreSubscriber actual = this.actual; - if (errorPredicate.test(t)) { + if (this.parent.tryResubscribe(rSocket, t)) { this.actual = null; - STATE.compareAndSet(this, SUBSCRIBED, NONE); + this.state = NONE; } else { done = true; } @@ -336,61 +362,61 @@ public void onError(Throwable t) { @Override public void onComplete() { - if (done) { + if (this.done) { return; } - done = true; - actual.onComplete(); + + this.done = true; + this.actual.onComplete(); } @Override public void request(long n) { - s.request(n); + this.s.request(n); } public void cancel() { if (STATE.getAndSet(this, CANCELLED) != CANCELLED) { - s.cancel(); + this.s.cancel(); } } } - static final class FlatMapManyInner extends Flux - implements CoreSubscriber, Consumer, Subscription, Scannable { + static final class FlatMapManyInner extends Flux + implements CoreSubscriber, Consumer, Subscription, Scannable { final ReconnectingRSocket parent; - final Function> mapper; - final Predicate errorPredicate; - - boolean done; + final FrameType interactionType; + final T fluxOrPayload; volatile int state; - @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(FlatMapManyInner.class, "state"); - static final int NONE = 0; static final int SUBSCRIBED = 1; static final int CANCELLED = 2; - CoreSubscriber actual; + + RSocket rSocket; + boolean done; + CoreSubscriber actual; Subscription s; FlatMapManyInner( - ReconnectingRSocket parent, - Function> mapper, - Predicate errorPredicate) { + ReconnectingRSocket parent, + T fluxOrPayload, + FrameType interactionType) { this.parent = parent; - this.mapper = mapper; - this.errorPredicate = errorPredicate; + this.fluxOrPayload = fluxOrPayload; + this.interactionType = interactionType; } @Override - public void subscribe(CoreSubscriber actual) { - if (state == NONE && STATE.compareAndSet(this, NONE, SUBSCRIBED)) { + public void subscribe(CoreSubscriber actual) { + if (this.state == NONE && STATE.compareAndSet(this, NONE, SUBSCRIBED)) { this.actual = actual; - parent.subscribe(this); + this.parent.subscribe(this); } else { Operators.error(actual, new IllegalStateException("Only a single Subscriber allowed")); } @@ -399,31 +425,42 @@ public void subscribe(CoreSubscriber actual) { @Override public void accept(RSocket rSocket) { if (rSocket == null) { - Operators.error(actual, new CancellationException("Disposed")); + Operators.error(this.actual, new CancellationException("Disposed")); } - Flux source; - try { - source = this.mapper.apply(rSocket); - source.subscribe(this); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - Operators.error(actual, e); + this.rSocket = rSocket; + + Flux source; + switch (interactionType) { + case REQUEST_STREAM: + source = rSocket.requestStream((Payload) fluxOrPayload); + break; + case REQUEST_CHANNEL: + source = rSocket.requestChannel((Flux) fluxOrPayload); + break; + default: + Operators.error(this.actual, new IllegalStateException("Should never happen")); + return; + } + + source.subscribe(this); } @Override public Context currentContext() { - return actual.currentContext(); + return this.actual.currentContext(); } @Nullable @Override public Object scanUnsafe(Attr key) { - if (key == Attr.PARENT) return s; - if (key == Attr.ACTUAL) return parent; - if (key == Attr.TERMINATED) return done; - if (key == Attr.CANCELLED) return s == Operators.cancelledSubscription(); + int state = this.state; + + if (key == Attr.PARENT) return this.s; + if (key == Attr.ACTUAL) return this.parent; + if (key == Attr.TERMINATED) return this.done; + if (key == Attr.CANCELLED) return state == CANCELLED; return null; } @@ -431,49 +468,49 @@ public Object scanUnsafe(Attr key) { @Override public void onSubscribe(Subscription s) { this.s = s; - actual.onSubscribe(this); + this.actual.onSubscribe(this); } @Override - public void onNext(T payload) { - actual.onNext(payload); + public void onNext(Payload payload) { + this.actual.onNext(payload); } @Override public void onError(Throwable t) { - if (done) { - Operators.onErrorDropped(t, actual.currentContext()); + if (this.done) { + Operators.onErrorDropped(t, this.actual.currentContext()); return; } - final CoreSubscriber actual = this.actual; + final CoreSubscriber actual = this.actual; - if (errorPredicate.test(t)) { + if (this.parent.tryResubscribe(rSocket, t)) { this.actual = null; - STATE.compareAndSet(this, SUBSCRIBED, NONE); + this.state = NONE; } else { - done = true; + this.done = true; } actual.onError(t); } @Override public void onComplete() { - if (done) { + if (this.done) { return; } - done = true; - actual.onComplete(); + this.done = true; + this.actual.onComplete(); } @Override public void request(long n) { - s.request(n); + this.s.request(n); } public void cancel() { if (STATE.getAndSet(this, CANCELLED) != CANCELLED) { - s.cancel(); + this.s.cancel(); } } } @@ -536,4 +573,77 @@ public ReconnectingRSocket build() { backoffMax.toMillis()); } } + + final static class OnCloseSubscriber extends BaseSubscriber { + final ReconnectingRSocket parent; + final RSocket rSocket; + + OnCloseSubscriber(ReconnectingRSocket parent, RSocket rSocket) { + this.parent = parent; + this.rSocket = rSocket; + } + + @Override + protected void hookOnComplete() { + if (!parent.tryResubscribe(rSocket, ON_CLOSE_EXCEPTION)) { + this.dispose(); + } + } + } } + +abstract class ReconnectingSubscriber implements CoreSubscriber, Runnable, Disposable { + + final long backoffMinInMillis; + final long backoffMaxInMillis; + final Scheduler scheduler; + + RSocket value; + + static final ClosedChannelException ON_CLOSE_EXCEPTION = new ClosedChannelException(); + + ReconnectingSubscriber(long backoffMinInMillis, long backoffMaxInMillis, Scheduler scheduler) { + this.backoffMinInMillis = backoffMinInMillis; + this.backoffMaxInMillis = backoffMaxInMillis; + this.scheduler = scheduler; + } + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onComplete() { + final RSocket value = this.value; + + if (value == null) { + reconnect(); + } else { + complete(value); + } + } + + abstract void complete(RSocket rSocket); + + @Override + public void onError(Throwable t) { + this.value = null; + reconnect(); + } + + @Override + public void onNext(@Nullable RSocket value) { + this.value = value; + } + + void reconnect() { + final ThreadLocalRandom random = ThreadLocalRandom.current(); + final long min = this.backoffMinInMillis; + final long max = this.backoffMaxInMillis; + final long nextRandomDelay = min == max ? min : random.nextLong(min, max); + + this.scheduler.schedule(this, nextRandomDelay, TimeUnit.MILLISECONDS); + } +} + diff --git a/rsocket-core/src/test/java/io/rsocket/util/ReconnectingRSocketTest.java b/rsocket-core/src/test/java/io/rsocket/util/ReconnectingRSocketTest.java index 009851e6f..448edfde7 100644 --- a/rsocket-core/src/test/java/io/rsocket/util/ReconnectingRSocketTest.java +++ b/rsocket-core/src/test/java/io/rsocket/util/ReconnectingRSocketTest.java @@ -4,6 +4,7 @@ import io.rsocket.RSocket; import java.nio.channels.ClosedChannelException; import java.time.Duration; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Supplier; import java.util.stream.Stream; @@ -13,11 +14,17 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mockito; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; import reactor.core.CorePublisher; +import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; +import reactor.test.publisher.TestPublisher; +import reactor.test.util.RaceTestUtils; class ReconnectingRSocketTest { @@ -146,12 +153,12 @@ public void testReconnectsWhenGotLogicalStreamError( new ReconnectingRSocket( Mono.defer(() -> rSocketMonoMock.get()), t -> true, Schedulers.parallel(), 10, 20); - Mockito.verifyZeroInteractions(rSocketMock); + Mockito.verifyNoInteractions(rSocketMock); Assertions.assertThat(reconnectingRSocket.value).isNull(); CorePublisher corePublisher = invocation.apply(reconnectingRSocket, MONO_TEST_PAYLOAD); - Mockito.verifyZeroInteractions(rSocketMock); + Mockito.verifyNoInteractions(rSocketMock); Assertions.assertThat(reconnectingRSocket.value).isNull(); if (corePublisher instanceof Mono) { @@ -169,4 +176,166 @@ public void testReconnectsWhenGotLogicalStreamError( Mockito.verify(rSocketMonoMock, Mockito.times(4)).get(); } + + @Test + public void racingTest() throws InterruptedException { + Schedulers.parallel().start(); + AtomicLong subscribedTimes = new AtomicLong(); + MockSupplier mockRSocket = new MockSupplier(subscribedTimes); + + ReconnectingRSocket rSocket = ReconnectingRSocket.builder() + .withSourceRSocket(mockRSocket) + .withRetryOnScheduler(Schedulers.parallel()) + .withRetryPeriod(Duration.ofMillis(0)) + .build(); + + Assertions.assertThat(subscribedTimes.get()).isZero(); + + for (int i = 2; i <= 101; i++) { + Mono instance = rSocket.requestResponse(EmptyPayload.INSTANCE); + instance.subscribe(new CoreSubscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Payload payload) { + + } + + @Override + public void onError(Throwable t) { + instance.subscribe(this); + } + + @Override + public void onComplete() { + + } + }); + + Mono instance2 = rSocket.requestResponse(EmptyPayload.INSTANCE); + instance2.subscribe(new CoreSubscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Payload payload) { + + } + + @Override + public void onError(Throwable t) { + instance2.subscribe(this); + } + + @Override + public void onComplete() { + + } + }); + +// System.out.println(i); + mockRSocket.lastSuppliedRSocket.dispose(); + while (i != subscribedTimes.get()) { + Thread.sleep(10); + } + } + + Assertions.assertThat(subscribedTimes.get()).isEqualTo(101); + } + + static class MockSupplier implements Supplier> { + + final AtomicLong subscribedCounted; + RSocket lastSuppliedRSocket; + + MockSupplier(AtomicLong subscribedCounted) { + this.subscribedCounted = subscribedCounted; + } + + public Mono get() { +// System.out.println("Subscribed"); + lastSuppliedRSocket = new MockRSocket(TestPublisher.create(), MonoProcessor.create()); + subscribedCounted.incrementAndGet(); + return Mono.just(lastSuppliedRSocket).publishOn(Schedulers.parallel()); + } + + } + + static class MockRSocket implements RSocket { + + static final ClosedChannelException ERROR = new ClosedChannelException(); + + final TestPublisher testPublisher; + final MonoProcessor onDispose; + + MockRSocket(TestPublisher testPublisher, MonoProcessor onDispose) { + this.testPublisher = testPublisher; + this.onDispose = onDispose; + } + + @Override + public Mono fireAndForget(Payload payload) { + return testPublisher.mono().publishOn(Schedulers.parallel()); + } + + @Override + public Mono requestResponse(Payload payload) { + return testPublisher.mono().publishOn(Schedulers.parallel()); + } + + @Override + public Flux requestStream(Payload payload) { + return testPublisher.flux().publishOn(Schedulers.parallel()); + } + + @Override + public Flux requestChannel(Publisher payloads) { + return Flux.from(payloads) + .switchOnFirst((s, f) -> testPublisher.flux()); + } + + @Override + public Mono metadataPush(Payload payload) { + return testPublisher.mono().publishOn(Schedulers.parallel()); + } + + @Override + public double availability() { + return 1.0D; + } + + @Override + public Mono onClose() { + return onDispose; + } + + @Override + public void dispose() { +// printStackTrace(); + RaceTestUtils.race(onDispose::onComplete, () -> testPublisher.error(ERROR), Schedulers.elastic()); + } + + @Override + public boolean isDisposed() { + return onDispose.isDisposed(); + } + + private void printStackTrace() { + StringBuilder s = new StringBuilder(); + StackTraceElement[] trace = Thread.currentThread().getStackTrace(); + for (StackTraceElement traceElement : trace) { + s.append("\tat " + traceElement + "\n\r"); + if (traceElement.toString().contains("io.rsocket.util.ReconnectingRSocketTest.racingTest")) { + break; + } + } + + System.err.println(s); + } + } }