From 834284dae942855400ec4955393c2669f6b08b5b Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 12 Nov 2021 12:45:06 +0200 Subject: [PATCH] improves TransportTest Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- .../java/io/rsocket/test/TransportTest.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java index 725d621f5..990e4df34 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java @@ -57,6 +57,7 @@ import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.Exceptions; import reactor.core.Fuseable; import reactor.core.publisher.Flux; @@ -96,6 +97,7 @@ static String read(String resourceName) { default void close() { getTransportPair().responder.awaitAllInteractionTermination(getTimeout()); getTransportPair().dispose(); + Schedulers.shutdownNow(); getTransportPair().awaitClosed(); RuntimeException throwable = new RuntimeException() { @@ -710,6 +712,7 @@ private static class DisconnectingDuplexConnection implements DuplexConnection { private final String tag; final DuplexConnection source; final Duration delay; + final Disposable.Swap disposables = Disposables.swap(); DisconnectingDuplexConnection(String tag, DuplexConnection source, Duration delay) { this.tag = tag; @@ -719,6 +722,7 @@ private static class DisconnectingDuplexConnection implements DuplexConnection { @Override public void dispose() { + disposables.dispose(); source.dispose(); } @@ -747,14 +751,16 @@ public Flux receive() { bb -> { if (!receivedFirst) { receivedFirst = true; - Mono.delay(delay) - .takeUntilOther(source.onClose()) - .subscribe( - __ -> { - logger.warn( - "Tag {}. Disposing Connection[{}]", tag, source.hashCode()); - source.dispose(); - }); + disposables.replace( + Mono.delay(delay) + .takeUntilOther(source.onClose()) + .subscribe( + __ -> { + logger.warn( + "Tag {}. Disposing Connection[{}]", tag, source.hashCode()); + source.dispose(); + }) + ); } }); }