diff --git a/error-prone-contrib/src/main/java/tech/picnic/errorprone/bugpatterns/FluxFlatMapUsageCheck.java b/error-prone-contrib/src/main/java/tech/picnic/errorprone/bugpatterns/FluxFlatMapUsageCheck.java
new file mode 100644
index 0000000000..2e42043872
--- /dev/null
+++ b/error-prone-contrib/src/main/java/tech/picnic/errorprone/bugpatterns/FluxFlatMapUsageCheck.java
@@ -0,0 +1,86 @@
+package tech.picnic.errorprone.bugpatterns;
+
+import static com.google.errorprone.matchers.method.MethodMatchers.instanceMethod;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.Iterables;
+import com.google.errorprone.BugPattern;
+import com.google.errorprone.BugPattern.LinkType;
+import com.google.errorprone.BugPattern.SeverityLevel;
+import com.google.errorprone.BugPattern.StandardTags;
+import com.google.errorprone.VisitorState;
+import com.google.errorprone.bugpatterns.BugChecker;
+import com.google.errorprone.bugpatterns.BugChecker.MemberReferenceTreeMatcher;
+import com.google.errorprone.bugpatterns.BugChecker.MethodInvocationTreeMatcher;
+import com.google.errorprone.fixes.SuggestedFix;
+import com.google.errorprone.fixes.SuggestedFixes;
+import com.google.errorprone.matchers.Description;
+import com.google.errorprone.matchers.Matcher;
+import com.sun.source.tree.ExpressionTree;
+import com.sun.source.tree.MemberReferenceTree;
+import com.sun.source.tree.MethodInvocationTree;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import reactor.core.publisher.Flux;
+
+/**
+ * A {@link BugChecker} which flags usages of {@link Flux#flatMap(Function)} and {@link
+ * Flux#flatMapSequential(Function)}.
+ *
+ *
{@link Flux#flatMap(Function)} and {@link Flux#flatMapSequential(Function)} eagerly perform up
+ * to {@link reactor.util.concurrent.Queues#SMALL_BUFFER_SIZE} subscriptions. Additionally, the
+ * former interleaves values as they are emitted, yielding nondeterministic results. In most cases
+ * {@link Flux#concatMap(Function)} should be preferred, as it produces consistent results and
+ * avoids potentially saturating the thread pool on which subscription happens. If {@code
+ * concatMap}'s single-subscription semantics are undesirable one should invoke a {@code flatMap} or
+ * {@code flatMapSequential} overload with an explicit concurrency level.
+ *
+ *
NB: The rarely-used overload {@link Flux#flatMap(Function, Function, Supplier)} is not flagged
+ * by this check because there is no clear alternative to point to.
+ */
+@AutoService(BugChecker.class)
+@BugPattern(
+ name = "FluxFlatMapUsage",
+ summary =
+ "`Flux#flatMap` and `Flux#flatMapSequential` have subtle semantics; "
+ + "please use `Flux#concatMap` or explicitly specify the desired amount of concurrency",
+ linkType = LinkType.NONE,
+ severity = SeverityLevel.ERROR,
+ tags = StandardTags.LIKELY_ERROR)
+public final class FluxFlatMapUsageCheck extends BugChecker
+ implements MethodInvocationTreeMatcher, MemberReferenceTreeMatcher {
+ private static final long serialVersionUID = 1L;
+ private static final String MAX_CONCURRENCY_ARG_NAME = "MAX_CONCURRENCY";
+ private static final Matcher FLUX_FLATMAP =
+ instanceMethod()
+ .onDescendantOf("reactor.core.publisher.Flux")
+ .namedAnyOf("flatMap", "flatMapSequential")
+ .withParameters(Function.class.getName());
+
+ @Override
+ public Description matchMethodInvocation(MethodInvocationTree tree, VisitorState state) {
+ if (!FLUX_FLATMAP.matches(tree, state)) {
+ return Description.NO_MATCH;
+ }
+
+ return buildDescription(tree)
+ .addFix(SuggestedFixes.renameMethodInvocation(tree, "concatMap", state))
+ .addFix(
+ SuggestedFix.builder()
+ .postfixWith(
+ Iterables.getOnlyElement(tree.getArguments()), ", " + MAX_CONCURRENCY_ARG_NAME)
+ .build())
+ .build();
+ }
+
+ @Override
+ public Description matchMemberReference(MemberReferenceTree tree, VisitorState state) {
+ if (!FLUX_FLATMAP.matches(tree, state)) {
+ return Description.NO_MATCH;
+ }
+
+ // Method references are expected to occur very infrequently; generating both variants of
+ // suggested fixes is not worth the trouble.
+ return describeMatch(tree);
+ }
+}
diff --git a/error-prone-contrib/src/main/java/tech/picnic/errorprone/refastertemplates/ReactorTemplates.java b/error-prone-contrib/src/main/java/tech/picnic/errorprone/refastertemplates/ReactorTemplates.java
index 2ec67549e8..3f93e629fd 100644
--- a/error-prone-contrib/src/main/java/tech/picnic/errorprone/refastertemplates/ReactorTemplates.java
+++ b/error-prone-contrib/src/main/java/tech/picnic/errorprone/refastertemplates/ReactorTemplates.java
@@ -142,6 +142,35 @@ Flux after(Flux flux) {
}
}
+ /** Prefer {@link Flux#concatMap(Function)} over more contrived alternatives. */
+ static final class FluxConcatMap {
+ @BeforeTemplate
+ Flux before(Flux flux, Function super T, ? extends Publisher extends S>> function) {
+ return Refaster.anyOf(flux.flatMap(function, 1), flux.flatMapSequential(function, 1));
+ }
+
+ @AfterTemplate
+ Flux after(Flux flux, Function super T, ? extends Publisher extends S>> function) {
+ return flux.concatMap(function);
+ }
+ }
+
+ /**
+ * Prefer {@link Flux#concatMapIterable(Function)} over {@link Flux#concatMapIterable(Function)},
+ * as the former has equivalent semantics but a clearer name.
+ */
+ static final class FluxConcatMapIterable {
+ @BeforeTemplate
+ Flux before(Flux flux, Function super T, ? extends Iterable extends S>> function) {
+ return flux.flatMapIterable(function);
+ }
+
+ @AfterTemplate
+ Flux after(Flux flux, Function super T, ? extends Iterable extends S>> function) {
+ return flux.concatMapIterable(function);
+ }
+ }
+
/**
* Don't use {@link Mono#flatMapMany(Function)} to implicitly convert a {@link Mono} to a {@link
* Flux}.
diff --git a/error-prone-contrib/src/test/java/tech/picnic/errorprone/bugpatterns/FluxFlatMapUsageCheckTest.java b/error-prone-contrib/src/test/java/tech/picnic/errorprone/bugpatterns/FluxFlatMapUsageCheckTest.java
new file mode 100644
index 0000000000..e6dff10e8a
--- /dev/null
+++ b/error-prone-contrib/src/test/java/tech/picnic/errorprone/bugpatterns/FluxFlatMapUsageCheckTest.java
@@ -0,0 +1,122 @@
+package tech.picnic.errorprone.bugpatterns;
+
+import static com.google.errorprone.BugCheckerRefactoringTestHelper.newInstance;
+
+import com.google.errorprone.BugCheckerRefactoringTestHelper;
+import com.google.errorprone.BugCheckerRefactoringTestHelper.FixChoosers;
+import com.google.errorprone.CompilationTestHelper;
+import org.junit.jupiter.api.Test;
+
+final class FluxFlatMapUsageCheckTest {
+ private final CompilationTestHelper compilationTestHelper =
+ CompilationTestHelper.newInstance(FluxFlatMapUsageCheck.class, getClass());
+ private final BugCheckerRefactoringTestHelper refactoringTestHelper =
+ newInstance(FluxFlatMapUsageCheck.class, getClass());
+
+ @Test
+ void identification() {
+ compilationTestHelper
+ .addSourceLines(
+ "A.java",
+ "import java.util.function.BiFunction;",
+ "import java.util.function.Function;",
+ "import reactor.core.publisher.Mono;",
+ "import reactor.core.publisher.Flux;",
+ "",
+ "class A {",
+ " void m() {",
+ " // BUG: Diagnostic contains:",
+ " Flux.just(1).flatMap(Flux::just);",
+ " // BUG: Diagnostic contains:",
+ " Flux.just(1).flatMap(i -> Flux.just(String.valueOf(i)));",
+ " // BUG: Diagnostic contains:",
+ " Flux.just(1).flatMapSequential(Flux::just);",
+ " // BUG: Diagnostic contains:",
+ " Flux.just(1).flatMapSequential(i -> Flux.just(String.valueOf(i)));",
+ "",
+ " Mono.just(1).flatMap(Mono::just);",
+ " Flux.just(1).concatMap(Flux::just);",
+ "",
+ " Flux.just(1).flatMap(Flux::just, 1);",
+ " Flux.just(1).flatMap(Flux::just, 1, 1);",
+ " Flux.just(1).flatMap(Flux::just, throwable -> Flux.empty(), Flux::empty);",
+ "",
+ " Flux.just(1).flatMapSequential(Flux::just, 1);",
+ " Flux.just(1).flatMapSequential(Flux::just, 1, 1);",
+ "",
+ " // BUG: Diagnostic contains:",
+ " this.>sink(Flux::flatMap);",
+ " // BUG: Diagnostic contains:",
+ " this.>sink(Flux::flatMap);",
+ "",
+ " // BUG: Diagnostic contains:",
+ " this.>sink(Flux::flatMapSequential);",
+ " // BUG: Diagnostic contains:",
+ " this.>sink(Flux::flatMapSequential);",
+ "",
+ " this.>sink(Mono::flatMap);",
+ " }",
+ "",
+ " private void sink(BiFunction, P> fun) {}",
+ "}")
+ .doTest();
+ }
+
+ @Test
+ void replacementFirstSuggestedFix() {
+ refactoringTestHelper
+ .setFixChooser(FixChoosers.FIRST)
+ .addInputLines(
+ "in/A.java",
+ "import reactor.core.publisher.Flux;",
+ "",
+ "class A {",
+ " void m() {",
+ " Flux.just(1).flatMap(Flux::just);",
+ " Flux.just(1).flatMapSequential(Flux::just);",
+ " }",
+ "}")
+ .addOutputLines(
+ "out/A.java",
+ "import reactor.core.publisher.Flux;",
+ "",
+ "class A {",
+ " void m() {",
+ " Flux.just(1).concatMap(Flux::just);",
+ " Flux.just(1).concatMap(Flux::just);",
+ " }",
+ "}")
+ .doTest();
+ }
+
+ @Test
+ void replacementSecondSuggestedFix() {
+ refactoringTestHelper
+ .setFixChooser(FixChoosers.SECOND)
+ .addInputLines(
+ "in/A.java",
+ "import reactor.core.publisher.Flux;",
+ "",
+ "class A {",
+ " private static final int MAX_CONCURRENCY = 8;",
+ "",
+ " void m() {",
+ " Flux.just(1).flatMap(Flux::just);",
+ " Flux.just(1).flatMapSequential(Flux::just);",
+ " }",
+ "}")
+ .addOutputLines(
+ "out/A.java",
+ "import reactor.core.publisher.Flux;",
+ "",
+ "class A {",
+ " private static final int MAX_CONCURRENCY = 8;",
+ "",
+ " void m() {",
+ " Flux.just(1).flatMap(Flux::just, MAX_CONCURRENCY);",
+ " Flux.just(1).flatMapSequential(Flux::just, MAX_CONCURRENCY);",
+ " }",
+ "}")
+ .doTest();
+ }
+}
diff --git a/error-prone-contrib/src/test/resources/tech/picnic/errorprone/bugpatterns/ReactorTemplatesTestInput.java b/error-prone-contrib/src/test/resources/tech/picnic/errorprone/bugpatterns/ReactorTemplatesTestInput.java
index 5a22d191e0..add57fb20c 100644
--- a/error-prone-contrib/src/test/resources/tech/picnic/errorprone/bugpatterns/ReactorTemplatesTestInput.java
+++ b/error-prone-contrib/src/test/resources/tech/picnic/errorprone/bugpatterns/ReactorTemplatesTestInput.java
@@ -1,5 +1,6 @@
package tech.picnic.errorprone.bugpatterns;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.time.Duration;
import java.util.Optional;
@@ -45,6 +46,15 @@ ImmutableSet> testFluxSwitchIfEmptyOfEmptyPublisher() {
Flux.just(1).switchIfEmpty(Mono.empty()), Flux.just(2).switchIfEmpty(Flux.empty()));
}
+ ImmutableSet> testFluxConcatMap() {
+ return ImmutableSet.of(
+ Flux.just(1).flatMap(Mono::just, 1), Flux.just(2).flatMapSequential(Mono::just, 1));
+ }
+
+ Flux testFluxConcatMapIterable() {
+ return Flux.just(1, 2).flatMapIterable(ImmutableList::of);
+ }
+
Flux testMonoFlatMapToFlux() {
return Mono.just("foo").flatMapMany(s -> Mono.just(s + s));
}
diff --git a/error-prone-contrib/src/test/resources/tech/picnic/errorprone/bugpatterns/ReactorTemplatesTestOutput.java b/error-prone-contrib/src/test/resources/tech/picnic/errorprone/bugpatterns/ReactorTemplatesTestOutput.java
index c300a31d78..3dcff32c5f 100644
--- a/error-prone-contrib/src/test/resources/tech/picnic/errorprone/bugpatterns/ReactorTemplatesTestOutput.java
+++ b/error-prone-contrib/src/test/resources/tech/picnic/errorprone/bugpatterns/ReactorTemplatesTestOutput.java
@@ -2,6 +2,7 @@
import static com.google.common.collect.MoreCollectors.toOptional;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.time.Duration;
import java.util.Optional;
@@ -46,6 +47,14 @@ ImmutableSet> testFluxSwitchIfEmptyOfEmptyPublisher() {
return ImmutableSet.of(Flux.just(1), Flux.just(2));
}
+ ImmutableSet> testFluxConcatMap() {
+ return ImmutableSet.of(Flux.just(1).concatMap(Mono::just), Flux.just(2).concatMap(Mono::just));
+ }
+
+ Flux testFluxConcatMapIterable() {
+ return Flux.just(1, 2).concatMapIterable(ImmutableList::of);
+ }
+
Flux testMonoFlatMapToFlux() {
return Mono.just("foo").flatMap(s -> Mono.just(s + s)).flux();
}