Skip to content

Commit

Permalink
Introduce FluxFromStreamSupplier Refaster rule (#1261)
Browse files Browse the repository at this point in the history
  • Loading branch information
rickie authored Aug 7, 2024
1 parent 2eb4e85 commit 77d183f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -1204,10 +1205,17 @@ Flux<T> after(Flux<? extends Iterable<T>> flux, int prefetch) {
}

/** Prefer {@link Flux#fromIterable(Iterable)} over less efficient alternatives. */
// XXX: Once the `FluxFromStreamSupplier` rule is constrained using
// `@NotMatches(IsIdentityOperation.class)`, this rule should also cover
// `Flux.fromStream(collection.stream())`.
static final class FluxFromIterable<T> {
// XXX: Once the `MethodReferenceUsage` check is generally enabled, drop the second
// `Refaster.anyOf` variant.
@BeforeTemplate
Flux<T> before(Collection<T> collection) {
return Flux.fromStream(collection.stream());
return Flux.fromStream(
Refaster.<Supplier<Stream<? extends T>>>anyOf(
collection::stream, () -> collection.stream()));
}

@AfterTemplate
Expand Down Expand Up @@ -1913,7 +1921,7 @@ Duration after(StepVerifier.LastStep step, Duration duration) {

/**
* Prefer {@link Mono#fromFuture(Supplier)} over {@link Mono#fromFuture(CompletableFuture)}, as
* the former may defer initiation of the asynchornous computation until subscription.
* the former may defer initiation of the asynchronous computation until subscription.
*/
static final class MonoFromFutureSupplier<T> {
// XXX: Constrain the `future` parameter using `@NotMatches(IsIdentityOperation.class)` once
Expand All @@ -1932,7 +1940,7 @@ Mono<T> after(CompletableFuture<T> future) {
/**
* Prefer {@link Mono#fromFuture(Supplier, boolean)} over {@link
* Mono#fromFuture(CompletableFuture, boolean)}, as the former may defer initiation of the
* asynchornous computation until subscription.
* asynchronous computation until subscription.
*/
static final class MonoFromFutureSupplierBoolean<T> {
// XXX: Constrain the `future` parameter using `@NotMatches(IsIdentityOperation.class)` once
Expand All @@ -1947,4 +1955,23 @@ Mono<T> after(CompletableFuture<T> future, boolean suppressCancel) {
return Mono.fromFuture(() -> future, suppressCancel);
}
}

/**
* Prefer {@link Flux#fromStream(Supplier)} over {@link Flux#fromStream(Stream)}, as the former
* yields a {@link Flux} that is more likely to behave as expected when subscribed to more than
* once.
*/
static final class FluxFromStreamSupplier<T> {
// XXX: Constrain the `stream` parameter using `@NotMatches(IsIdentityOperation.class)` once
// `IsIdentityOperation` no longer matches nullary method invocations.
@BeforeTemplate
Flux<T> before(Stream<T> stream) {
return Flux.fromStream(stream);
}

@AfterTemplate
Flux<T> after(Stream<T> stream) {
return Flux.fromStream(() -> stream);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.math.MathFlux;
Expand Down Expand Up @@ -434,8 +435,10 @@ ImmutableSet<Flux<String>> testConcatMapIterableIdentityWithPrefetch() {
Flux.just(ImmutableList.of("bar")).concatMap(Flux::fromIterable, 2));
}

Flux<String> testFluxFromIterable() {
return Flux.fromStream(ImmutableList.of("foo").stream());
ImmutableSet<Flux<String>> testFluxFromIterable() {
return ImmutableSet.of(
Flux.fromStream(ImmutableList.of("foo")::stream),
Flux.fromStream(() -> ImmutableList.of("bar").stream()));
}

ImmutableSet<Mono<Integer>> testFluxCountMapMathToIntExact() {
Expand Down Expand Up @@ -660,4 +663,8 @@ Mono<Void> testMonoFromFutureSupplier() {
Mono<Void> testMonoFromFutureSupplierBoolean() {
return Mono.fromFuture(CompletableFuture.completedFuture(null), true);
}

Flux<Integer> testFluxFromStreamSupplier() {
return Flux.fromStream(Stream.of(1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.function.TupleUtils;
Expand Down Expand Up @@ -429,8 +430,9 @@ ImmutableSet<Flux<String>> testConcatMapIterableIdentityWithPrefetch() {
Flux.just(ImmutableList.of("bar")).concatMapIterable(identity(), 2));
}

Flux<String> testFluxFromIterable() {
return Flux.fromIterable(ImmutableList.of("foo"));
ImmutableSet<Flux<String>> testFluxFromIterable() {
return ImmutableSet.of(
Flux.fromIterable(ImmutableList.of("foo")), Flux.fromIterable(ImmutableList.of("bar")));
}

ImmutableSet<Mono<Integer>> testFluxCountMapMathToIntExact() {
Expand Down Expand Up @@ -641,4 +643,8 @@ Mono<Void> testMonoFromFutureSupplier() {
Mono<Void> testMonoFromFutureSupplierBoolean() {
return Mono.fromFuture(() -> CompletableFuture.completedFuture(null), true);
}

Flux<Integer> testFluxFromStreamSupplier() {
return Flux.fromStream(() -> Stream.of(1));
}
}

0 comments on commit 77d183f

Please sign in to comment.