Skip to content

Commit

Permalink
add LimitableRequestPublisher.wrap(Flux)
Browse files Browse the repository at this point in the history
remove rsocket-transport request coordination tests

Signed-off-by: Maksym Ostroverkhov <[email protected]>
  • Loading branch information
mostroverkhov committed May 20, 2019
1 parent a8c0a31 commit 115e836
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 123 deletions.
2 changes: 1 addition & 1 deletion rsocket-core/src/main/java/io/rsocket/RSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ public void accept(long n) {
.transform(
f -> {
LimitableRequestPublisher<Payload> wrapped =
LimitableRequestPublisher.wrap(f, Long.MAX_VALUE);
LimitableRequestPublisher.wrap(f);
// Need to set this to one for first the frame
wrapped.request(1);
senders.put(streamId, wrapped);
Expand Down
2 changes: 1 addition & 1 deletion rsocket-core/src/main/java/io/rsocket/RSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ private void handleStream(int streamId, Flux<Payload> response, int initialReque
.transform(
frameFlux -> {
LimitableRequestPublisher<Payload> payloads =
LimitableRequestPublisher.wrap(frameFlux, Long.MAX_VALUE);
LimitableRequestPublisher.wrap(frameFlux);
sendingLimitableSubscriptions.put(streamId, payloads);
payloads.request(
initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public static <T> LimitableRequestPublisher<T> wrap(Publisher<T> source, long pr
return new LimitableRequestPublisher<>(source, prefetch);
}

public static <T> LimitableRequestPublisher<T> wrap(Publisher<T> source) {
return wrap(source, Long.MAX_VALUE);
}

@Override
public void subscribe(CoreSubscriber<? super T> destination) {
synchronized (this) {
Expand Down
31 changes: 0 additions & 31 deletions rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,14 @@
import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.exceptions.RejectedSetupException;
import io.rsocket.frame.*;
import io.rsocket.test.util.TestDuplexConnection;
import io.rsocket.test.util.TestSubscriber;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.EmptyPayload;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -215,33 +211,6 @@ public void testChannelRequestServerSideCancellation() {
Assertions.assertThat(request.isDisposed()).isTrue();
}

@Ignore
@Test(timeout = 2_000)
@SuppressWarnings("unchecked")
public void
testClientSideRequestChannelShouldNotHangInfinitelySendingElementsAndShouldProduceDataValuingConnectionBackpressure() {
final Queue<Long> requests = new ConcurrentLinkedQueue<>();
rule.connection.dispose();
rule.connection = new TestDuplexConnection();
rule.connection.setInitialSendRequestN(256);
rule.init();

rule.socket
.requestChannel(
Flux.<Payload>generate(s -> s.next(EmptyPayload.INSTANCE)).doOnRequest(requests::add))
.subscribe();

int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);

assertThat("Unexpected error.", rule.errors, is(empty()));

rule.connection.addToReceivedBuffer(
RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, 2));
rule.connection.addToReceivedBuffer(
RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, Integer.MAX_VALUE));
Assertions.assertThat(requests).containsOnly(1L, 2L, 253L);
}

public int sendRequestResponse(Publisher<Payload> response) {
Subscriber<Payload> sub = TestSubscriber.create();
response.subscribe(sub);
Expand Down
90 changes: 0 additions & 90 deletions rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,12 @@
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.EmptyPayload;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.assertj.core.api.Assertions;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RSocketServerTest {
Expand Down Expand Up @@ -111,91 +106,6 @@ public Mono<Payload> requestResponse(Payload payload) {
assertThat("Subscription not cancelled.", cancelled.get(), is(true));
}

@Ignore
@Test(timeout = 2_000)
@SuppressWarnings("unchecked")
public void
testServerSideRequestStreamShouldNotHangInfinitelySendingElementsAndShouldProduceDataValuingConnectionBackpressure() {
final int streamId = 5;
final Queue<Object> received = new ConcurrentLinkedQueue<>();
final Queue<Long> requests = new ConcurrentLinkedQueue<>();

rule.setAcceptingSocket(
new AbstractRSocket() {
@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.<Payload>generate(s -> s.next(payload.retain())).doOnRequest(requests::add);
}
},
256);

rule.sendRequest(streamId, FrameType.REQUEST_STREAM);

assertThat("Unexpected error.", rule.errors, is(empty()));

Subscriber next = rule.connection.getSendSubscribers().iterator().next();

Mockito.doAnswer(
invocation -> {
received.add(invocation.getArgument(0));

if (received.size() == 256) {
throw new RuntimeException();
}

return null;
})
.when(next)
.onNext(Mockito.any());

rule.connection.addToReceivedBuffer(
RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, Integer.MAX_VALUE));
Assertions.assertThat(requests).containsOnly(1L, 2L, 253L);
}

@Ignore
@Test(timeout = 2_000)
@SuppressWarnings("unchecked")
public void
testServerSideRequestChannelShouldNotHangInfinitelySendingElementsAndShouldProduceDataValuingConnectionBackpressure() {
final int streamId = 5;
final Queue<Object> received = new ConcurrentLinkedQueue<>();
final Queue<Long> requests = new ConcurrentLinkedQueue<>();

rule.setAcceptingSocket(
new AbstractRSocket() {
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payload) {
return Flux.<Payload>generate(s -> s.next(EmptyPayload.INSTANCE))
.doOnRequest(requests::add);
}
},
256);

rule.sendRequest(streamId, FrameType.REQUEST_CHANNEL);

assertThat("Unexpected error.", rule.errors, is(empty()));

Subscriber next = rule.connection.getSendSubscribers().iterator().next();

Mockito.doAnswer(
invocation -> {
received.add(invocation.getArgument(0));

if (received.size() == 256) {
throw new RuntimeException();
}

return null;
})
.when(next)
.onNext(Mockito.any());

rule.connection.addToReceivedBuffer(
RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, Integer.MAX_VALUE));
Assertions.assertThat(requests).containsOnly(1L, 2L, 253L);
}

public static class ServerSocketRule extends AbstractSocketRule<RSocketServer> {

private RSocket acceptingSocket;
Expand Down

0 comments on commit 115e836

Please sign in to comment.