Skip to content

Commit

Permalink
Introduce check for calling flatMap() on Flux<GroupedFlux>
Browse files Browse the repository at this point in the history
  • Loading branch information
eliashenko authored and Stephan202 committed Oct 21, 2022
1 parent df41c50 commit c110c92
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
import static com.google.errorprone.BugPattern.StandardTags.LIKELY_ERROR;
import static com.google.errorprone.matchers.method.MethodMatchers.instanceMethod;
import static tech.picnic.errorprone.bugpatterns.util.Documentation.BUG_PATTERNS_BASE_URL;
import static tech.picnic.errorprone.bugpatterns.util.MoreTypes.generic;
import static tech.picnic.errorprone.bugpatterns.util.MoreTypes.subOf;
import static tech.picnic.errorprone.bugpatterns.util.MoreTypes.type;
import static tech.picnic.errorprone.bugpatterns.util.MoreTypes.unbound;

import com.google.auto.service.AutoService;
import com.google.common.collect.Iterables;
Expand All @@ -17,11 +21,14 @@
import com.google.errorprone.fixes.SuggestedFixes;
import com.google.errorprone.matchers.Description;
import com.google.errorprone.matchers.Matcher;
import com.google.errorprone.suppliers.Supplier;
import com.google.errorprone.suppliers.Suppliers;
import com.google.errorprone.util.ASTHelpers;
import com.sun.source.tree.ExpressionTree;
import com.sun.source.tree.MemberReferenceTree;
import com.sun.source.tree.MethodInvocationTree;
import com.sun.tools.javac.code.Type;
import java.util.function.Function;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;

