From c110c92e222dda43c675dcd601671a979226fbbd Mon Sep 17 00:00:00 2001 From: Elena Liashenko Date: Wed, 31 Aug 2022 15:51:38 +0200 Subject: [PATCH] Introduce check for calling flatMap() on Flux --- .../bugpatterns/FluxFlatMapUsage.java | 54 ++++++++++++++----- .../bugpatterns/FluxFlatMapUsageTest.java | 25 ++++++++- 2 files changed, 63 insertions(+), 16 deletions(-) diff --git a/error-prone-contrib/src/main/java/tech/picnic/errorprone/bugpatterns/FluxFlatMapUsage.java b/error-prone-contrib/src/main/java/tech/picnic/errorprone/bugpatterns/FluxFlatMapUsage.java index 1107d8bb83b..395d2609a7f 100644 --- a/error-prone-contrib/src/main/java/tech/picnic/errorprone/bugpatterns/FluxFlatMapUsage.java +++ b/error-prone-contrib/src/main/java/tech/picnic/errorprone/bugpatterns/FluxFlatMapUsage.java @@ -5,6 +5,10 @@ import static com.google.errorprone.BugPattern.StandardTags.LIKELY_ERROR; import static com.google.errorprone.matchers.method.MethodMatchers.instanceMethod; import static tech.picnic.errorprone.bugpatterns.util.Documentation.BUG_PATTERNS_BASE_URL; +import static tech.picnic.errorprone.bugpatterns.util.MoreTypes.generic; +import static tech.picnic.errorprone.bugpatterns.util.MoreTypes.subOf; +import static tech.picnic.errorprone.bugpatterns.util.MoreTypes.type; +import static tech.picnic.errorprone.bugpatterns.util.MoreTypes.unbound; import com.google.auto.service.AutoService; import com.google.common.collect.Iterables; @@ -17,11 +21,14 @@ import com.google.errorprone.fixes.SuggestedFixes; import com.google.errorprone.matchers.Description; import com.google.errorprone.matchers.Matcher; +import com.google.errorprone.suppliers.Supplier; +import com.google.errorprone.suppliers.Suppliers; +import com.google.errorprone.util.ASTHelpers; import com.sun.source.tree.ExpressionTree; import com.sun.source.tree.MemberReferenceTree; import com.sun.source.tree.MethodInvocationTree; +import com.sun.tools.javac.code.Type; import java.util.function.Function; -import java.util.function.Supplier; import reactor.core.publisher.Flux; /** @@ -33,11 +40,12 @@ * former interleaves values as they are emitted, yielding nondeterministic results. In most cases * {@link Flux#concatMap(Function)} should be preferred, as it produces consistent results and * avoids potentially saturating the thread pool on which subscription happens. If {@code - * concatMap}'s single-subscription semantics are undesirable one should invoke a {@code flatMap} or - * {@code flatMapSequential} overload with an explicit concurrency level. + * concatMap}'s sequential-subscription semantics are undesirable one should invoke a {@code + * flatMap} or {@code flatMapSequential} overload with an explicit concurrency level. * - *

NB: The rarely-used overload {@link Flux#flatMap(Function, Function, Supplier)} is not flagged - * by this check because there is no clear alternative to point to. + *

NB: The rarely-used overload {@link Flux#flatMap(Function, Function, + * java.util.function.Supplier)} is not flagged by this check because there is no clear alternative + * to point to. */ @AutoService(BugChecker.class) @BugPattern( @@ -52,11 +60,16 @@ public final class FluxFlatMapUsage extends BugChecker implements MethodInvocationTreeMatcher, MemberReferenceTreeMatcher { private static final long serialVersionUID = 1L; private static final String MAX_CONCURRENCY_ARG_NAME = "MAX_CONCURRENCY"; + private static final Supplier FLUX = + Suppliers.typeFromString("reactor.core.publisher.Flux"); private static final Matcher FLUX_FLATMAP = instanceMethod() - .onDescendantOf("reactor.core.publisher.Flux") + .onDescendantOf(FLUX) .namedAnyOf("flatMap", "flatMapSequential") .withParameters(Function.class.getName()); + private static final Supplier FLUX_OF_PUBLISHERS = + VisitorState.memoize( + generic(FLUX, subOf(generic(type("org.reactivestreams.Publisher"), unbound())))); @Override public Description matchMethodInvocation(MethodInvocationTree tree, VisitorState state) { @@ -64,14 +77,27 @@ public Description matchMethodInvocation(MethodInvocationTree tree, VisitorState return Description.NO_MATCH; } - return buildDescription(tree) - .addFix(SuggestedFixes.renameMethodInvocation(tree, "concatMap", state)) - .addFix( - SuggestedFix.builder() - .postfixWith( - Iterables.getOnlyElement(tree.getArguments()), ", " + MAX_CONCURRENCY_ARG_NAME) - .build()) - .build(); + SuggestedFix serializationFix = SuggestedFixes.renameMethodInvocation(tree, "concatMap", state); + SuggestedFix concurrencyCapFix = + SuggestedFix.builder() + .postfixWith( + Iterables.getOnlyElement(tree.getArguments()), ", " + MAX_CONCURRENCY_ARG_NAME) + .build(); + + Description.Builder description = buildDescription(tree); + + if (state.getTypes().isSubtype(ASTHelpers.getType(tree), FLUX_OF_PUBLISHERS.get(state))) { + /* + * Nested publishers may need to be subscribed to eagerly in order to avoid a deadlock, e.g. + * if they are produced by `Flux#groupBy`. In this case we suggest specifying an explicit + * concurrently bound, in favour of sequential subscriptions using `Flux#concatMap`. + */ + description.addFix(concurrencyCapFix).addFix(serializationFix); + } else { + description.addFix(serializationFix).addFix(concurrencyCapFix); + } + + return description.build(); } @Override diff --git a/error-prone-contrib/src/test/java/tech/picnic/errorprone/bugpatterns/FluxFlatMapUsageTest.java b/error-prone-contrib/src/test/java/tech/picnic/errorprone/bugpatterns/FluxFlatMapUsageTest.java index 7cfd295b566..a80414e18bb 100644 --- a/error-prone-contrib/src/test/java/tech/picnic/errorprone/bugpatterns/FluxFlatMapUsageTest.java +++ b/error-prone-contrib/src/test/java/tech/picnic/errorprone/bugpatterns/FluxFlatMapUsageTest.java @@ -4,6 +4,7 @@ import com.google.errorprone.BugCheckerRefactoringTestHelper; import com.google.errorprone.BugCheckerRefactoringTestHelper.FixChoosers; +import com.google.errorprone.BugCheckerRefactoringTestHelper.TestMode; import com.google.errorprone.CompilationTestHelper; import org.junit.jupiter.api.Test; @@ -33,6 +34,14 @@ void identification() { " Flux.just(1).flatMapSequential(Flux::just);", " // BUG: Diagnostic contains:", " Flux.just(1).flatMapSequential(i -> Flux.just(String.valueOf(i)));", + " // BUG: Diagnostic contains:", + " Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just);", + " // BUG: Diagnostic contains:", + " Flux.just(1, 2).groupBy(i -> i).flatMap(i -> Flux.just(String.valueOf(i)));", + " // BUG: Diagnostic contains:", + " Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just);", + " // BUG: Diagnostic contains:", + " Flux.just(1, 2).groupBy(i -> i).flatMapSequential(i -> Flux.just(String.valueOf(i)));", "", " Mono.just(1).flatMap(Mono::just);", " Flux.just(1).concatMap(Flux::just);", @@ -71,9 +80,13 @@ void replacementFirstSuggestedFix() { "import reactor.core.publisher.Flux;", "", "class A {", + " private static final int MAX_CONCURRENCY = 8;", + "", " void m() {", " Flux.just(1).flatMap(Flux::just);", " Flux.just(1).flatMapSequential(Flux::just);", + " Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just);", + " Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just);", " }", "}") .addOutputLines( @@ -81,12 +94,16 @@ void replacementFirstSuggestedFix() { "import reactor.core.publisher.Flux;", "", "class A {", + " private static final int MAX_CONCURRENCY = 8;", + "", " void m() {", " Flux.just(1).concatMap(Flux::just);", " Flux.just(1).concatMap(Flux::just);", + " Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just, MAX_CONCURRENCY);", + " Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just, MAX_CONCURRENCY);", " }", "}") - .doTest(); + .doTest(TestMode.TEXT_MATCH); } @Test @@ -103,6 +120,8 @@ void replacementSecondSuggestedFix() { " void m() {", " Flux.just(1).flatMap(Flux::just);", " Flux.just(1).flatMapSequential(Flux::just);", + " Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just);", + " Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just);", " }", "}") .addOutputLines( @@ -115,8 +134,10 @@ void replacementSecondSuggestedFix() { " void m() {", " Flux.just(1).flatMap(Flux::just, MAX_CONCURRENCY);", " Flux.just(1).flatMapSequential(Flux::just, MAX_CONCURRENCY);", + " Flux.just(1, 2).groupBy(i -> i).concatMap(Flux::just);", + " Flux.just(1, 2).groupBy(i -> i).concatMap(Flux::just);", " }", "}") - .doTest(); + .doTest(TestMode.TEXT_MATCH); } }