Skip to content

Commit

Permalink
Introduce FluxFromStreamSupplier Refaster rule
Browse files Browse the repository at this point in the history
  • Loading branch information
rickie committed Aug 7, 2024
1 parent 2eb4e85 commit afbcbf1
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -1913,7 +1914,7 @@ Duration after(StepVerifier.LastStep step, Duration duration) {

/**
* Prefer {@link Mono#fromFuture(Supplier)} over {@link Mono#fromFuture(CompletableFuture)}, as
* the former may defer initiation of the asynchornous computation until subscription.
* the former may defer initiation of the asynchronous computation until subscription.
*/
static final class MonoFromFutureSupplier<T> {
// XXX: Constrain the `future` parameter using `@NotMatches(IsIdentityOperation.class)` once
Expand Down Expand Up @@ -1947,4 +1948,20 @@ Mono<T> after(CompletableFuture<T> future, boolean suppressCancel) {
return Mono.fromFuture(() -> future, suppressCancel);
}
}

/**
* Prefer {@link Flux#fromStream(Supplier)} over {@link Flux#fromStream(Stream)}, as the former
* may defer initiation of the asynchronous computation until subscription.
*/
static final class FluxFromStreamSupplier<T> {
@BeforeTemplate
Flux<T> before(Stream<T> stream) {
return Flux.fromStream(stream);
}

@AfterTemplate
Flux<T> after(Stream<T> stream) {
return Flux.fromStream(() -> stream);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.math.MathFlux;
Expand Down Expand Up @@ -660,4 +661,8 @@ Mono<Void> testMonoFromFutureSupplier() {
Mono<Void> testMonoFromFutureSupplierBoolean() {
return Mono.fromFuture(CompletableFuture.completedFuture(null), true);
}

Flux<Integer> testFluxFromStreamSupplier() {
return Flux.fromStream(Stream.of(1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.random.RandomGenerator;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.function.TupleUtils;
Expand Down Expand Up @@ -641,4 +643,8 @@ Mono<Void> testMonoFromFutureSupplier() {
Mono<Void> testMonoFromFutureSupplierBoolean() {
return Mono.fromFuture(() -> CompletableFuture.completedFuture(null), true);
}

Flux<Integer> testFluxFromStreamSupplier() {
return Flux.fromStream(() -> RandomGenerator.StreamableGenerator.of(1));
}
}

0 comments on commit afbcbf1

Please sign in to comment.