diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java
index c50395f194..2a6f5e9027 100644
--- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java
+++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java
@@ -1,5352 +1,5350 @@
-/*
- * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package reactor.core.publisher;
-
-import java.time.Duration;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.Spliterator;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.BiPredicate;
-import java.util.function.BooleanSupplier;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.LongConsumer;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import java.util.logging.Level;
-
-import io.micrometer.core.instrument.MeterRegistry;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-
-import reactor.core.CorePublisher;
-import reactor.core.CoreSubscriber;
-import reactor.core.Disposable;
-import reactor.core.Exceptions;
-import reactor.core.Fuseable;
-import reactor.core.Scannable;
-import reactor.core.publisher.FluxOnAssembly.AssemblySnapshot;
-import reactor.core.publisher.FluxOnAssembly.CheckpointHeavySnapshot;
-import reactor.core.publisher.FluxOnAssembly.CheckpointLightSnapshot;
-import reactor.core.scheduler.Scheduler;
-import reactor.core.scheduler.Scheduler.Worker;
-import reactor.core.scheduler.Schedulers;
-import reactor.util.Logger;
-import reactor.util.Metrics;
-import reactor.util.annotation.Nullable;
-import reactor.util.concurrent.Queues;
-import reactor.util.context.Context;
-import reactor.util.context.ContextView;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuple3;
-import reactor.util.function.Tuple4;
-import reactor.util.function.Tuple5;
-import reactor.util.function.Tuple6;
-import reactor.util.function.Tuple7;
-import reactor.util.function.Tuple8;
-import reactor.util.function.Tuples;
-import reactor.core.observability.SignalListener;
-import reactor.core.observability.SignalListenerFactory;
-import reactor.util.retry.Retry;
-
-/**
- * A Reactive Streams {@link Publisher} with basic rx operators that emits at most one item via the
- * {@code onNext} signal then terminates with an {@code onComplete} signal (successful Mono,
- * with or without value), or only emits a single {@code onError} signal (failed Mono).
- *
- *
Most Mono implementations are expected to immediately call {@link Subscriber#onComplete()}
- * after having called {@link Subscriber#onNext(T)}. {@link Mono#never() Mono.never()} is an outlier: it doesn't
- * emit any signal, which is not technically forbidden although not terribly useful outside
- * of tests. On the other hand, a combination of {@code onNext} and {@code onError} is explicitly forbidden.
- *
- *
- * The recommended way to learn about the {@link Mono} API and discover new operators is
- * through the reference documentation, rather than through this javadoc (as opposed to
- * learning more about individual operators). See the
- * "which operator do I need?" appendix.
- *
- *
- *
- *
- *
- *
The rx operators will offer aliases for input {@link Mono} type to preserve the "at most one"
- * property of the resulting {@link Mono}. For instance {@link Mono#flatMap flatMap} returns a
- * {@link Mono}, while there is a {@link Mono#flatMapMany flatMapMany} alias with possibly more than
- * 1 emission.
- *
- *
{@code Mono} should be used for {@link Publisher} that just completes without any value.
- *
- * It is intended to be used in implementations and return types, input parameters should keep
- * using raw {@link Publisher} as much as possible.
- *
- *
Note that using state in the {@code java.util.function} / lambdas used within Mono operators
- * should be avoided, as these may be shared between several {@link Subscriber Subscribers}.
- *
- * @param the type of the single value of this class
- * @author Sebastien Deleuze
- * @author Stephane Maldini
- * @author David Karnok
- * @author Simon Baslé
- * @see Flux
- */
-public abstract class Mono implements CorePublisher {
-
-// ==============================================================================================================
-// Static Generators
-// ==============================================================================================================
-
- /**
- * Creates a deferred emitter that can be used with callback-based
- * APIs to signal at most one value, a complete or an error signal.
- *
- *
- *
- * Bridging legacy API involves mostly boilerplate code due to the lack
- * of standard types and methods. There are two kinds of API surfaces:
- * 1) addListener/removeListener and 2) callback-handler.
- *
- * 1) addListener/removeListener pairs
- * To work with such API one has to instantiate the listener,
- * call the sink from the listener then register it with the source:
- *
- * Mono.<String>create(sink -> {
- * HttpListener listener = event -> {
- * if (event.getResponseCode() >= 400) {
- * sink.error(new RuntimeException("Failed"));
- * } else {
- * String body = event.getBody();
- * if (body.isEmpty()) {
- * sink.success();
- * } else {
- * sink.success(body.toLowerCase());
- * }
- * }
- * };
- *
- * client.addListener(listener);
- *
- * sink.onDispose(() -> client.removeListener(listener));
- * });
- *
- * Note that this works only with single-value emitting listeners. Otherwise,
- * all subsequent signals are dropped. You may have to add {@code client.removeListener(this);}
- * to the listener's body.
- *
- * 2) callback handler
- * This requires a similar instantiation pattern such as above, but usually the
- * successful completion and error are separated into different methods.
- * In addition, the legacy API may or may not support some cancellation mechanism.
- *
- * Mono.<String>create(sink -> {
- * Callback<String> callback = new Callback<String>() {
- * @Override
- * public void onResult(String data) {
- * sink.success(data.toLowerCase());
- * }
- *
- * @Override
- * public void onError(Exception e) {
- * sink.error(e);
- * }
- * }
- *
- * // without cancellation support:
- *
- * client.call("query", callback);
- *
- * // with cancellation support:
- *
- * AutoCloseable cancel = client.call("query", callback);
- * sink.onDispose(() -> {
- * try {
- * cancel.close();
- * } catch (Exception ex) {
- * Exceptions.onErrorDropped(ex);
- * }
- * });
- * });
- *
- * @param callback Consume the {@link MonoSink} provided per-subscriber by Reactor to generate signals.
- * @param The type of the value emitted
- * @return a {@link Mono}
- */
- public static Mono create(Consumer> callback) {
- return onAssembly(new MonoCreate<>(callback));
- }
-
- /**
- * Create a {@link Mono} provider that will {@link Supplier#get supply} a target {@link Mono} to subscribe to for
- * each {@link Subscriber} downstream.
- *
- *
- *
- *
- * @param supplier a {@link Mono} factory
- * @param the element type of the returned Mono instance
- * @return a deferred {@link Mono}
- * @see #deferContextual(Function)
- */
- public static Mono defer(Supplier extends Mono extends T>> supplier) {
- return onAssembly(new MonoDefer<>(supplier));
- }
-
- /**
- * Create a {@link Mono} provider that will {@link Function#apply supply} a target {@link Mono}
- * to subscribe to for each {@link Subscriber} downstream.
- * This operator behaves the same way as {@link #defer(Supplier)},
- * but accepts a {@link Function} that will receive the current {@link ContextView} as an argument.
- *
- *
- *
- *
- * @param contextualMonoFactory a {@link Mono} factory
- * @param the element type of the returned Mono instance
- * @return a deferred {@link Mono} deriving actual {@link Mono} from context values for each subscription
- */
- public static Mono deferContextual(Function> contextualMonoFactory) {
- return onAssembly(new MonoDeferContextual<>(contextualMonoFactory));
- }
-
- /**
- * Create a Mono which delays an onNext signal by a given {@link Duration duration}
- * on a default Scheduler and completes.
- * If the demand cannot be produced in time, an onError will be signalled instead.
- * The delay is introduced through the {@link Schedulers#parallel() parallel} default Scheduler.
- *
- *
- *
- *
- * @param duration the duration of the delay
- *
- * @return a new {@link Mono}
- */
- public static Mono delay(Duration duration) {
- return delay(duration, Schedulers.parallel());
- }
-
- /**
- * Create a Mono which delays an onNext signal by a given {@link Duration duration}
- * on a provided {@link Scheduler} and completes.
- * If the demand cannot be produced in time, an onError will be signalled instead.
- *
- *
- *
- *
- * @param duration the {@link Duration} of the delay
- * @param timer a time-capable {@link Scheduler} instance to run on
- *
- * @return a new {@link Mono}
- */
- public static Mono delay(Duration duration, Scheduler timer) {
- return onAssembly(new MonoDelay(duration.toNanos(), TimeUnit.NANOSECONDS, timer));
- }
-
- /**
- * Create a {@link Mono} that completes without emitting any item.
- *
- *
- *
- *
- * @param the reified {@link Subscriber} type
- *
- * @return a completed {@link Mono}
- */
- public static Mono empty() {
- return MonoEmpty.instance();
- }
-
- /**
- * Create a {@link Mono} that terminates with the specified error immediately after
- * being subscribed to.
- *
- *
- *
- * @param error the onError signal
- * @param the reified {@link Subscriber} type
- *
- * @return a failing {@link Mono}
- */
- public static Mono error(Throwable error) {
- return onAssembly(new MonoError<>(error));
- }
-
- /**
- * Create a {@link Mono} that terminates with an error immediately after being
- * subscribed to. The {@link Throwable} is generated by a {@link Supplier}, invoked
- * each time there is a subscription and allowing for lazy instantiation.
- *
- *
- *
- * @param errorSupplier the error signal {@link Supplier} to invoke for each {@link Subscriber}
- * @param the reified {@link Subscriber} type
- *
- * @return a failing {@link Mono}
- */
- public static Mono error(Supplier extends Throwable> errorSupplier) {
- return onAssembly(new MonoErrorSupplied<>(errorSupplier));
- }
-
- /**
- * Pick the first {@link Mono} to emit any signal (value, empty completion or error)
- * and replay that signal, effectively behaving like the fastest of these competing
- * sources.
- *
- *
- *
- * @param monos The deferred monos to use.
- * @param The type of the function result.
- *
- * @return a new {@link Mono} behaving like the fastest of its sources.
- * @deprecated use {@link #firstWithSignal(Mono[])}. To be removed in reactor 3.5.
- */
- @SafeVarargs
- @Deprecated
- public static Mono first(Mono extends T>... monos) {
- return firstWithSignal(monos);
- }
-
- /**
- * Pick the first {@link Mono} to emit any signal (value, empty completion or error)
- * and replay that signal, effectively behaving like the fastest of these competing
- * sources.
- *
- *
- *
- * @param monos The deferred monos to use.
- * @param The type of the function result.
- *
- * @return a new {@link Mono} behaving like the fastest of its sources.
- * @deprecated use {@link #firstWithSignal(Iterable)}. To be removed in reactor 3.5.
- */
- @Deprecated
- public static Mono first(Iterable extends Mono extends T>> monos) {
- return firstWithSignal(monos);
- }
-
- /**
- * Pick the first {@link Mono} to emit any signal (value, empty completion or error)
- * and replay that signal, effectively behaving like the fastest of these competing
- * sources.
- *
- *
- *
- * @param monos The deferred monos to use.
- * @param The type of the function result.
- *
- * @return a new {@link Mono} behaving like the fastest of its sources.
- */
- @SafeVarargs
- public static Mono firstWithSignal(Mono extends T>... monos) {
- return onAssembly(new MonoFirstWithSignal<>(monos));
- }
-
- /**
- * Pick the first {@link Mono} to emit any signal (value, empty completion or error)
- * and replay that signal, effectively behaving like the fastest of these competing
- * sources.
- *
- *
- *
- * @param monos The deferred monos to use.
- * @param The type of the function result.
- *
- * @return a new {@link Mono} behaving like the fastest of its sources.
- */
- public static Mono firstWithSignal(Iterable extends Mono extends T>> monos) {
- return onAssembly(new MonoFirstWithSignal<>(monos));
- }
-
- /**
- * Pick the first {@link Mono} source to emit any value and replay that signal,
- * effectively behaving like the source that first emits an
- * {@link Subscriber#onNext(Object) onNext}.
- *
- *
- * Valued sources always "win" over an empty source (one that only emits onComplete)
- * or a failing source (one that only emits onError).
- *
- * When no source can provide a value, this operator fails with a {@link NoSuchElementException}
- * (provided there are at least two sources). This exception has a {@link Exceptions#multiple(Throwable...) composite}
- * as its {@link Throwable#getCause() cause} that can be used to inspect what went wrong with each source
- * (so the composite has as many elements as there are sources).
- *
- * Exceptions from failing sources are directly reflected in the composite at the index of the failing source.
- * For empty sources, a {@link NoSuchElementException} is added at their respective index.
- * One can use {@link Exceptions#unwrapMultiple(Throwable) Exceptions.unwrapMultiple(topLevel.getCause())}
- * to easily inspect these errors as a {@link List}.
- *
- * Note that like in {@link #firstWithSignal(Iterable)}, an infinite source can be problematic
- * if no other source emits onNext.
- *
- *
- *
- * @param monos An {@link Iterable} of the competing source monos
- * @param The type of the element in the sources and the resulting mono
- *
- * @return a new {@link Mono} behaving like the fastest of its sources
- */
- public static Mono firstWithValue(Iterable extends Mono extends T>> monos) {
- return onAssembly(new MonoFirstWithValue<>(monos));
- }
-
- /**
- * Pick the first {@link Mono} source to emit any value and replay that signal,
- * effectively behaving like the source that first emits an
- * {@link Subscriber#onNext(Object) onNext}.
- *
- * Valued sources always "win" over an empty source (one that only emits onComplete)
- * or a failing source (one that only emits onError).
- *
- * When no source can provide a value, this operator fails with a {@link NoSuchElementException}
- * (provided there are at least two sources). This exception has a {@link Exceptions#multiple(Throwable...) composite}
- * as its {@link Throwable#getCause() cause} that can be used to inspect what went wrong with each source
- * (so the composite has as many elements as there are sources).
- *
- * Exceptions from failing sources are directly reflected in the composite at the index of the failing source.
- * For empty sources, a {@link NoSuchElementException} is added at their respective index.
- * One can use {@link Exceptions#unwrapMultiple(Throwable) Exceptions.unwrapMultiple(topLevel.getCause())}
- * to easily inspect these errors as a {@link List}.
- *
- * Note that like in {@link #firstWithSignal(Mono[])}, an infinite source can be problematic
- * if no other source emits onNext.
- * In case the {@code first} source is already an array-based {@link #firstWithValue(Mono, Mono[])}
- * instance, nesting is avoided: a single new array-based instance is created with all the
- * sources from {@code first} plus all the {@code others} sources at the same level.
- *
- *
- *
- * @param first the first competing source {@link Mono}
- * @param others the other competing sources {@link Mono}
- * @param The type of the element in the sources and the resulting mono
- *
- * @return a new {@link Mono} behaving like the fastest of its sources
- */
- @SafeVarargs
- public static Mono firstWithValue(Mono extends T> first, Mono extends T>... others) {
- if (first instanceof MonoFirstWithValue) {
- @SuppressWarnings("unchecked")
- MonoFirstWithValue a = (MonoFirstWithValue) first;
- Mono result = a.firstValuedAdditionalSources(others);
- if (result != null) {
- return result;
- }
- }
- return onAssembly(new MonoFirstWithValue<>(first, others));
- }
-
- /**
- * Expose the specified {@link Publisher} with the {@link Mono} API, and ensure it will emit 0 or 1 item.
- * The source emitter will be cancelled on the first `onNext`.
- *
- *
- *
- * {@link Hooks#onEachOperator(String, Function)} and similar assembly hooks are applied
- * unless the source is already a {@link Mono} (including {@link Mono} that was decorated as a {@link Flux},
- * see {@link Flux#from(Publisher)}).
- *
- * @param source the {@link Publisher} source
- * @param the source type
- *
- * @return the next item emitted as a {@link Mono}
- */
- public static Mono from(Publisher extends T> source) {
- //some sources can be considered already assembled monos
- //all conversion methods (from, fromDirect, wrap) must accommodate for this
- if (source instanceof Mono) {
- @SuppressWarnings("unchecked")
- Mono casted = (Mono) source;
- return casted;
- }
- if (source instanceof FluxSourceMono
- || source instanceof FluxSourceMonoFuseable) {
- @SuppressWarnings("unchecked")
- FluxFromMonoOperator wrapper = (FluxFromMonoOperator) source;
- @SuppressWarnings("unchecked")
- Mono extracted = (Mono) wrapper.source;
- return extracted;
- }
-
- //we delegate to `wrap` and apply assembly hooks
- @SuppressWarnings("unchecked") Publisher downcasted = (Publisher) source;
- return onAssembly(wrap(downcasted, true));
- }
-
- /**
- * Create a {@link Mono} producing its value using the provided {@link Callable}. If
- * the Callable resolves to {@code null}, the resulting Mono completes empty.
- *
- *
- *
- *
- * @param supplier {@link Callable} that will produce the value
- * @param type of the expected value
- *
- * @return A {@link Mono}.
- */
- public static Mono fromCallable(Callable extends T> supplier) {
- return onAssembly(new MonoCallable<>(supplier));
- }
-
- /**
- * Create a {@link Mono}, producing its value using the provided {@link CompletionStage}.
- *
- *
- *
- *
- * If the completionStage is also a {@link Future}, cancelling the Mono will cancel the future.
- * Use {@link #fromFuture(CompletableFuture, boolean)} with {@code suppressCancellation} set to
- * {@code true} if you need to suppress cancellation propagation.
- *
- * @param completionStage {@link CompletionStage} that will produce a value (or a null to
- * complete immediately)
- * @param type of the expected value
- * @return A {@link Mono}.
- */
- public static Mono fromCompletionStage(CompletionStage extends T> completionStage) {
- return onAssembly(new MonoCompletionStage<>(completionStage, false));
- }
-
- /**
- * Create a {@link Mono} that wraps a lazily-supplied {@link CompletionStage} on subscription,
- * emitting the value produced by the {@link CompletionStage}.
- *
- *
- *
- *
- * If the completionStage is also a {@link Future}, cancelling the Mono will cancel the future.
- * Use {@link #fromFuture(CompletableFuture, boolean)} with {@code suppressCancellation} set to
- * {@code true} if you need to suppress cancellation propagation.
- *
- * @param stageSupplier The {@link Supplier} of a {@link CompletionStage} that will produce a value (or a null to
- * complete immediately). This allows lazy triggering of CompletionStage-based APIs.
- * @param type of the expected value
- * @return A {@link Mono}.
- */
- public static Mono fromCompletionStage(Supplier extends CompletionStage extends T>> stageSupplier) {
- return defer(() -> onAssembly(new MonoCompletionStage<>(stageSupplier.get(), false)));
- }
-
- /**
- * Convert a {@link Publisher} to a {@link Mono} without any cardinality check
- * (ie this method doesn't cancel the source past the first element).
- * Conversion transparently returns {@link Mono} sources without wrapping and otherwise
- * supports {@link Fuseable} sources.
- * Note this is an advanced interoperability operator that implies you know the
- * {@link Publisher} you are converting follows the {@link Mono} semantics and only
- * ever emits one element.
- *
- * {@link Hooks#onEachOperator(String, Function)} and similar assembly hooks are applied
- * unless the source is already a {@link Mono}.
- *
- * @param source the Mono-compatible {@link Publisher} to wrap
- * @param type of the value emitted by the publisher
- * @return a wrapped {@link Mono}
- */
- public static Mono fromDirect(Publisher extends I> source){
- //some sources can be considered already assembled monos
- //all conversion methods (from, fromDirect, wrap) must accommodate for this
- if(source instanceof Mono){
- @SuppressWarnings("unchecked")
- Mono m = (Mono)source;
- return m;
- }
- if (source instanceof FluxSourceMono
- || source instanceof FluxSourceMonoFuseable) {
- @SuppressWarnings("unchecked")
- FluxFromMonoOperator wrapper = (FluxFromMonoOperator) source;
- @SuppressWarnings("unchecked")
- Mono extracted = (Mono) wrapper.source;
- return extracted;
- }
-
- //we delegate to `wrap` and apply assembly hooks
- @SuppressWarnings("unchecked") Publisher downcasted = (Publisher) source;
- return onAssembly(wrap(downcasted, false));
- }
-
- /**
- * Create a {@link Mono}, producing its value using the provided {@link CompletableFuture}
- * and cancelling the future if the Mono gets cancelled.
- *
- *
- *
- *
- * Use {@link #fromFuture(CompletableFuture, boolean)} with {@code suppressCancellation} set to
- * {@code true} if you need to suppress cancellation propagation.
- *
- * @param future {@link CompletableFuture} that will produce a value (or a null to
- * complete immediately)
- * @param type of the expected value
- * @return A {@link Mono}.
- * @see #fromCompletionStage(CompletionStage) fromCompletionStage for a generalization
- */
- public static Mono fromFuture(CompletableFuture extends T> future) {
- return fromFuture(future, false);
- }
-
- /**
- * Create a {@link Mono}, producing its value using the provided {@link CompletableFuture}
- * and optionally cancelling the future if the Mono gets cancelled (if {@code suppressCancel == false}).
- *
- *
- *
- *
- *
- * @param future {@link CompletableFuture} that will produce a value (or a null to complete immediately)
- * @param suppressCancel {@code true} to prevent cancellation of the future when the Mono is cancelled,
- * {@code false} otherwise (the default)
- * @param type of the expected value
- * @return A {@link Mono}.
- */
- public static Mono fromFuture(CompletableFuture extends T> future, boolean suppressCancel) {
- return onAssembly(new MonoCompletionStage<>(future, suppressCancel));
- }
-
- /**
- * Create a {@link Mono} that wraps a lazily-supplied {@link CompletableFuture} on subscription,
- * emitting the value produced by the future and cancelling the future if the Mono gets cancelled.
- *
- *
- *
- *
- *
- * @param futureSupplier The {@link Supplier} of a {@link CompletableFuture} that will produce a value
- * (or a null to complete immediately). This allows lazy triggering of future-based APIs.
- * @param type of the expected value
- * @return A {@link Mono}.
- * @see #fromCompletionStage(Supplier) fromCompletionStage for a generalization
- */
- public static Mono fromFuture(Supplier extends CompletableFuture extends T>> futureSupplier) {
- return fromFuture(futureSupplier, false);
- }
-
- /**
- * Create a {@link Mono} that wraps a lazily-supplied {@link CompletableFuture} on subscription,
- * emitting the value produced by the future and optionally cancelling the future if the Mono gets cancelled
- * (if {@code suppressCancel == false}).
- *
- *
- *
- *
- *
- * @param futureSupplier The {@link Supplier} of a {@link CompletableFuture} that will produce a value
- * (or a null to complete immediately). This allows lazy triggering of future-based APIs.
- * @param suppressCancel {@code true} to prevent cancellation of the future when the Mono is cancelled,
- * {@code false} otherwise (the default)
- * @param type of the expected value
- * @return A {@link Mono}.
- * @see #fromCompletionStage(Supplier) fromCompletionStage for a generalization
- */
- public static Mono fromFuture(Supplier extends CompletableFuture extends T>> futureSupplier, boolean suppressCancel) {
- return defer(() -> onAssembly(new MonoCompletionStage<>(futureSupplier.get(), suppressCancel)));
- }
-
- /**
- * Create a {@link Mono} that completes empty once the provided {@link Runnable} has
- * been executed.
- *
- *
- *
- *
- * @param runnable {@link Runnable} that will be executed before emitting the completion signal
- *
- * @param The generic type of the upstream, which is preserved by this operator
- * @return A {@link Mono}.
- */
- public static Mono fromRunnable(Runnable runnable) {
- return onAssembly(new MonoRunnable<>(runnable));
- }
-
- /**
- * Create a {@link Mono}, producing its value using the provided {@link Supplier}. If
- * the Supplier resolves to {@code null}, the resulting Mono completes empty.
- *
- *
- *
- *
- * @param supplier {@link Supplier} that will produce the value
- * @param type of the expected value
- *
- * @return A {@link Mono}.
- */
- public static Mono fromSupplier(Supplier extends T> supplier) {
- return onAssembly(new MonoSupplier<>(supplier));
- }
-
-
- /**
- * Create a new {@link Mono} that ignores elements from the source (dropping them),
- * but completes when the source completes.
- *
- *
- *
- *
- *
- *
Discard Support: This operator discards the element from the source.
- *
- * @param source the {@link Publisher} to ignore
- * @param the source type of the ignored data
- *
- * @return a new completable {@link Mono}.
- */
- public static Mono ignoreElements(Publisher source) {
- return onAssembly(new MonoIgnorePublisher<>(source));
- }
-
- /**
- * Create a new {@link Mono} that emits the specified item, which is captured at
- * instantiation time.
- *
- *
- *
- *
- * @param data the only item to onNext
- * @param the type of the produced item
- *
- * @return a {@link Mono}.
- */
- public static Mono just(T data) {
- return onAssembly(new MonoJust<>(data));
- }
-
- /**
- * Create a new {@link Mono} that emits the specified item if {@link Optional#isPresent()} otherwise only emits
- * onComplete.
- *
- *
- *
- *
- * @param data the {@link Optional} item to onNext or onComplete if not present
- * @param the type of the produced item
- *
- * @return a {@link Mono}.
- */
- public static Mono justOrEmpty(@Nullable Optional extends T> data) {
- return data != null && data.isPresent() ? just(data.get()) : empty();
- }
-
- /**
- * Create a new {@link Mono} that emits the specified item if non null otherwise only emits
- * onComplete.
- *
- *
- *
- *
- * @param data the item to onNext or onComplete if null
- * @param the type of the produced item
- *
- * @return a {@link Mono}.
- */
- public static Mono justOrEmpty(@Nullable T data) {
- return data != null ? just(data) : empty();
- }
-
-
- /**
- * Return a {@link Mono} that will never signal any data, error or completion signal,
- * essentially running indefinitely.
- *
- *
- *
- * @param the {@link Subscriber} type target
- *
- * @return a never completing {@link Mono}
- */
- public static Mono never() {
- return MonoNever.instance();
- }
-
- /**
- * Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the
- * same by comparing the items emitted by each Publisher pairwise.
- *
- *
- *
- * @param source1 the first Publisher to compare
- * @param source2 the second Publisher to compare
- * @param the type of items emitted by each Publisher
- * @return a Mono that emits a Boolean value that indicates whether the two sequences are the same
- */
- public static Mono sequenceEqual(Publisher extends T> source1, Publisher extends T> source2) {
- return sequenceEqual(source1, source2, equalsBiPredicate(), Queues.SMALL_BUFFER_SIZE);
- }
-
- /**
- * Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the
- * same by comparing the items emitted by each Publisher pairwise based on the results of a specified
- * equality function.
- *
- *
- *
- * @param source1 the first Publisher to compare
- * @param source2 the second Publisher to compare
- * @param isEqual a function used to compare items emitted by each Publisher
- * @param the type of items emitted by each Publisher
- * @return a Mono that emits a Boolean value that indicates whether the two Publisher two sequences
- * are the same according to the specified function
- */
- public static Mono sequenceEqual(Publisher extends T> source1, Publisher extends T> source2,
- BiPredicate super T, ? super T> isEqual) {
- return sequenceEqual(source1, source2, isEqual, Queues.SMALL_BUFFER_SIZE);
- }
-
- /**
- * Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the
- * same by comparing the items emitted by each Publisher pairwise based on the results of a specified
- * equality function.
- *
- *
- *
- * @param source1 the first Publisher to compare
- * @param source2 the second Publisher to compare
- * @param isEqual a function used to compare items emitted by each Publisher
- * @param prefetch the number of items to prefetch from the first and second source Publisher
- * @param the type of items emitted by each Publisher
- * @return a Mono that emits a Boolean value that indicates whether the two Publisher two sequences
- * are the same according to the specified function
- */
- public static Mono sequenceEqual(Publisher extends T> source1,
- Publisher extends T> source2,
- BiPredicate super T, ? super T> isEqual, int prefetch) {
- return onAssembly(new MonoSequenceEqual<>(source1, source2, isEqual, prefetch));
- }
-
- /**
- * Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a
- * Mono derived from the same resource and makes sure the resource is released if the
- * sequence terminates or the Subscriber cancels.
- *
- *
- * - For eager cleanup, unlike in {@link Flux#using(Callable, Function, Consumer, boolean) Flux},
- * in the case of a valued {@link Mono} the cleanup happens just before passing the value to downstream.
- * In all cases, exceptions raised by the eager cleanup {@link Consumer} may override the terminal event,
- * discarding the element if the derived {@link Mono} was valued.
- * - Non-eager cleanup will drop any exception.
- *
- *
- *
- *
- * @param resourceSupplier a {@link Callable} that is called on subscribe to create the resource
- * @param sourceSupplier a {@link Mono} factory to create the Mono depending on the created resource
- * @param resourceCleanup invoked on completion to clean-up the resource
- * @param eager set to true to clean before any signal (including onNext) is passed downstream
- * @param emitted type
- * @param resource type
- *
- * @return new {@link Mono}
- */
- public static Mono using(Callable extends D> resourceSupplier,
- Function super D, ? extends Mono extends T>> sourceSupplier,
- Consumer super D> resourceCleanup,
- boolean eager) {
- return onAssembly(new MonoUsing<>(resourceSupplier, sourceSupplier,
- resourceCleanup, eager));
- }
-
- /**
- * Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a
- * Mono derived from the same resource and makes sure the resource is released if the
- * sequence terminates or the Subscriber cancels.
- *
- * Unlike in {@link Flux#using(Callable, Function, Consumer) Flux}, in the case of a valued {@link Mono} the cleanup
- * happens just before passing the value to downstream. In all cases, exceptions raised by the cleanup
- * {@link Consumer} may override the terminal event, discarding the element if the derived {@link Mono} was valued.
- *
- *
- *
- * @param resourceSupplier a {@link Callable} that is called on subscribe to create the resource
- * @param sourceSupplier a {@link Mono} factory to create the Mono depending on the created resource
- * @param resourceCleanup invoked on completion to clean-up the resource
- * @param emitted type
- * @param resource type
- *
- * @return new {@link Mono}
- */
- public static Mono using(Callable extends D> resourceSupplier,
- Function super D, ? extends Mono extends T>> sourceSupplier,
- Consumer super D> resourceCleanup) {
- return using(resourceSupplier, sourceSupplier, resourceCleanup, true);
- }
-
-
- /**
- * Uses a resource, generated by a {@link Publisher} for each individual {@link Subscriber},
- * to derive a {@link Mono}. Note that all steps of the operator chain that would need the
- * resource to be in an open stable state need to be described inside the {@code resourceClosure}
- * {@link Function}.
- *
- * Unlike in {@link Flux#usingWhen(Publisher, Function, Function) the Flux counterpart}, ALL signals are deferred
- * until the {@link Mono} terminates and the relevant {@link Function} generates and invokes a "cleanup"
- * {@link Publisher}. This is because a failure in the cleanup Publisher
- * must result in a lone {@code onError} signal in the downstream {@link Mono} (any potential value in the
- * derived {@link Mono} is discarded). Here are the various scenarios that can play out:
- *
- * - empty Mono, asyncCleanup ends with {@code onComplete()}: downstream receives {@code onComplete()}
- * - empty Mono, asyncCleanup ends with {@code onError(t)}: downstream receives {@code onError(t)}
- * - valued Mono, asyncCleanup ends with {@code onComplete()}: downstream receives {@code onNext(value),onComplete()}
- * - valued Mono, asyncCleanup ends with {@code onError(t)}: downstream receives {@code onError(t)}, {@code value} is discarded
- * - error(e) Mono, asyncCleanup ends with {@code onComplete()}: downstream receives {@code onError(e)}
- * - error(e) Mono, asyncCleanup ends with {@code onError(t)}: downstream receives {@code onError(t)}, t suppressing e
- *
- *
- *
- *
- * Note that if the resource supplying {@link Publisher} emits more than one resource, the
- * subsequent resources are dropped ({@link Operators#onNextDropped(Object, Context)}). If
- * the publisher errors AFTER having emitted one resource, the error is also silently dropped
- * ({@link Operators#onErrorDropped(Throwable, Context)}).
- * An empty completion or error without at least one onNext signal (no resource supplied)
- * triggers a short-circuit of the main sequence with the same terminal signal
- * (no cleanup is invoked).
- *
- *
Discard Support: This operator discards any source element if the {@code asyncCleanup} handler fails.
- *
- * @param resourceSupplier a {@link Publisher} that "generates" the resource,
- * subscribed for each subscription to the main sequence
- * @param resourceClosure a factory to derive a {@link Mono} from the supplied resource
- * @param asyncCleanup an asynchronous resource cleanup invoked when the resource
- * closure terminates (with onComplete, onError or cancel)
- * @param the type of elements emitted by the resource closure, and thus the main sequence
- * @param the type of the resource object
- *
- * @return a new {@link Mono} built around a "transactional" resource, with deferred emission until the
- * asynchronous cleanup sequence completes
- */
- public static Mono usingWhen(Publisher resourceSupplier,
- Function super D, ? extends Mono extends T>> resourceClosure,
- Function super D, ? extends Publisher>> asyncCleanup) {
- return usingWhen(resourceSupplier, resourceClosure, asyncCleanup,
- (res, error) -> asyncCleanup.apply(res),
- asyncCleanup);
- }
-
- /**
- * Uses a resource, generated by a {@link Publisher} for each individual {@link Subscriber},
- * to derive a {@link Mono}.Note that all steps of the operator chain that would need the
- * resource to be in an open stable state need to be described inside the {@code resourceClosure}
- * {@link Function}.
- *
- * Unlike in {@link Flux#usingWhen(Publisher, Function, Function, BiFunction, Function) the Flux counterpart},
- * ALL signals are deferred until the {@link Mono} terminates and the relevant {@link Function}
- * generates and invokes a "cleanup" {@link Publisher}. This is because a failure in the cleanup Publisher
- * must result in a lone {@code onError} signal in the downstream {@link Mono} (any potential value in the
- * derived {@link Mono} is discarded). Here are the various scenarios that can play out:
- *
- * - empty Mono, asyncComplete ends with {@code onComplete()}: downstream receives {@code onComplete()}
- * - empty Mono, asyncComplete ends with {@code onError(t)}: downstream receives {@code onError(t)}
- * - valued Mono, asyncComplete ends with {@code onComplete()}: downstream receives {@code onNext(value),onComplete()}
- * - valued Mono, asyncComplete ends with {@code onError(t)}: downstream receives {@code onError(t)}, {@code value} is discarded
- * - error(e) Mono, errorComplete ends with {@code onComplete()}: downstream receives {@code onError(e)}
- * - error(e) Mono, errorComplete ends with {@code onError(t)}: downstream receives {@code onError(t)}, t suppressing e
- *
- *
- *
- *
- * Individual cleanups can also be associated with mono cancellation and
- * error terminations:
- *
- *
- *
- * Note that if the resource supplying {@link Publisher} emits more than one resource, the
- * subsequent resources are dropped ({@link Operators#onNextDropped(Object, Context)}). If
- * the publisher errors AFTER having emitted one resource, the error is also silently dropped
- * ({@link Operators#onErrorDropped(Throwable, Context)}).
- * An empty completion or error without at least one onNext signal (no resource supplied)
- * triggers a short-circuit of the main sequence with the same terminal signal
- * (no cleanup is invoked).
- *
- *
Discard Support: This operator discards the element if the {@code asyncComplete} handler fails.
- *
- * @param resourceSupplier a {@link Publisher} that "generates" the resource,
- * subscribed for each subscription to the main sequence
- * @param resourceClosure a factory to derive a {@link Mono} from the supplied resource
- * @param asyncComplete an asynchronous resource cleanup invoked if the resource closure terminates with onComplete
- * @param asyncError an asynchronous resource cleanup invoked if the resource closure terminates with onError.
- * The terminating error is provided to the {@link BiFunction}
- * @param asyncCancel an asynchronous resource cleanup invoked if the resource closure is cancelled.
- * When {@code null}, the {@code asyncComplete} path is used instead.
- * @param the type of elements emitted by the resource closure, and thus the main sequence
- * @param the type of the resource object
- *
- * @return a new {@link Mono} built around a "transactional" resource, with several
- * termination path triggering asynchronous cleanup sequences
- *
- */
- public static Mono usingWhen(Publisher resourceSupplier,
- Function super D, ? extends Mono extends T>> resourceClosure,
- Function super D, ? extends Publisher>> asyncComplete,
- BiFunction super D, ? super Throwable, ? extends Publisher>> asyncError,
- //the operator itself accepts null for asyncCancel, but we won't in the public API
- Function super D, ? extends Publisher>> asyncCancel) {
- return onAssembly(new MonoUsingWhen<>(resourceSupplier, resourceClosure,
- asyncComplete, asyncError, asyncCancel));
- }
-
- /**
- * Aggregate given publishers into a new {@literal Mono} that will be fulfilled
- * when all of the given {@literal sources} have completed. An error will cause
- * pending results to be cancelled and immediate error emission to the returned {@link Mono}.
- *
- *
- *
- * @param sources The sources to use.
- *
- * @return a {@link Mono}.
- */
- public static Mono when(Publisher>... sources) {
- if (sources.length == 0) {
- return empty();
- }
- if (sources.length == 1) {
- return empty(sources[0]);
- }
- return onAssembly(new MonoWhen(false, sources));
- }
-
-
- /**
- * Aggregate given publishers into a new {@literal Mono} that will be
- * fulfilled when all of the given {@literal Publishers} have completed.
- * An error will cause pending results to be cancelled and immediate error emission
- * to the returned {@link Mono}.
- *
- *
- *
- *
- *
- * @param sources The sources to use.
- *
- * @return a {@link Mono}.
- */
- public static Mono when(final Iterable extends Publisher>> sources) {
- return onAssembly(new MonoWhen(false, sources));
- }
-
- /**
- * Aggregate given publishers into a new {@literal Mono} that will be
- * fulfilled when all of the given {@literal sources} have completed. Errors from
- * the sources are delayed.
- * If several Publishers error, the exceptions are combined (as suppressed exceptions on a root exception).
- *
- *
- *
- *
- *
- * @param sources The sources to use.
- *
- * @return a {@link Mono}.
- */
- public static Mono whenDelayError(final Iterable extends Publisher>> sources) {
- return onAssembly(new MonoWhen(true, sources));
- }
-
- /**
- * Merge given publishers into a new {@literal Mono} that will be fulfilled when
- * all of the given {@literal sources} have completed. Errors from the sources are delayed.
- * If several Publishers error, the exceptions are combined (as suppressed exceptions on a root exception).
- *
- *
- *
- *
- * @param sources The sources to use.
- *
- * @return a {@link Mono}.
- */
- public static Mono whenDelayError(Publisher>... sources) {
- if (sources.length == 0) {
- return empty();
- }
- if (sources.length == 1) {
- return empty(sources[0]);
- }
- return onAssembly(new MonoWhen(true, sources));
- }
-
- /**
- * Merge given monos into a new {@literal Mono} that will be fulfilled when all of the given {@literal Monos}
- * have produced an item, aggregating their values into a {@link Tuple2}.
- * An error or empty completion of any source will cause other sources
- * to be cancelled and the resulting Mono to immediately error or complete, respectively.
- *
- *
- *
- *
- * @param p1 The first upstream {@link Publisher} to subscribe to.
- * @param p2 The second upstream {@link Publisher} to subscribe to.
- * @param type of the value from p1
- * @param type of the value from p2
- *
- * @return a {@link Mono}.
- */
- public static Mono> zip(Mono extends T1> p1, Mono extends T2> p2) {
- return zip(p1, p2, Flux.tuple2Function());
- }
-
- /**
- * Merge given monos into a new {@literal Mono} that will be fulfilled when all of the given {@literal Monos}
- * have produced an item, aggregating their values as defined by the combinator function.
- * An error or empty completion of any source will cause other sources
- * to be cancelled and the resulting Mono to immediately error or complete, respectively.
- *
- *
- *
- *
- * @param p1 The first upstream {@link Publisher} to subscribe to.
- * @param p2 The second upstream {@link Publisher} to subscribe to.
- * @param combinator a {@link BiFunction} combinator function when both sources
- * complete
- * @param type of the value from p1
- * @param type of the value from p2
- * @param output value
- *
- * @return a {@link Mono}.
- */
- public static Mono zip(Mono extends T1> p1, Mono
- extends T2> p2, BiFunction super T1, ? super T2, ? extends O> combinator) {
- return onAssembly(new MonoZip(false, p1, p2, combinator));
- }
-
- /**
- * Merge given monos into a new {@literal Mono} that will be fulfilled when all of the given {@literal Monos}
- * have produced an item, aggregating their values into a {@link Tuple3}.
- * An error or empty completion of any source will cause other sources
- * to be cancelled and the resulting Mono to immediately error or complete, respectively.
- *
- *
- *
- *
- * @param p1 The first upstream {@link Publisher} to subscribe to.
- * @param p2 The second upstream {@link Publisher} to subscribe to.
- * @param p3 The third upstream {@link Publisher} to subscribe to.
- * @param type of the value from p1
- * @param type of the value from p2
- * @param type of the value from p3
- *
- * @return a {@link Mono}.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static Mono> zip(Mono extends T1> p1, Mono extends T2> p2, Mono extends T3> p3) {
- return onAssembly(new MonoZip(false, a -> Tuples.fromArray((Object[])a), p1, p2, p3));
- }
-
- /**
- * Merge given monos into a new {@literal Mono} that will be fulfilled when all of the given {@literal Monos}
- * have produced an item, aggregating their values into a {@link Tuple4}.
- * An error or empty completion of any source will cause other sources
- * to be cancelled and the resulting Mono to immediately error or complete, respectively.
- *
- *
- *
- *
- * @param p1 The first upstream {@link Publisher} to subscribe to.
- * @param p2 The second upstream {@link Publisher} to subscribe to.
- * @param p3 The third upstream {@link Publisher} to subscribe to.
- * @param p4 The fourth upstream {@link Publisher} to subscribe to.
- * @param type of the value from p1
- * @param type of the value from p2
- * @param type of the value from p3
- * @param type of the value from p4
- *
- * @return a {@link Mono}.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static Mono> zip(Mono extends T1> p1,
- Mono extends T2> p2,
- Mono extends T3> p3,
- Mono extends T4> p4) {
- return onAssembly(new MonoZip(false, a -> Tuples.fromArray((Object[])a), p1, p2, p3, p4));
- }
-
- /**
- * Merge given monos into a new {@literal Mono} that will be fulfilled when all of the given {@literal Monos}
- * have produced an item, aggregating their values into a {@link Tuple5}.
- * An error or empty completion of any source will cause other sources
- * to be cancelled and the resulting Mono to immediately error or complete, respectively.
- *
- *
- *
- *
- * @param p1 The first upstream {@link Publisher} to subscribe to.
- * @param p2 The second upstream {@link Publisher} to subscribe to.
- * @param p3 The third upstream {@link Publisher} to subscribe to.
- * @param p4 The fourth upstream {@link Publisher} to subscribe to.
- * @param p5 The fifth upstream {@link Publisher} to subscribe to.
- * @param type of the value from p1
- * @param type of the value from p2
- * @param type of the value from p3
- * @param type of the value from p4
- * @param type of the value from p5
- *
- * @return a {@link Mono}.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static Mono> zip(Mono extends T1> p1,
- Mono extends T2> p2,
- Mono extends T3> p3,
- Mono extends T4> p4,
- Mono extends T5> p5) {
- return onAssembly(new MonoZip(false, a -> Tuples.fromArray((Object[])a), p1, p2, p3, p4, p5));
- }
-
- /**
- * Merge given monos into a new {@literal Mono} that will be fulfilled when all of the given {@literal Monos}
- * have produced an item, aggregating their values into a {@link Tuple6}.
- * An error or empty completion of any source will cause other sources
- * to be cancelled and the resulting Mono to immediately error or complete, respectively.
- *
- *
- *
- *
- * @param p1 The first upstream {@link Publisher} to subscribe to.
- * @param p2 The second upstream {@link Publisher} to subscribe to.
- * @param p3 The third upstream {@link Publisher} to subscribe to.
- * @param p4 The fourth upstream {@link Publisher} to subscribe to.
- * @param p5 The fifth upstream {@link Publisher} to subscribe to.
- * @param p6 The sixth upstream {@link Publisher} to subscribe to.
- * @param type of the value from p1
- * @param type of the value from p2
- * @param type of the value from p3
- * @param type of the value from p4
- * @param type of the value from p5
- * @param type of the value from p6
- *
- * @return a {@link Mono}.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static Mono> zip(Mono extends T1> p1,
- Mono extends T2> p2,
- Mono extends T3> p3,
- Mono extends T4> p4,
- Mono extends T5> p5,
- Mono extends T6> p6) {
- return onAssembly(new MonoZip(false, a -> Tuples.fromArray((Object[])a), p1, p2, p3, p4, p5, p6));
- }
-
- /**
- * Merge given monos into a new {@literal Mono} that will be fulfilled when all of the given {@literal Monos}
- * have produced an item, aggregating their values into a {@link Tuple7}.
- * An error or empty completion of any source will cause other sources
- * to be cancelled and the resulting Mono to immediately error or complete, respectively.
- *
- *
- *
- *
- * @param p1 The first upstream {@link Publisher} to subscribe to.
- * @param p2 The second upstream {@link Publisher} to subscribe to.
- * @param p3 The third upstream {@link Publisher} to subscribe to.
- * @param p4 The fourth upstream {@link Publisher} to subscribe to.
- * @param p5 The fifth upstream {@link Publisher} to subscribe to.
- * @param p6 The sixth upstream {@link Publisher} to subscribe to.
- * @param p7 The seventh upstream {@link Publisher} to subscribe to.
- * @param type of the value from p1
- * @param type of the value from p2
- * @param type of the value from p3
- * @param type of the value from p4
- * @param type of the value from p5
- * @param type of the value from p6
- * @param type of the value from p7
- *
- * @return a {@link Mono}.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static Mono> zip(Mono extends T1> p1,
- Mono extends T2> p2,
- Mono extends T3> p3,
- Mono extends T4> p4,
- Mono extends T5> p5,
- Mono extends T6> p6,
- Mono extends T7> p7) {
- return onAssembly(new MonoZip(false, a -> Tuples.fromArray((Object[])a), p1, p2, p3, p4, p5, p6, p7));
- }
-
- /**
- * Merge given monos into a new {@literal Mono} that will be fulfilled when all of the given {@literal Monos}
- * have produced an item, aggregating their values into a {@link Tuple8}.
- * An error or empty completion of any source will cause other sources
- * to be cancelled and the resulting Mono to immediately error or complete, respectively.
- *
- *
- *
- *
- * @param p1 The first upstream {@link Publisher} to subscribe to.
- * @param p2 The second upstream {@link Publisher} to subscribe to.
- * @param p3 The third upstream {@link Publisher} to subscribe to.
- * @param p4 The fourth upstream {@link Publisher} to subscribe to.
- * @param p5 The fifth upstream {@link Publisher} to subscribe to.
- * @param p6 The sixth upstream {@link Publisher} to subscribe to.
- * @param p7 The seventh upstream {@link Publisher} to subscribe to.
- * @param p8 The eight upstream {@link Publisher} to subscribe to.
- * @param type of the value from p1
- * @param type of the value from p2
- * @param type of the value from p3
- * @param type of the value from p4
- * @param type of the value from p5
- * @param type of the value from p6
- * @param type of the value from p7
- * @param type of the value from p8
- *
- * @return a {@link Mono}.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static Mono> zip(Mono extends T1> p1,
- Mono extends T2> p2,
- Mono extends T3> p3,
- Mono extends T4> p4,
- Mono extends T5> p5,
- Mono extends T6> p6,
- Mono extends T7> p7,
- Mono extends T8> p8) {
- return onAssembly(new MonoZip(false, a -> Tuples.fromArray((Object[])a), p1, p2, p3, p4, p5, p6, p7, p8));
- }
-
- /**
- * Aggregate given monos into a new {@literal Mono} that will be fulfilled when all of the given {@literal
- * Monos} have produced an item, aggregating their values according to the provided combinator function.
- * An error or empty completion of any source will cause other sources
- * to be cancelled and the resulting Mono to immediately error or complete, respectively.
- *
- *
- *
- *
- *
- * @param monos The monos to use.
- * @param combinator the function to transform the combined array into an arbitrary
- * object.
- * @param the combined result
- *
- * @return a {@link Mono}.
- */
- public static Mono zip(final Iterable extends Mono>> monos, Function super Object[], ? extends R> combinator) {
- return onAssembly(new MonoZip<>(false, combinator, monos));
- }
-
- /**
- * Aggregate given monos into a new {@literal Mono} that will be fulfilled when all of the given {@literal
- * Monos} have produced an item, aggregating their values according to the provided combinator function.
- * An error or empty completion of any source will cause other sources
- * to be cancelled and the resulting Mono to immediately error or complete, respectively.
- *
- *
- *
- * @param monos The monos to use.
- * @param combinator the function to transform the combined array into an arbitrary
- * object.
- * @param the combined result
- *
- * @return a {@link Mono}.
- */
- public static Mono zip(Function super Object[], ? extends R> combinator, Mono>... monos) {
- if (monos.length == 0) {
- return empty();
- }
- if (monos.length == 1) {
- return monos[0].map(d -> combinator.apply(new Object[]{d}));
- }
- return onAssembly(new MonoZip<>(false, combinator, monos));
- }
-
- /**
- * Merge given monos into a new {@literal Mono} that will be fulfilled when all of the given {@literal Monos}
- * have produced an item, aggregating their values into a {@link Tuple2} and delaying errors.
- * If a Mono source completes without value, the other source is run to completion then the
- * resulting {@link Mono} completes empty.
- * If both Monos error, the two exceptions are combined (as suppressed exceptions on a root exception).
- *
- *
- *
- *
- * @param p1 The first upstream {@link Publisher} to subscribe to.
- * @param p2 The second upstream {@link Publisher} to subscribe to.
- * @param type of the value from p1
- * @param type of the value from p2
- *
- * @return a {@link Mono}.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static Mono> zipDelayError(Mono extends T1> p1, Mono extends T2> p2) {
- return onAssembly(new MonoZip(true, a -> Tuples.fromArray((Object[])a), p1, p2));
- }
-
- /**
- * Merge given monos into a new {@literal Mono} that will be fulfilled when all of the given {@literal Mono Monos}
- * have produced an item, aggregating their values into a {@link Tuple3} and delaying errors.
- * If a Mono source completes without value, all other sources are run to completion then
- * the resulting {@link Mono} completes empty.
- * If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).
- *
- *
- *
- *
- * @param p1 The first upstream {@link Publisher} to subscribe to.
- * @param p2 The second upstream {@link Publisher} to subscribe to.
- * @param p3 The third upstream {@link Publisher} to subscribe to.
- * @param type of the value from p1
- * @param type of the value from p2
- * @param type of the value from p3
- *
- * @return a {@link Mono}.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static