Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce FluxFlatMapUsageCheck #26

Merged
merged 8 commits into from
Jan 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package tech.picnic.errorprone.bugpatterns;

import static com.google.errorprone.matchers.method.MethodMatchers.instanceMethod;

import com.google.auto.service.AutoService;
import com.google.common.collect.Iterables;
import com.google.errorprone.BugPattern;
import com.google.errorprone.BugPattern.LinkType;
import com.google.errorprone.BugPattern.SeverityLevel;
import com.google.errorprone.BugPattern.StandardTags;
import com.google.errorprone.VisitorState;
import com.google.errorprone.bugpatterns.BugChecker;
import com.google.errorprone.bugpatterns.BugChecker.MemberReferenceTreeMatcher;
import com.google.errorprone.bugpatterns.BugChecker.MethodInvocationTreeMatcher;
import com.google.errorprone.fixes.SuggestedFix;
import com.google.errorprone.fixes.SuggestedFixes;
import com.google.errorprone.matchers.Description;
import com.google.errorprone.matchers.Matcher;
import com.sun.source.tree.ExpressionTree;
import com.sun.source.tree.MemberReferenceTree;
import com.sun.source.tree.MethodInvocationTree;
import java.util.function.Function;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;

/**
* A {@link BugChecker} which flags usages of {@link Flux#flatMap(Function)} and {@link
* Flux#flatMapSequential(Function)}.
*
* <p>{@link Flux#flatMap(Function)} and {@link Flux#flatMapSequential(Function)} eagerly perform up
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely sure about this first two sentences, are they still correct 😬 .

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite; will push something :)

* to {@link reactor.util.concurrent.Queues#SMALL_BUFFER_SIZE} subscriptions. Additionally, the
* former interleaves values as they are emitted, yielding nondeterministic results. In most cases
* {@link Flux#concatMap(Function)} should be preferred, as it produces consistent results and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we then also not include {concat,flat}MapIterable for that reason? Or is this one problematic because there's no concurrency overload?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that unlike #flatMap(Function) and #concatMap(Function), with Iterable there is no notion of eager vs lazy inner subscription. The content of the Iterables are all played sequentially. Thus flatMapIterable and concatMapIterable are equivalent offered as a discoverability improvement for users that explore the API with the concat vs flatMap expectation.

Good one, I think it would make sense to just pick one (concat?) to keep usages uniform 👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So IIUC:

  • {concat,flat}MapIterable do not suffer from the issue with eager subscription.
  • {concat,flat}MapIterable both emit values in a deterministic order.

Based on this I guess all we need is a Refaster check to replace flatMapIterable with concatMapIterable. I guess strictly speaking that's out of scope for this PR, but let's just add it for completeness 😄

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed a commit 😄 !

* avoids potentially saturating the thread pool on which subscription happens. If {@code
* concatMap}'s single-subscription semantics are undesirable one should invoke a {@code flatMap} or
* {@code flatMapSequential} overload with an explicit concurrency level.
*
* <p>NB: The rarely-used overload {@link Flux#flatMap(Function, Function, Supplier)} is not flagged
* by this check because there is no clear alternative to point to.
*/
@AutoService(BugChecker.class)
@BugPattern(
name = "FluxFlatMapUsage",
summary =
"`Flux#flatMap` and `Flux#flatMapSequential` have subtle semantics; "
+ "please use `Flux#concatMap` or explicitly specify the desired amount of concurrency",
linkType = LinkType.NONE,
severity = SeverityLevel.ERROR,
tags = StandardTags.LIKELY_ERROR)
public final class FluxFlatMapUsageCheck extends BugChecker
implements MethodInvocationTreeMatcher, MemberReferenceTreeMatcher {
private static final long serialVersionUID = 1L;
private static final String MAX_CONCURRENCY_ARG_NAME = "MAX_CONCURRENCY";
private static final Matcher<ExpressionTree> FLUX_FLATMAP =
instanceMethod()
.onDescendantOf("reactor.core.publisher.Flux")
.namedAnyOf("flatMap", "flatMapSequential")
.withParameters(Function.class.getName());

@Override
public Description matchMethodInvocation(MethodInvocationTree tree, VisitorState state) {
if (!FLUX_FLATMAP.matches(tree, state)) {
return Description.NO_MATCH;
}

return buildDescription(tree)
.addFix(SuggestedFixes.renameMethodInvocation(tree, "concatMap", state))
.addFix(
SuggestedFix.builder()
.postfixWith(
Iterables.getOnlyElement(tree.getArguments()), ", " + MAX_CONCURRENCY_ARG_NAME)
.build())
.build();
}

@Override
public Description matchMemberReference(MemberReferenceTree tree, VisitorState state) {
if (!FLUX_FLATMAP.matches(tree, state)) {
return Description.NO_MATCH;
}

// Method references are expected to occur very infrequently; generating both variants of
// suggested fixes is not worth the trouble.
return describeMatch(tree);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,35 @@ Flux<T> after(Flux<T> flux) {
}
}

/** Prefer {@link Flux#concatMap(Function)} over more contrived alternatives. */
static final class FluxConcatMap<T, S> {
@BeforeTemplate
Flux<S> before(Flux<T> flux, Function<? super T, ? extends Publisher<? extends S>> function) {
return Refaster.anyOf(flux.flatMap(function, 1), flux.flatMapSequential(function, 1));
}

@AfterTemplate
Flux<S> after(Flux<T> flux, Function<? super T, ? extends Publisher<? extends S>> function) {
return flux.concatMap(function);
}
}

/**
* Prefer {@link Flux#concatMapIterable(Function)} over {@link Flux#concatMapIterable(Function)},
* as the former has equivalent semantics but a clearer name.
*/
static final class FluxConcatMapIterable<T, S> {
@BeforeTemplate
Flux<S> before(Flux<T> flux, Function<? super T, ? extends Iterable<? extends S>> function) {
return flux.flatMapIterable(function);
}

@AfterTemplate
Flux<S> after(Flux<T> flux, Function<? super T, ? extends Iterable<? extends S>> function) {
return flux.concatMapIterable(function);
}
}

/**
* Don't use {@link Mono#flatMapMany(Function)} to implicitly convert a {@link Mono} to a {@link
* Flux}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package tech.picnic.errorprone.bugpatterns;

import static com.google.errorprone.BugCheckerRefactoringTestHelper.newInstance;

import com.google.errorprone.BugCheckerRefactoringTestHelper;
import com.google.errorprone.BugCheckerRefactoringTestHelper.FixChoosers;
import com.google.errorprone.CompilationTestHelper;
import org.junit.jupiter.api.Test;

final class FluxFlatMapUsageCheckTest {
private final CompilationTestHelper compilationTestHelper =
CompilationTestHelper.newInstance(FluxFlatMapUsageCheck.class, getClass());
private final BugCheckerRefactoringTestHelper refactoringTestHelper =
newInstance(FluxFlatMapUsageCheck.class, getClass());

@Test
void identification() {
compilationTestHelper
.addSourceLines(
"A.java",
"import java.util.function.BiFunction;",
"import java.util.function.Function;",
"import reactor.core.publisher.Mono;",
"import reactor.core.publisher.Flux;",
"",
"class A {",
" void m() {",
" // BUG: Diagnostic contains:",
" Flux.just(1).flatMap(Flux::just);",
" // BUG: Diagnostic contains:",
" Flux.just(1).<String>flatMap(i -> Flux.just(String.valueOf(i)));",
" // BUG: Diagnostic contains:",
" Flux.just(1).flatMapSequential(Flux::just);",
" // BUG: Diagnostic contains:",
" Flux.just(1).<String>flatMapSequential(i -> Flux.just(String.valueOf(i)));",
"",
" Mono.just(1).flatMap(Mono::just);",
" Flux.just(1).concatMap(Flux::just);",
"",
" Flux.just(1).flatMap(Flux::just, 1);",
" Flux.just(1).flatMap(Flux::just, 1, 1);",
" Flux.just(1).flatMap(Flux::just, throwable -> Flux.empty(), Flux::empty);",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This overload has the same implicit reliance on the default concurrency level as the unary variant. However, there is no more elaborate overload we can refer to. So perhaps we should just call out this observation in the main code, without further flagging this method.

"",
" Flux.just(1).flatMapSequential(Flux::just, 1);",
" Flux.just(1).flatMapSequential(Flux::just, 1, 1);",
"",
" // BUG: Diagnostic contains:",
" this.<String, Flux<String>>sink(Flux::flatMap);",
" // BUG: Diagnostic contains:",
" this.<Integer, Flux<Integer>>sink(Flux::<Integer>flatMap);",
"",
" // BUG: Diagnostic contains:",
" this.<String, Flux<String>>sink(Flux::flatMapSequential);",
" // BUG: Diagnostic contains:",
" this.<Integer, Flux<Integer>>sink(Flux::<Integer>flatMapSequential);",
"",
" this.<String, Mono<String>>sink(Mono::flatMap);",
" }",
"",
" private <T, P> void sink(BiFunction<P, Function<T, P>, P> fun) {}",
"}")
.doTest();
}

@Test
void replacementFirstSuggestedFix() {
refactoringTestHelper
.setFixChooser(FixChoosers.FIRST)
.addInputLines(
"in/A.java",
"import reactor.core.publisher.Flux;",
"",
"class A {",
" void m() {",
" Flux.just(1).flatMap(Flux::just);",
" Flux.just(1).flatMapSequential(Flux::just);",
" }",
"}")
.addOutputLines(
"out/A.java",
"import reactor.core.publisher.Flux;",
"",
"class A {",
" void m() {",
" Flux.just(1).concatMap(Flux::just);",
" Flux.just(1).concatMap(Flux::just);",
" }",
"}")
.doTest();
}

@Test
void replacementSecondSuggestedFix() {
refactoringTestHelper
.setFixChooser(FixChoosers.SECOND)
.addInputLines(
"in/A.java",
"import reactor.core.publisher.Flux;",
"",
"class A {",
" private static final int MAX_CONCURRENCY = 8;",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just checking, is this constant necessary in the before case?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. The suggested fix does not introduce this constant, so strictly speaking it yields non-compilable code, which refactoringTestHelper doesn't like. So we introduce this constant to work around that.

(In theory we could update the code to suggest a MAX_CONCURRENCY constant, but that's not worth the hassle.)

"",
" void m() {",
" Flux.just(1).flatMap(Flux::just);",
" Flux.just(1).flatMapSequential(Flux::just);",
" }",
"}")
.addOutputLines(
"out/A.java",
"import reactor.core.publisher.Flux;",
"",
"class A {",
" private static final int MAX_CONCURRENCY = 8;",
"",
" void m() {",
" Flux.just(1).flatMap(Flux::just, MAX_CONCURRENCY);",
" Flux.just(1).flatMapSequential(Flux::just, MAX_CONCURRENCY);",
" }",
"}")
.doTest();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.picnic.errorprone.bugpatterns;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.time.Duration;
import java.util.Optional;
Expand Down Expand Up @@ -45,6 +46,15 @@ ImmutableSet<Flux<Integer>> testFluxSwitchIfEmptyOfEmptyPublisher() {
Flux.just(1).switchIfEmpty(Mono.empty()), Flux.just(2).switchIfEmpty(Flux.empty()));
}

ImmutableSet<Flux<Integer>> testFluxConcatMap() {
return ImmutableSet.of(
Flux.just(1).flatMap(Mono::just, 1), Flux.just(2).flatMapSequential(Mono::just, 1));
}

Flux<Integer> testFluxConcatMapIterable() {
return Flux.just(1, 2).flatMapIterable(ImmutableList::of);
}

Flux<String> testMonoFlatMapToFlux() {
return Mono.just("foo").flatMapMany(s -> Mono.just(s + s));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.google.common.collect.MoreCollectors.toOptional;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.time.Duration;
import java.util.Optional;
Expand Down Expand Up @@ -46,6 +47,14 @@ ImmutableSet<Flux<Integer>> testFluxSwitchIfEmptyOfEmptyPublisher() {
return ImmutableSet.of(Flux.just(1), Flux.just(2));
}

ImmutableSet<Flux<Integer>> testFluxConcatMap() {
return ImmutableSet.of(Flux.just(1).concatMap(Mono::just), Flux.just(2).concatMap(Mono::just));
}

Flux<Integer> testFluxConcatMapIterable() {
return Flux.just(1, 2).concatMapIterable(ImmutableList::of);
}

Flux<String> testMonoFlatMapToFlux() {
return Mono.just("foo").flatMap(s -> Mono.just(s + s)).flux();
}
Expand Down