Skip to content

Commit

Permalink
Prefer Flux#take(long, boolean) over Flux#take(long) to limit ups…
Browse files Browse the repository at this point in the history
…tream generation (#314)
  • Loading branch information
eric-picnic authored Oct 26, 2022
1 parent 6cb10ff commit 45dfc53
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tech.picnic.errorprone.refasterrules;

import static com.google.common.collect.MoreCollectors.toOptional;
import static com.google.errorprone.BugPattern.SeverityLevel.WARNING;
import static com.google.errorprone.refaster.ImportPolicy.STATIC_IMPORT_ALWAYS;
import static java.util.function.Function.identity;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -28,7 +29,9 @@
import reactor.test.StepVerifier;
import reactor.test.publisher.PublisherProbe;
import reactor.util.context.Context;
import tech.picnic.errorprone.refaster.annotation.Description;
import tech.picnic.errorprone.refaster.annotation.OnlineDocumentation;
import tech.picnic.errorprone.refaster.annotation.Severity;
import tech.picnic.errorprone.refaster.matchers.ThrowsCheckedException;

/** Refaster rules related to Reactor expressions and statements. */
Expand Down Expand Up @@ -144,6 +147,34 @@ Mono<S> after(Mono<T> mono, S object) {
}
}

/**
* Prefer {@link Flux#take(long, boolean)} over {@link Flux#take(long)}.
*
* <p>In Reactor versions prior to 3.5.0, {@code Flux#take(long)} makes an unbounded request
* upstream, and is equivalent to {@code Flux#take(long, false)}. In 3.5.0, the behavior of {@code
* Flux#take(long)} will change to that of {@code Flux#take(long, true)}.
*
* <p>The intent with this Refaster rule is to get the new behavior before upgrading to Reactor
* 3.5.0.
*/
// XXX: Drop this rule some time after upgrading to Reactor 3.6.0, or introduce a way to apply
// this rule only when an older version of Reactor is on the classpath.
// XXX: Once Reactor 3.6.0 is out, introduce a rule that rewrites code in the opposite direction.
@Description(
"Prior to Reactor 3.5.0, `take(n)` requests and unbounded number of elements upstream.")
@Severity(WARNING)
static final class FluxTake<T> {
@BeforeTemplate
Flux<T> before(Flux<T> flux, long n) {
return flux.take(n);
}

@AfterTemplate
Flux<T> after(Flux<T> flux, long n) {
return flux.take(n, /* limitRequest= */ true);
}
}

/** Don't unnecessarily pass an empty publisher to {@link Mono#switchIfEmpty(Mono)}. */
static final class MonoSwitchIfEmptyOfEmptyPublisher<T> {
@BeforeTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ Mono<String> testMonoThenReturn() {
return Mono.empty().then(Mono.just("foo"));
}

Flux<Integer> testFluxTake() {
return Flux.just(1, 2, 3).take(1);
}

Mono<Integer> testMonoSwitchIfEmptyOfEmptyPublisher() {
return Mono.just(1).switchIfEmpty(Mono.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ Mono<String> testMonoThenReturn() {
return Mono.empty().thenReturn("foo");
}

Flux<Integer> testFluxTake() {
return Flux.just(1, 2, 3).take(1, true);
}

Mono<Integer> testMonoSwitchIfEmptyOfEmptyPublisher() {
return Mono.just(1);
}
Expand Down

0 comments on commit 45dfc53

Please sign in to comment.