Skip to content

Commit

Permalink
improves TransportTest
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
OlegDokuka committed Nov 12, 2021
1 parent 2989042 commit 834284d
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions rsocket-test/src/main/java/io/rsocket/test/TransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -96,6 +97,7 @@ static String read(String resourceName) {
default void close() {
getTransportPair().responder.awaitAllInteractionTermination(getTimeout());
getTransportPair().dispose();
Schedulers.shutdownNow();
getTransportPair().awaitClosed();
RuntimeException throwable =
new RuntimeException() {
Expand Down Expand Up @@ -710,6 +712,7 @@ private static class DisconnectingDuplexConnection implements DuplexConnection {
private final String tag;
final DuplexConnection source;
final Duration delay;
final Disposable.Swap disposables = Disposables.swap();

DisconnectingDuplexConnection(String tag, DuplexConnection source, Duration delay) {
this.tag = tag;
Expand All @@ -719,6 +722,7 @@ private static class DisconnectingDuplexConnection implements DuplexConnection {

@Override
public void dispose() {
disposables.dispose();
source.dispose();
}

Expand Down Expand Up @@ -747,14 +751,16 @@ public Flux<ByteBuf> receive() {
bb -> {
if (!receivedFirst) {
receivedFirst = true;
Mono.delay(delay)
.takeUntilOther(source.onClose())
.subscribe(
__ -> {
logger.warn(
"Tag {}. Disposing Connection[{}]", tag, source.hashCode());
source.dispose();
});
disposables.replace(
Mono.delay(delay)
.takeUntilOther(source.onClose())
.subscribe(
__ -> {
logger.warn(
"Tag {}. Disposing Connection[{}]", tag, source.hashCode());
source.dispose();
})
);
}
});
}
Expand Down

0 comments on commit 834284d

Please sign in to comment.