diff --git a/gradle.properties b/gradle.properties index f394ea600..40d676374 100644 --- a/gradle.properties +++ b/gradle.properties @@ -11,4 +11,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -version=1.0.0-RC2-SNAPSHOT-TEST +version=1.0.0-RC2-SNAPSHOT diff --git a/rsocket-core/src/main/java/io/rsocket/internal/RateLimitableRequestPublisher.java b/rsocket-core/src/main/java/io/rsocket/internal/RateLimitableRequestPublisher.java index b2f0be463..cdb0d0c0c 100755 --- a/rsocket-core/src/main/java/io/rsocket/internal/RateLimitableRequestPublisher.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/RateLimitableRequestPublisher.java @@ -60,10 +60,6 @@ public static RateLimitableRequestPublisher wrap(Publisher source, lon return new RateLimitableRequestPublisher<>(source, prefetch); } - // public static RateLimitableRequestPublisher wrap(Publisher source) { - // return wrap(source, Long.MAX_VALUE); - // } - @Override public void subscribe(CoreSubscriber destination) { synchronized (this) { @@ -103,16 +99,22 @@ private void requestN() { } final long er = externalRequested; + final long p = prefetch; + final int pendingFulfil = pendingToFulfil; - if (er != Long.MAX_VALUE && prefetch != Integer.MAX_VALUE) { + if (er != Long.MAX_VALUE || p != Integer.MAX_VALUE) { // shortcut - if (pendingToFulfil == prefetch) { + if (pendingFulfil == p) { return; } - r = Math.min(prefetch - pendingToFulfil, er); - externalRequested -= r; - pendingToFulfil += r; + r = Math.min(p - pendingFulfil, er); + if (er != Long.MAX_VALUE) { + externalRequested -= r; + } + if (p != Integer.MAX_VALUE) { + pendingToFulfil += r; + } } else { r = Long.MAX_VALUE; } @@ -169,49 +171,51 @@ public void onSubscribe(Subscription s) { public void onNext(T t) { try { destination.onNext(t); - deliveredElements++; - if (deliveredElements == limit) { - deliveredElements = 0; + if (prefetch == Integer.MAX_VALUE) { + return; + } + + final long l = limit; + int d = deliveredElements + 1; + + if (d == l) { + d = 0; final long r; final Subscription s; synchronized (RateLimitableRequestPublisher.this) { + long er = externalRequested; s = internalSubscription; if (s == null) { return; } - if (externalRequested >= limit) { - externalRequested -= limit; + if (er >= l) { + er -= l; // keep pendingToFulfil as is since it is eq to prefetch - r = limit; + r = l; } else { - pendingToFulfil -= limit; - if (externalRequested > 0) { - r = externalRequested; - externalRequested = 0; + pendingToFulfil -= l; + if (er > 0) { + r = er; + er = 0; pendingToFulfil += r; } else { r = 0; } } + + externalRequested = er; } if (r > 0) { s.request(r); } } - // else if (deliveredElements == pendingToFulfil) { - // deliveredElements = 0; - // synchronized (RateLimitableRequestPublisher.this) { - // pendingToFulfil -= deliveredElements; - // if (deliveredElements == pendingToFulfil) { - // return; - // } - // } - // } + + deliveredElements = d; } catch (Throwable e) { onError(e); } diff --git a/rsocket-core/src/test/java/io/rsocket/internal/RateLimitableRequestPublisherTest.java b/rsocket-core/src/test/java/io/rsocket/internal/RateLimitableRequestPublisherTest.java index 25d7e5fab..8560dc722 100644 --- a/rsocket-core/src/test/java/io/rsocket/internal/RateLimitableRequestPublisherTest.java +++ b/rsocket-core/src/test/java/io/rsocket/internal/RateLimitableRequestPublisherTest.java @@ -103,4 +103,42 @@ public void accept(Long __) { .expectComplete() .verify(Duration.ofMillis(30000)); } + + @Test + public void testThatRequestLongMaxValueWillBeDeliveredInSeparateChunks() { + Flux source = + Flux.range(0, 10000000) + .subscribeOn(Schedulers.parallel()) + .doOnRequest(r -> Assertions.assertThat(r).isLessThanOrEqualTo(128)); + + RateLimitableRequestPublisher rateLimitableRequestPublisher = + RateLimitableRequestPublisher.wrap(source, 128); + + StepVerifier.create(rateLimitableRequestPublisher) + .then( + () -> rateLimitableRequestPublisher.request(Long.MAX_VALUE) + ) + .expectNextCount(10000000) + .expectComplete() + .verify(Duration.ofMillis(30000)); + } + + @Test + public void testThatRequestLongMaxWithIntegerMaxValuePrefetchWillBeDeliveredAsLongMaxValue() { + Flux source = + Flux.range(0, 10000000) + .subscribeOn(Schedulers.parallel()) + .doOnRequest(r -> Assertions.assertThat(r).isEqualTo(Long.MAX_VALUE)); + + RateLimitableRequestPublisher rateLimitableRequestPublisher = + RateLimitableRequestPublisher.wrap(source, Integer.MAX_VALUE); + + StepVerifier.create(rateLimitableRequestPublisher) + .then( + () -> rateLimitableRequestPublisher.request(Long.MAX_VALUE) + ) + .expectNextCount(10000000) + .expectComplete() + .verify(Duration.ofMillis(30000)); + } }