From d330a324a7c506091b8e1da22b8676db1bee5042 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka <5380167+OlegDokuka@users.noreply.github.com> Date: Sat, 20 Aug 2022 13:02:47 +0300 Subject: [PATCH] improves BaseDuplexConnection and fixes PingClient impl (#1062) --- .../io/rsocket/internal/BaseDuplexConnection.java | 11 ++++------- .../src/main/java/io/rsocket/test/PingClient.java | 10 +++++++--- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java index 98bed7ba7..fc679c259 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java @@ -22,13 +22,10 @@ import reactor.core.publisher.Sinks; public abstract class BaseDuplexConnection implements DuplexConnection { - protected Sinks.Empty onClose = Sinks.empty(); + protected final Sinks.Empty onClose = Sinks.empty(); + protected final UnboundedProcessor sender = new UnboundedProcessor(onClose::tryEmitEmpty); - protected UnboundedProcessor sender = new UnboundedProcessor(); - - public BaseDuplexConnection() { - onClose().doFinally(s -> doOnClose()).subscribe(); - } + public BaseDuplexConnection() {} @Override public void sendFrame(int streamId, ByteBuf frame) { @@ -48,7 +45,7 @@ public final Mono onClose() { @Override public final void dispose() { - onClose.tryEmitEmpty(); + doOnClose(); } @Override diff --git a/rsocket-test/src/main/java/io/rsocket/test/PingClient.java b/rsocket-test/src/main/java/io/rsocket/test/PingClient.java index 9017e854b..14740950a 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/PingClient.java +++ b/rsocket-test/src/main/java/io/rsocket/test/PingClient.java @@ -63,8 +63,8 @@ Flux pingPong( BiFunction> interaction, int count, final Recorder histogram) { - return client - .flatMapMany( + return Flux.usingWhen( + client, rsocket -> Flux.range(1, count) .flatMap( @@ -78,7 +78,11 @@ Flux pingPong( histogram.recordValue(diff); }); }, - 64)) + 64), + rsocket -> { + rsocket.dispose(); + return rsocket.onClose(); + }) .doOnError(Throwable::printStackTrace); } }