/**
Expand All @@ -33,11 +40,12 @@
* former interleaves values as they are emitted, yielding nondeterministic results. In most cases
* {@link Flux#concatMap(Function)} should be preferred, as it produces consistent results and
* avoids potentially saturating the thread pool on which subscription happens. If {@code
* concatMap}'s single-subscription semantics are undesirable one should invoke a {@code flatMap} or
* {@code flatMapSequential} overload with an explicit concurrency level.
* concatMap}'s sequential-subscription semantics are undesirable one should invoke a {@code
* flatMap} or {@code flatMapSequential} overload with an explicit concurrency level.
*
* <p>NB: The rarely-used overload {@link Flux#flatMap(Function, Function, Supplier)} is not flagged
* by this check because there is no clear alternative to point to.
* <p>NB: The rarely-used overload {@link Flux#flatMap(Function, Function,
* java.util.function.Supplier)} is not flagged by this check because there is no clear alternative
* to point to.
*/
@AutoService(BugChecker.class)
@BugPattern(
Expand All @@ -52,26 +60,44 @@ public final class FluxFlatMapUsage extends BugChecker
implements MethodInvocationTreeMatcher, MemberReferenceTreeMatcher {
private static final long serialVersionUID = 1L;
private static final String MAX_CONCURRENCY_ARG_NAME = "MAX_CONCURRENCY";
private static final Supplier<Type> FLUX =
Suppliers.typeFromString("reactor.core.publisher.Flux");
private static final Matcher<ExpressionTree> FLUX_FLATMAP =
instanceMethod()
.onDescendantOf("reactor.core.publisher.Flux")
.onDescendantOf(FLUX)
.namedAnyOf("flatMap", "flatMapSequential")
.withParameters(Function.class.getName());
private static final Supplier<Type> FLUX_OF_PUBLISHERS =
VisitorState.memoize(
generic(FLUX, subOf(generic(type("org.reactivestreams.Publisher"), unbound()))));

@Override
public Description matchMethodInvocation(MethodInvocationTree tree, VisitorState state) {
if (!FLUX_FLATMAP.matches(tree, state)) {
return Description.NO_MATCH;
}

return buildDescription(tree)
.addFix(SuggestedFixes.renameMethodInvocation(tree, "concatMap", state))
.addFix(
SuggestedFix.builder()
.postfixWith(
Iterables.getOnlyElement(tree.getArguments()), ", " + MAX_CONCURRENCY_ARG_NAME)
.build())
.build();
SuggestedFix serializationFix = SuggestedFixes.renameMethodInvocation(tree, "concatMap", state);
SuggestedFix concurrencyCapFix =
SuggestedFix.builder()
.postfixWith(
Iterables.getOnlyElement(tree.getArguments()), ", " + MAX_CONCURRENCY_ARG_NAME)
.build();

Description.Builder description = buildDescription(tree);

if (state.getTypes().isSubtype(ASTHelpers.getType(tree), FLUX_OF_PUBLISHERS.get(state))) {
/*
* Nested publishers may need to be subscribed to eagerly in order to avoid a deadlock, e.g.
* if they are produced by `Flux#groupBy`. In this case we suggest specifying an explicit
* concurrently bound, in favour of sequential subscriptions using `Flux#concatMap`.
*/
description.addFix(concurrencyCapFix).addFix(serializationFix);
} else {
description.addFix(serializationFix).addFix(concurrencyCapFix);
}

return description.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.google.errorprone.BugCheckerRefactoringTestHelper;
import com.google.errorprone.BugCheckerRefactoringTestHelper.FixChoosers;
import com.google.errorprone.BugCheckerRefactoringTestHelper.TestMode;
import com.google.errorprone.CompilationTestHelper;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -33,6 +34,14 @@ void identification() {
" Flux.just(1).flatMapSequential(Flux::just);",
" // BUG: Diagnostic contains:",
" Flux.just(1).<String>flatMapSequential(i -> Flux.just(String.valueOf(i)));",
" // BUG: Diagnostic contains:",
" Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just);",
" // BUG: Diagnostic contains:",
" Flux.just(1, 2).groupBy(i -> i).<String>flatMap(i -> Flux.just(String.valueOf(i)));",
" // BUG: Diagnostic contains:",
" Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just);",
" // BUG: Diagnostic contains:",
" Flux.just(1, 2).groupBy(i -> i).<String>flatMapSequential(i -> Flux.just(String.valueOf(i)));",
"",
" Mono.just(1).flatMap(Mono::just);",
" Flux.just(1).concatMap(Flux::just);",
Expand Down Expand Up @@ -71,22 +80,30 @@ void replacementFirstSuggestedFix() {
"import reactor.core.publisher.Flux;",
"",
"class A {",
" private static final int MAX_CONCURRENCY = 8;",
"",
" void m() {",
" Flux.just(1).flatMap(Flux::just);",
" Flux.just(1).flatMapSequential(Flux::just);",
" Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just);",
" Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just);",
" }",
"}")
.addOutputLines(
"A.java",
"import reactor.core.publisher.Flux;",
"",
"class A {",
" private static final int MAX_CONCURRENCY = 8;",
"",
" void m() {",
" Flux.just(1).concatMap(Flux::just);",
" Flux.just(1).concatMap(Flux::just);",
" Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just, MAX_CONCURRENCY);",
" Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just, MAX_CONCURRENCY);",
" }",
"}")
.doTest();
.doTest(TestMode.TEXT_MATCH);
}

@Test
Expand All @@ -103,6 +120,8 @@ void replacementSecondSuggestedFix() {
" void m() {",
" Flux.just(1).flatMap(Flux::just);",
" Flux.just(1).flatMapSequential(Flux::just);",
" Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just);",
" Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just);",
" }",
"}")
.addOutputLines(
Expand All @@ -115,8 +134,10 @@ void replacementSecondSuggestedFix() {
" void m() {",
" Flux.just(1).flatMap(Flux::just, MAX_CONCURRENCY);",
" Flux.just(1).flatMapSequential(Flux::just, MAX_CONCURRENCY);",
" Flux.just(1, 2).groupBy(i -> i).concatMap(Flux::just);",
" Flux.just(1, 2).groupBy(i -> i).concatMap(Flux::just);",
" }",
"}")
.doTest();
.doTest(TestMode.TEXT_MATCH);
}
}

0 comments on commit c110c92

Please sign in to comment.