diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index 9ffe61334..539e104b4 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -322,7 +322,7 @@ public void accept(long n) { .transform( f -> { LimitableRequestPublisher wrapped = - LimitableRequestPublisher.wrap(f, Long.MAX_VALUE); + LimitableRequestPublisher.wrap(f); // Need to set this to one for first the frame wrapped.request(1); senders.put(streamId, wrapped); diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index 8fb887418..6ce5ef88d 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -451,7 +451,7 @@ private void handleStream(int streamId, Flux response, int initialReque .transform( frameFlux -> { LimitableRequestPublisher payloads = - LimitableRequestPublisher.wrap(frameFlux, Long.MAX_VALUE); + LimitableRequestPublisher.wrap(frameFlux); sendingLimitableSubscriptions.put(streamId, payloads); payloads.request( initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN); diff --git a/rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java b/rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java index 2eafd3d61..8adb7542a 100755 --- a/rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java @@ -56,6 +56,10 @@ public static LimitableRequestPublisher wrap(Publisher source, long pr return new LimitableRequestPublisher<>(source, prefetch); } + public static LimitableRequestPublisher wrap(Publisher source) { + return wrap(source, Long.MAX_VALUE); + } + @Override public void subscribe(CoreSubscriber destination) { synchronized (this) { diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java index 6c849ab36..c60dba312 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java @@ -28,18 +28,14 @@ import io.rsocket.exceptions.ApplicationErrorException; import io.rsocket.exceptions.RejectedSetupException; import io.rsocket.frame.*; -import io.rsocket.test.util.TestDuplexConnection; import io.rsocket.test.util.TestSubscriber; import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; import org.assertj.core.api.Assertions; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.reactivestreams.Publisher; @@ -215,33 +211,6 @@ public void testChannelRequestServerSideCancellation() { Assertions.assertThat(request.isDisposed()).isTrue(); } - @Ignore - @Test(timeout = 2_000) - @SuppressWarnings("unchecked") - public void - testClientSideRequestChannelShouldNotHangInfinitelySendingElementsAndShouldProduceDataValuingConnectionBackpressure() { - final Queue requests = new ConcurrentLinkedQueue<>(); - rule.connection.dispose(); - rule.connection = new TestDuplexConnection(); - rule.connection.setInitialSendRequestN(256); - rule.init(); - - rule.socket - .requestChannel( - Flux.generate(s -> s.next(EmptyPayload.INSTANCE)).doOnRequest(requests::add)) - .subscribe(); - - int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL); - - assertThat("Unexpected error.", rule.errors, is(empty())); - - rule.connection.addToReceivedBuffer( - RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, 2)); - rule.connection.addToReceivedBuffer( - RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, Integer.MAX_VALUE)); - Assertions.assertThat(requests).containsOnly(1L, 2L, 253L); - } - public int sendRequestResponse(Publisher response) { Subscriber sub = TestSubscriber.create(); response.subscribe(sub); diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java index db73fb3af..32c0406b9 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java @@ -29,17 +29,12 @@ import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; import java.util.Collection; -import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; -import org.assertj.core.api.Assertions; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; -import org.mockito.Mockito; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class RSocketServerTest { @@ -111,91 +106,6 @@ public Mono requestResponse(Payload payload) { assertThat("Subscription not cancelled.", cancelled.get(), is(true)); } - @Ignore - @Test(timeout = 2_000) - @SuppressWarnings("unchecked") - public void - testServerSideRequestStreamShouldNotHangInfinitelySendingElementsAndShouldProduceDataValuingConnectionBackpressure() { - final int streamId = 5; - final Queue received = new ConcurrentLinkedQueue<>(); - final Queue requests = new ConcurrentLinkedQueue<>(); - - rule.setAcceptingSocket( - new AbstractRSocket() { - @Override - public Flux requestStream(Payload payload) { - return Flux.generate(s -> s.next(payload.retain())).doOnRequest(requests::add); - } - }, - 256); - - rule.sendRequest(streamId, FrameType.REQUEST_STREAM); - - assertThat("Unexpected error.", rule.errors, is(empty())); - - Subscriber next = rule.connection.getSendSubscribers().iterator().next(); - - Mockito.doAnswer( - invocation -> { - received.add(invocation.getArgument(0)); - - if (received.size() == 256) { - throw new RuntimeException(); - } - - return null; - }) - .when(next) - .onNext(Mockito.any()); - - rule.connection.addToReceivedBuffer( - RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, Integer.MAX_VALUE)); - Assertions.assertThat(requests).containsOnly(1L, 2L, 253L); - } - - @Ignore - @Test(timeout = 2_000) - @SuppressWarnings("unchecked") - public void - testServerSideRequestChannelShouldNotHangInfinitelySendingElementsAndShouldProduceDataValuingConnectionBackpressure() { - final int streamId = 5; - final Queue received = new ConcurrentLinkedQueue<>(); - final Queue requests = new ConcurrentLinkedQueue<>(); - - rule.setAcceptingSocket( - new AbstractRSocket() { - @Override - public Flux requestChannel(Publisher payload) { - return Flux.generate(s -> s.next(EmptyPayload.INSTANCE)) - .doOnRequest(requests::add); - } - }, - 256); - - rule.sendRequest(streamId, FrameType.REQUEST_CHANNEL); - - assertThat("Unexpected error.", rule.errors, is(empty())); - - Subscriber next = rule.connection.getSendSubscribers().iterator().next(); - - Mockito.doAnswer( - invocation -> { - received.add(invocation.getArgument(0)); - - if (received.size() == 256) { - throw new RuntimeException(); - } - - return null; - }) - .when(next) - .onNext(Mockito.any()); - - rule.connection.addToReceivedBuffer( - RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, Integer.MAX_VALUE)); - Assertions.assertThat(requests).containsOnly(1L, 2L, 253L); - } - public static class ServerSocketRule extends AbstractSocketRule { private RSocket acceptingSocket;