Skip to content

Commit

Permalink
Suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephan202 authored and rickie committed Jan 2, 2022
1 parent 8714b37 commit 38935bf
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,71 +3,81 @@
import static com.google.errorprone.matchers.method.MethodMatchers.instanceMethod;

import com.google.auto.service.AutoService;
import com.google.common.collect.Iterables;
import com.google.errorprone.BugPattern;
import com.google.errorprone.BugPattern.LinkType;
import com.google.errorprone.BugPattern.SeverityLevel;
import com.google.errorprone.BugPattern.StandardTags;
import com.google.errorprone.VisitorState;
import com.google.errorprone.bugpatterns.BugChecker;
import com.google.errorprone.bugpatterns.BugChecker.MemberSelectTreeMatcher;
import com.google.errorprone.bugpatterns.BugChecker.MemberReferenceTreeMatcher;
import com.google.errorprone.bugpatterns.BugChecker.MethodInvocationTreeMatcher;
import com.google.errorprone.fixes.SuggestedFix;
import com.google.errorprone.fixes.SuggestedFixes;
import com.google.errorprone.matchers.Description;
import com.google.errorprone.matchers.Matcher;
import com.sun.source.tree.ExpressionTree;
import com.sun.source.tree.MemberSelectTree;
import com.sun.source.tree.Tree;
import com.sun.source.tree.MemberReferenceTree;
import com.sun.source.tree.MethodInvocationTree;
import java.util.function.Function;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;

