Skip to content

Commit

Permalink
improves BaseDuplexConnection and fixes PingClient impl (#1062)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlegDokuka authored Aug 20, 2022
1 parent 1fe2d64 commit d330a32
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,10 @@
import reactor.core.publisher.Sinks;

public abstract class BaseDuplexConnection implements DuplexConnection {
protected Sinks.Empty<Void> onClose = Sinks.empty();
protected final Sinks.Empty<Void> onClose = Sinks.empty();
protected final UnboundedProcessor sender = new UnboundedProcessor(onClose::tryEmitEmpty);

protected UnboundedProcessor sender = new UnboundedProcessor();

public BaseDuplexConnection() {
onClose().doFinally(s -> doOnClose()).subscribe();
}
public BaseDuplexConnection() {}

@Override
public void sendFrame(int streamId, ByteBuf frame) {
Expand All @@ -48,7 +45,7 @@ public final Mono<Void> onClose() {

@Override
public final void dispose() {
onClose.tryEmitEmpty();
doOnClose();
}

@Override
Expand Down
10 changes: 7 additions & 3 deletions rsocket-test/src/main/java/io/rsocket/test/PingClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ Flux<Payload> pingPong(
BiFunction<RSocket, ? super Payload, ? extends Publisher<Payload>> interaction,
int count,
final Recorder histogram) {
return client
.flatMapMany(
return Flux.usingWhen(
client,
rsocket ->
Flux.range(1, count)
.flatMap(
Expand All @@ -78,7 +78,11 @@ Flux<Payload> pingPong(
histogram.recordValue(diff);
});
},
64))
64),
rsocket -> {
rsocket.dispose();
return rsocket.onClose();
})
.doOnError(Throwable::printStackTrace);
}
}

0 comments on commit d330a32

Please sign in to comment.