Skip to content

Commit

Permalink
Also flag Flux#flatMapSequential
Browse files Browse the repository at this point in the history
  • Loading branch information
rickie committed Jan 2, 2022
1 parent 38935bf commit edf932c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@
import reactor.core.publisher.Flux;

/**
* A {@link BugChecker} which flags usages of {@link Flux#flatMap(Function)}.
* A {@link BugChecker} which flags usages of {@link Flux#flatMap(Function)} and {@link
* Flux#flatMapSequential(Function)}.
*
* <p>{@link Flux#flatMap(Function)} eagerly performs up to {@link
* reactor.util.concurrent.Queues#SMALL_BUFFER_SIZE} subscriptions, interleaving the results as they
* are emitted. In most cases {@link Flux#concatMap(Function)} should be preferred, as it produces
* consistent results and avoids potentially saturating the thread pool on which subscription
* happens. If {@code concatMap}'s single-subscription semantics are undesirable one should invoke a
* {@code flatMap} or {@code flatMapSequential} overload with an explicit concurrency level.
* <p>{@link Flux#flatMap(Function)} and {@link Flux#flatMapSequential(Function)} eagerly perform up
* to {@link reactor.util.concurrent.Queues#SMALL_BUFFER_SIZE} subscriptions, interleaving the
* results as they are emitted. In most cases {@link Flux#concatMap(Function)} should be preferred,
* as it produces consistent results and avoids potentially saturating the thread pool on which
* subscription happens. If {@code concatMap}'s single-subscription semantics are undesirable one
* should invoke a {@code flatMap} or {@code flatMapSequential} overload with an explicit
* concurrency level.
*
* <p>NB: The rarely-used overload {@link Flux#flatMap(Function, Function, Supplier)} is not flagged
* by this check because there is no clear alternative to point to.
Expand All @@ -40,7 +42,7 @@
@BugPattern(
name = "FluxFlatMapUsage",
summary =
"`Flux#flatMap` has subtle semantics; please use `Flux#concatMap` or explicitly specify the desired amount of concurrency",
"`Flux#flatMap` and `Flux#flatMapSequential` have subtle semantics; please use `Flux#concatMap` or explicitly specify the desired amount of concurrency",
linkType = LinkType.NONE,
severity = SeverityLevel.ERROR,
tags = StandardTags.LIKELY_ERROR)
Expand All @@ -51,7 +53,7 @@ public final class FluxFlatMapUsageCheck extends BugChecker
private static final Matcher<ExpressionTree> FLUX_FLATMAP =
instanceMethod()
.onDescendantOf("reactor.core.publisher.Flux")
.named("flatMap")
.namedAnyOf("flatMap", "flatMapSequential")
.withParameters(Function.class.getName());

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,31 @@ void identification() {
" Flux.just(1).flatMap(Flux::just);",
" // BUG: Diagnostic contains:",
" Flux.just(1).<String>flatMap(i -> Flux.just(String.valueOf(i)));",
" // BUG: Diagnostic contains:",
" Flux.just(1).flatMapSequential(Flux::just);",
" // BUG: Diagnostic contains:",
" Flux.just(1).<String>flatMapSequential(i -> Flux.just(String.valueOf(i)));",
"",
" Mono.just(1).flatMap(Mono::just);",
" Flux.just(1).concatMap(Flux::just);",
"",
" Flux.just(1).flatMap(Flux::just, 1);",
" Flux.just(1).flatMap(Flux::just, 1, 1);",
" Flux.just(1).flatMap(Flux::just, throwable -> Flux.empty(), Flux::empty);",
"",
" Flux.just(1).flatMapSequential(Flux::just, 1);",
" Flux.just(1).flatMapSequential(Flux::just, 1, 1);",
"",
" // BUG: Diagnostic contains:",
" this.<String, Flux<String>>sink(Flux::flatMap);",
" // BUG: Diagnostic contains:",
" this.<Integer, Flux<Integer>>sink(Flux::<Integer>flatMap);",
"",
" // BUG: Diagnostic contains:",
" this.<String, Flux<String>>sink(Flux::flatMapSequential);",
" // BUG: Diagnostic contains:",
" this.<Integer, Flux<Integer>>sink(Flux::<Integer>flatMapSequential);",
"",
" this.<String, Mono<String>>sink(Mono::flatMap);",
" }",
"",
Expand All @@ -59,6 +73,7 @@ void replacementFirstSuggestedFix() {
"class A {",
" void m() {",
" Flux.just(1).flatMap(Flux::just);",
" Flux.just(1).flatMapSequential(Flux::just);",
" }",
"}")
.addOutputLines(
Expand All @@ -68,6 +83,7 @@ void replacementFirstSuggestedFix() {
"class A {",
" void m() {",
" Flux.just(1).concatMap(Flux::just);",
" Flux.just(1).concatMap(Flux::just);",
" }",
"}")
.doTest();
Expand All @@ -86,6 +102,7 @@ void replacementSecondSuggestedFix() {
"",
" void m() {",
" Flux.just(1).flatMap(Flux::just);",
" Flux.just(1).flatMapSequential(Flux::just);",
" }",
"}")
.addOutputLines(
Expand All @@ -97,6 +114,7 @@ void replacementSecondSuggestedFix() {
"",
" void m() {",
" Flux.just(1).flatMap(Flux::just, MAX_CONCURRENCY);",
" Flux.just(1).flatMapSequential(Flux::just, MAX_CONCURRENCY);",
" }",
"}")
.doTest();
Expand Down

0 comments on commit edf932c

Please sign in to comment.