/** A {@link BugChecker} which flags usages of {@link Flux#flatMap(Function)}s. */
/**
* A {@link BugChecker} which flags usages of {@link Flux#flatMap(Function)}.
*
* <p>{@link Flux#flatMap(Function)} eagerly performs up to {@link
* reactor.util.concurrent.Queues#SMALL_BUFFER_SIZE} subscriptions, interleaving the results as they
* are emitted. In most cases {@link Flux#concatMap(Function)} should be preferred, as it produces
* consistent results and avoids potentially saturating the thread pool on which subscription
* happens. If {@code concatMap}'s single-subscription semantics are undesirable one should invoke a
* {@code flatMap} or {@code flatMapSequential} overload with an explicit concurrency level.
*
* <p>NB: The rarely-used overload {@link Flux#flatMap(Function, Function, Supplier)} is not flagged
* by this check because there is no clear alternative to point to.
*/
@AutoService(BugChecker.class)
@BugPattern(
name = "FluxFlatMapUsage",
summary =
"`Flux#flatMap` is not allowed, please use `Flux#concatMap` or specify an argument for the concurrency.",
explanation =
"`Flux#flatMap` provides unbounded parallelism and is not guaranteed to be sequential. "
+ "Therefore, we disallow the use of the non-overloaded `Flux#flatMap`.",
"`Flux#flatMap` has subtle semantics; please use `Flux#concatMap` or explicitly specify the desired amount of concurrency",
linkType = LinkType.NONE,
severity = SeverityLevel.ERROR,
tags = StandardTags.LIKELY_ERROR)
public final class FluxFlatMapUsageCheck extends BugChecker implements MemberSelectTreeMatcher {
public final class FluxFlatMapUsageCheck extends BugChecker
implements MethodInvocationTreeMatcher, MemberReferenceTreeMatcher {
private static final long serialVersionUID = 1L;
private static final String NAME_CONCURRENCY_ARGUMENT = "MAX_CONCURRENCY";
private static final String MAX_CONCURRENCY_ARG_NAME = "MAX_CONCURRENCY";
private static final Matcher<ExpressionTree> FLUX_FLATMAP =
instanceMethod()
.onDescendantOf("reactor.core.publisher.Flux")
.named("flatMap")
.withParameters(Function.class.getName());

@Override
public Description matchMemberSelect(MemberSelectTree tree, VisitorState state) {
public Description matchMethodInvocation(MethodInvocationTree tree, VisitorState state) {
if (!FLUX_FLATMAP.matches(tree, state)) {
return Description.NO_MATCH;
}

Tree parentExpression = state.getPath().getParentPath().getLeaf();

return buildDescription(tree)
.setMessage(message())
.addFix(SuggestedFixes.renameMethodInvocation(tree, "concatMap", state))
.addFix(
SuggestedFix.builder()
.replace(tree, Util.treeToString(tree, state).replace("flatMap", "concatMap"))
.build())
.addFix(
SuggestedFix.builder()
.replace(
parentExpression,
getReplacementWithConcurrencyArgument(parentExpression, state))
.postfixWith(
Iterables.getOnlyElement(tree.getArguments()), ", " + MAX_CONCURRENCY_ARG_NAME)
.build())
.build();
}

private static String getReplacementWithConcurrencyArgument(
Tree parentExpression, VisitorState state) {
String parentString = Util.treeToString(parentExpression, state);
return String.format(
"%s, %s)",
parentString.substring(0, parentString.lastIndexOf(')')), NAME_CONCURRENCY_ARGUMENT);
@Override
public Description matchMemberReference(MemberReferenceTree tree, VisitorState state) {
if (!FLUX_FLATMAP.matches(tree, state)) {
return Description.NO_MATCH;
}

// Method references are expected to occur very infrequently; generating both variants of
// suggested fixes is not worth the trouble.
return describeMatch(tree);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,19 @@ Flux<T> after(Flux<T> flux) {
}
}

/** Prefer {@link Flux#concatMap(Function)} over more contrived alternatives. */
static final class FluxConcatMap<T, S> {
@BeforeTemplate
Flux<S> before(Flux<T> flux, Function<? super T, ? extends Publisher<? extends S>> function) {
return Refaster.anyOf(flux.flatMap(function, 1), flux.flatMapSequential(function, 1));
}

@AfterTemplate
Flux<S> after(Flux<T> flux, Function<? super T, ? extends Publisher<? extends S>> function) {
return flux.concatMap(function);
}
}

/**
* Don't use {@link Mono#flatMapMany(Function)} to implicitly convert a {@link Mono} to a {@link
* Flux}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,32 @@ void identification() {
compilationTestHelper
.addSourceLines(
"A.java",
"import java.util.function.BiFunction;",
"import java.util.function.Function;",
"import reactor.core.publisher.Mono;",
"import reactor.core.publisher.Flux;",
"",
"class A {",
" void positive() {",
" // BUG: Diagnostic contains:",
" Flux.just(1).flatMap(Flux::just);",
" }",
" void m() {",
" // BUG: Diagnostic contains:",
" Flux.just(1).flatMap(Flux::just);",
" // BUG: Diagnostic contains:",
" Flux.just(1).<String>flatMap(i -> Flux.just(String.valueOf(i)));",
"",
" void negative() {",
" Flux.just(1).concatMap(Flux::just);",
" Flux.just(1).flatMap(Flux::just, 1);",
" Flux.just(1).flatMap(Flux::just, 1, 1);",
" Flux.just(1).flatMap(Flux::just, throwable -> Flux.empty(), Flux::empty);",
"",
" // BUG: Diagnostic contains:",
" this.<String, Flux<String>>sink(Flux::flatMap);",
" // BUG: Diagnostic contains:",
" this.<Integer, Flux<Integer>>sink(Flux::<Integer>flatMap);",
"",
" this.<String, Mono<String>>sink(Mono::flatMap);",
" }",
"",
" private <T, P> void sink(BiFunction<P, Function<T, P>, P> fun) {}",
"}")
.doTest();
}
Expand All @@ -45,17 +57,17 @@ void replacementFirstSuggestedFix() {
"import reactor.core.publisher.Flux;",
"",
"class A {",
" void positive() {",
" Flux.just(1).flatMap(Flux::just);",
" void m() {",
" Flux.just(1).flatMap(Flux::just);",
" }",
"}")
.addOutputLines(
"out/A.java",
"import reactor.core.publisher.Flux;",
"",
"class A {",
" void positive() {",
" Flux.just(1).concatMap(Flux::just);",
" void m() {",
" Flux.just(1).concatMap(Flux::just);",
" }",
"}")
.doTest();
Expand All @@ -70,21 +82,21 @@ void replacementSecondSuggestedFix() {
"import reactor.core.publisher.Flux;",
"",
"class A {",
" private static final int MAX_CONCURRENCY = 10;",
" private static final int MAX_CONCURRENCY = 8;",
"",
" void positive() {",
" Flux.just(1).flatMap(Flux::just);",
" void m() {",
" Flux.just(1).flatMap(Flux::just);",
" }",
"}")
.addOutputLines(
"out/A.java",
"import reactor.core.publisher.Flux;",
"",
"class A {",
" private static final int MAX_CONCURRENCY = 10;",
" private static final int MAX_CONCURRENCY = 8;",
"",
" void positive() {",
" Flux.just(1).flatMap(Flux::just, MAX_CONCURRENCY);",
" void m() {",
" Flux.just(1).flatMap(Flux::just, MAX_CONCURRENCY);",
" }",
"}")
.doTest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ ImmutableSet<Flux<Integer>> testFluxSwitchIfEmptyOfEmptyPublisher() {
Flux.just(1).switchIfEmpty(Mono.empty()), Flux.just(2).switchIfEmpty(Flux.empty()));
}

ImmutableSet<Flux<Integer>> testFluxConcatMap() {
return ImmutableSet.of(
Flux.just(1).flatMap(Mono::just, 1), Flux.just(2).flatMapSequential(Mono::just, 1));
}

Flux<String> testMonoFlatMapToFlux() {
return Mono.just("foo").flatMapMany(s -> Mono.just(s + s));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ ImmutableSet<Flux<Integer>> testFluxSwitchIfEmptyOfEmptyPublisher() {
return ImmutableSet.of(Flux.just(1), Flux.just(2));
}

ImmutableSet<Flux<Integer>> testFluxConcatMap() {
return ImmutableSet.of(Flux.just(1).concatMap(Mono::just), Flux.just(2).concatMap(Mono::just));
}

Flux<String> testMonoFlatMapToFlux() {
return Mono.just("foo").flatMap(s -> Mono.just(s + s)).flux();
}
Expand Down

0 comments on commit 38935bf

Please sign in to comment.