Skip to content

Commit

Permalink
Add .singleOptional() to Mono (#3317)
Browse files Browse the repository at this point in the history
Introducing a new operator for the `Mono` stack.

It converts source `Mono` signals of type `T` to `Optional<T>`.

In case of a value, it is wrapped. When the source completes without a value, an
empty `Optional` is delivered. Errors are simply propagated downstream.

Having a dedicated operator for this case improves performance greatly over a
combination of operators:
`.map(Optional::ofNullable).defaultIfEmpty(Optional.empty())`.
  • Loading branch information
AndreasHuber-CH authored Jan 18, 2023
1 parent 07ead40 commit 912441f
Show file tree
Hide file tree
Showing 7 changed files with 1,814 additions and 3 deletions.
33 changes: 32 additions & 1 deletion reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -4245,6 +4245,37 @@ public final Mono<T> single() {
return Mono.onAssembly(new MonoSingleMono<>(this));
}

/**
* Wrap the item produced by this {@link Mono} source into an Optional
* or emit an empty Optional for an empty source.
* <p>
* <img class="marble" src="doc-files/marbles/singleOptional.svg" alt="">
* <p>
*
* @return a {@link Mono} with an Optional containing the item, an empty optional or an error signal
*/
public final Mono<Optional<T>> singleOptional() {
if (this instanceof Callable) {
if (this instanceof Fuseable.ScalarCallable) {
@SuppressWarnings("unchecked")
Fuseable.ScalarCallable<T> scalarCallable = (Fuseable.ScalarCallable<T>) this;

T v;
try {
v = scalarCallable.call();
}
catch (Exception e) {
return Mono.error(Exceptions.unwrap(e));
}
return Mono.just(Optional.ofNullable(v));
}
@SuppressWarnings("unchecked")
Callable<T> thiz = (Callable<T>)this;
return Mono.onAssembly(new MonoSingleOptionalCallable<>(thiz));
}
return Mono.onAssembly(new MonoSingleOptional<>(this));
}

/**
* Subscribe to this {@link Mono} and request unbounded demand.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.Optional;

import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* Emits a single item from the source wrapped into an Optional, emits
* an empty Optional instead for empty source.
*
* @param <T> the value type
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class MonoSingleOptional<T> extends InternalMonoOperator<T, Optional<T>> {

MonoSingleOptional(Mono<? extends T> source) {
super(source);
}

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super Optional<T>> actual) {
return new MonoSingleOptional.SingleOptionalSubscriber<>(actual);
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return super.scanUnsafe(key);
}

static final class SingleOptionalSubscriber<T> extends Operators.MonoInnerProducerBase<Optional<T>> implements InnerConsumer<T> {

Subscription s;

boolean done;

@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED) return done;
if (key == Attr.PARENT) return s;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;

return super.scanUnsafe(key);
}

@Override
public Context currentContext() {
return actual().currentContext();
}

SingleOptionalSubscriber(CoreSubscriber<? super Optional<T>> actual) {
super(actual);
}

@Override
public void doOnRequest(long n) {
s.request(Long.MAX_VALUE);
}

@Override
public void doOnCancel() {
s.cancel();
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
actual().onSubscribe(this);
}
}

@Override
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, actual().currentContext());
return;
}
done = true;
complete(Optional.of(t));
}

@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t, actual().currentContext());
return;
}
done = true;
actual().onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
complete(Optional.empty());
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.util.annotation.Nullable;

import java.time.Duration;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;

/**
* Emits a single item from the source wrapped into an Optional, emits
* an empty Optional instead for empty source.
*
* @param <T> the value type
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class MonoSingleOptionalCallable<T> extends Mono<Optional<T>>
implements Callable<Optional<T>>, SourceProducer<Optional<T>> {

final Callable<? extends T> callable;

MonoSingleOptionalCallable(Callable<? extends T> source) {
this.callable = Objects.requireNonNull(source, "source");
}

@Override
public void subscribe(CoreSubscriber<? super Optional<T>> actual) {
Operators.MonoInnerProducerBase<Optional<T>>
sds = new Operators.MonoInnerProducerBase<>(actual);

actual.onSubscribe(sds);

if (sds.isCancelled()) {
return;
}

try {
T t = callable.call();
sds.complete(Optional.ofNullable(t));
}
catch (Throwable e) {
actual.onError(Operators.onOperatorError(e, actual.currentContext()));
}

}

@Override
public Optional<T> block() {
//duration is ignored below
return block(Duration.ZERO);
}

@Override
public Optional<T> block(Duration m) {
final T v;

try {
v = callable.call();
}
catch (Throwable e) {
throw Exceptions.propagate(e);
}

return Optional.ofNullable(v);
}

@Override
public Optional<T> call() throws Exception {
final T v = callable.call();

return Optional.ofNullable(v);
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return null;
}
}
Loading

0 comments on commit 912441f

Please sign in to comment.