From f481dced1bf9de28444ce6125fef5c2b54c228f3 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Fri, 9 Dec 2022 22:58:41 +0100 Subject: [PATCH 1/6] Add .singleOptional() to Mono and Flux Fixes #3311 Signed-off-by: Andreas Huber --- .../java/reactor/core/publisher/Flux.java | 31 ++ .../java/reactor/core/publisher/Mono.java | 31 ++ .../core/publisher/MonoSingleOptional.java | 150 +++++++++ .../publisher/MonoSingleOptionalCallable.java | 98 ++++++ .../publisher/MonoSingleOptionalMono.java | 47 +++ .../publisher/MonoSingleOptionalMonoTest.java | 85 +++++ .../publisher/MonoSingleOptionalTest.java | 290 ++++++++++++++++++ 7 files changed, 732 insertions(+) create mode 100644 reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java create mode 100644 reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java create mode 100644 reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalMono.java create mode 100644 reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalMonoTest.java create mode 100644 reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index 43cdf159cf..9ff5acddde 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -8235,6 +8235,37 @@ public final Mono singleOrEmpty() { return Mono.onAssembly(new MonoSingle<>(this, null, true)); } + /** + * Expect a single item from this {@link Flux} source, and emit it wrapped into an Optional. + * Emit an empty Optional for an empty source but signal an {@link IndexOutOfBoundsException} for a source + * with more than one element. + *

+ * + * + * @return a {@link Mono} with an Optional containing the expected single item, an empty optional or an error + */ + public final Mono> singleOptional() { + if (this instanceof Callable) { + if (this instanceof Fuseable.ScalarCallable) { + @SuppressWarnings("unchecked") + Fuseable.ScalarCallable scalarCallable = (Fuseable.ScalarCallable) this; + + T v; + try { + v = scalarCallable.call(); + } + catch (Exception e) { + return Mono.error(Exceptions.unwrap(e)); + } + return Mono.just(Optional.ofNullable(v)); + } + @SuppressWarnings("unchecked") + Callable thiz = (Callable)this; + return Mono.onAssembly(new MonoSingleOptionalCallable<>(thiz)); + } + return Mono.onAssembly(new MonoSingleOptional<>(this)); + } + /** * Skip the specified number of elements from the beginning of this {@link Flux} then * emit the remaining elements. diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java index 77cf41a2d8..b07161dc82 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java @@ -4245,6 +4245,37 @@ public final Mono single() { return Mono.onAssembly(new MonoSingleMono<>(this)); } + /** + * Wrap the item produced by this {@link Mono} source into an Optional + * or emit an empty Optional for an empty source. + *

+ * + *

+ * + * @return a {@link Mono} with an Optional containing the item, an empty optional or an error + */ + public final Mono> singleOptional() { + if (this instanceof Callable) { + if (this instanceof Fuseable.ScalarCallable) { + @SuppressWarnings("unchecked") + Fuseable.ScalarCallable scalarCallable = (Fuseable.ScalarCallable) this; + + T v; + try { + v = scalarCallable.call(); + } + catch (Exception e) { + return Mono.error(Exceptions.unwrap(e)); + } + return Mono.just(Optional.ofNullable(v)); + } + @SuppressWarnings("unchecked") + Callable thiz = (Callable)this; + return Mono.onAssembly(new MonoSingleOptionalCallable<>(thiz)); + } + return Mono.onAssembly(new MonoSingleOptionalMono<>(this)); + } + /** * Subscribe to this {@link Mono} and request unbounded demand. *

diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java new file mode 100644 index 0000000000..37773b6483 --- /dev/null +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; + +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.util.annotation.Nullable; +import reactor.util.context.Context; + +/** + * Expects and emits a single item from the source wrapped into an Optional, emits + * an empty Optional instead for empty source or signals + * IndexOutOfBoundsException for a multi-item source. + * + * @param the value type + * @see Reactive-Streams-Commons + */ +final class MonoSingleOptional extends MonoFromFluxOperator> { + + MonoSingleOptional(Flux source) { + super(source); + } + + @Override + public CoreSubscriber subscribeOrReturn(CoreSubscriber> actual) { + return new SingleOptionalSubscriber<>(actual); + } + + @Override + public Object scanUnsafe(Attr key) { + if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; + return super.scanUnsafe(key); + } + + static final class SingleOptionalSubscriber extends Operators.MonoInnerProducerBase> implements InnerConsumer { + + Subscription s; + + int count; + + boolean done; + + @Override + @Nullable + public Object scanUnsafe(Attr key) { + if (key == Attr.TERMINATED) return done; + if (key == Attr.PARENT) return s; + if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; + + return super.scanUnsafe(key); + } + + @Override + public Context currentContext() { + return actual().currentContext(); + } + + SingleOptionalSubscriber(CoreSubscriber> actual) { + super(actual); + } + + @Override + public void doOnRequest(long n) { + s.request(Long.MAX_VALUE); + } + + @Override + public void doOnCancel() { + s.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(this.s, s)) { + this.s = s; + actual().onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (isCancelled()) { + //this helps differentiating a duplicate malformed signal "done" from a count > 1 "done" + Operators.onDiscard(t, actual().currentContext()); + return; + } + if (done) { + Operators.onNextDropped(t, actual().currentContext()); + return; + } + if (++count > 1) { + Operators.onDiscard(t, actual().currentContext()); + //mark as both cancelled and done + cancel(); + onError(new IndexOutOfBoundsException("Source emitted more than one item")); + } + else { + setValue(Optional.of(t)); + } + } + + @Override + public void onError(Throwable t) { + if (done) { + Operators.onErrorDropped(t, actual().currentContext()); + return; + } + done = true; + discardTheValue(); + + actual().onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + + int c = count; + if (c == 0) { + + complete(Optional.empty()); + } + else if (c == 1) { + complete(); + } + } + + } +} diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java new file mode 100644 index 0000000000..26d1ed649a --- /dev/null +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import reactor.core.CoreSubscriber; +import reactor.core.Exceptions; +import reactor.util.annotation.Nullable; + +import java.time.Duration; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.Callable; + +/** + * Expects and emits a single item from the source Callable and warps it into an Optional, + * emits an empty Optional for empty source. + * + * @param the value type + * @see Reactive-Streams-Commons + */ +final class MonoSingleOptionalCallable extends Mono> + implements Callable>, SourceProducer> { + + final Callable callable; + + MonoSingleOptionalCallable(Callable source) { + this.callable = Objects.requireNonNull(source, "source"); + } + + @Override + public void subscribe(CoreSubscriber> actual) { + Operators.MonoInnerProducerBase> + sds = new Operators.MonoInnerProducerBase<>(actual); + + actual.onSubscribe(sds); + + if (sds.isCancelled()) { + return; + } + + try { + T t = callable.call(); + sds.complete(Optional.ofNullable(t)); + } + catch (Throwable e) { + actual.onError(Operators.onOperatorError(e, actual.currentContext())); + } + + } + + @Override + public Optional block() { + //duration is ignored below + return block(Duration.ZERO); + } + + @Override + public Optional block(Duration m) { + final T v; + + try { + v = callable.call(); + } + catch (Throwable e) { + throw Exceptions.propagate(e); + } + + return Optional.ofNullable(v); + } + + @Override + public Optional call() throws Exception { + final T v = callable.call(); + + return Optional.ofNullable(v); + } + + @Override + public Object scanUnsafe(Attr key) { + if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; + return null; + } +} diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalMono.java b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalMono.java new file mode 100644 index 0000000000..6305d9ccae --- /dev/null +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalMono.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import reactor.core.CoreSubscriber; + +import java.util.Optional; + +/** + * Wraps the item from the source into an Optional, emits + * an empty Optional instead for empty source or signals + * IndexOutOfBoundsException for a multi-item source. + * + * @param the value type + * @see Reactive-Streams-Commons + */ +final class MonoSingleOptionalMono extends InternalMonoOperator> { + + MonoSingleOptionalMono(Mono source) { + super(source); + } + + @Override + public CoreSubscriber subscribeOrReturn(CoreSubscriber> actual) { + return new MonoSingleOptional.SingleOptionalSubscriber<>(actual); + } + + @Override + public Object scanUnsafe(Attr key) { + if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; + return super.scanUnsafe(key); + } +} diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalMonoTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalMonoTest.java new file mode 100644 index 0000000000..df90581c84 --- /dev/null +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalMonoTest.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import org.junit.jupiter.api.Test; +import reactor.core.Fuseable; +import reactor.core.Scannable; +import reactor.test.StepVerifier; + +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.function.Function; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +public class MonoSingleOptionalMonoTest { + + @Test + public void callableEmpty() { + StepVerifier.create(Mono.empty().singleOptional()) + .expectNext(Optional.empty()) + .verifyComplete(); + } + + @Test + public void callableValued() { + StepVerifier.create(Mono.just("foo").singleOptional()) + .expectNext(Optional.of("foo")) + .verifyComplete(); + } + + @Test + public void normalEmpty() { + StepVerifier.create(Mono.empty().hide().singleOptional()) + .expectNext(Optional.empty()) + .verifyComplete(); + } + + @Test + public void normalValued() { + StepVerifier.create(Mono.just("foo").hide().singleOptional()) + .expectNext(Optional.of("foo")) + .verifyComplete(); + } + + // see https://github.com/reactor/reactor-core/issues/2663 + @Test + void fusionMonoSingleMonoDoesntTriggerFusion() { + Mono> fusedCase = Mono + .just(1) + .map(Function.identity()) + .singleOptional(); + + assertThat(fusedCase) + .as("fusedCase assembly check") + .isInstanceOf(MonoSingleOptionalMono.class) + .isNotInstanceOf(Fuseable.class); + + assertThatCode(() -> fusedCase.filter(v -> true).block()) + .as("fusedCase fused") + .doesNotThrowAnyException(); + } + + @Test + public void scanOperator(){ + MonoSingleOptionalMono test = new MonoSingleOptionalMono<>(Mono.just("foo")); + + assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); + } +} diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java new file mode 100644 index 0000000000..52c92f082e --- /dev/null +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java @@ -0,0 +1,290 @@ +/* + * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.concurrent.Callable; + +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.Fuseable; +import reactor.core.Scannable; +import reactor.test.StepVerifier; +import reactor.test.subscriber.AssertSubscriber; + +import static org.assertj.core.api.Assertions.*; + +public class MonoSingleOptionalTest { + + @Nested + class ConcreteClassConsistency { + //tests Flux.singleOptional, and Mono.singleOptional API consistency over returned classes + + @Test + void monoWithScalarEmpty() { + Mono source = Mono.empty(); + Mono> singleOptional = source.singleOptional(); + + assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); + assertThat(singleOptional).as("singleOptional") + .isInstanceOf(MonoJust.class) + .isInstanceOf(Fuseable.ScalarCallable.class); + } + + @Test + void monoWithScalarError() { + Mono source = Mono.error(new IllegalStateException("test")); + Mono> singleOptional = source.singleOptional(); + + assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); + assertThat(singleOptional).as("singleOptional") + .isInstanceOf(MonoError.class) + .isInstanceOf(Fuseable.ScalarCallable.class); + } + + @Test + void monoWithScalarValue() { + Mono source = Mono.just(1); + Mono> single = source.singleOptional(); + + assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); + assertThat(single).as("single") + .isInstanceOf(MonoJust.class) + .isInstanceOf(Fuseable.ScalarCallable.class); + } + + @Test + void monoWithCallable() { + Mono source = Mono.fromSupplier(() -> 1); + Mono> single = source.singleOptional(); + + assertThat(source).as("source") + .isInstanceOf(Callable.class) + .isNotInstanceOf(Fuseable.ScalarCallable.class); + assertThat(single).as("single").isInstanceOf(MonoSingleOptionalCallable.class); + } + + @Test + void monoWithNormal() { + Mono source = Mono.just(1).hide(); + Mono> single = source.singleOptional(); + + assertThat(source).as("source").isNotInstanceOf(Callable.class); //excludes ScalarCallable too + assertThat(single).as("single").isInstanceOf(MonoSingleOptionalMono.class); + } + + @Test + void fluxWithScalarEmpty() { + Flux source = Flux.empty(); + Mono> single = source.singleOptional(); + + assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); + assertThat(single).as("single") + .isInstanceOf(MonoJust.class) + .isInstanceOf(Fuseable.ScalarCallable.class); + } + + @Test + void fluxWithScalarError() { + Flux source = Flux.error(new IllegalStateException("test")); + Mono> single = source.singleOptional(); + + assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); + assertThat(single).as("single") + .isInstanceOf(MonoError.class) + .isInstanceOf(Fuseable.ScalarCallable.class); + } + + @Test + void fluxWithScalarValue() { + Flux source = Flux.just(1); + Mono> single = source.singleOptional(); + + assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); + assertThat(single).as("single") + .isInstanceOf(MonoJust.class) + .isInstanceOf(Fuseable.ScalarCallable.class); + } + + @Test + void fluxWithCallable() { + Flux source = Mono.fromSupplier(() -> 1).flux(); + Mono> single = source.singleOptional(); + + assertThat(source).as("source") + .isInstanceOf(Callable.class) + .isNotInstanceOf(Fuseable.ScalarCallable.class); + assertThat(single).as("single").isInstanceOf(MonoSingleOptionalCallable.class); + } + + @Test + void fluxWithNormal() { + Flux source = Flux.range(1, 10); + Mono> single = source.singleOptional(); + + assertThat(source).as("source").isNotInstanceOf(Callable.class); //excludes ScalarCallable too + assertThat(single).as("single").isInstanceOf(MonoSingleOptional.class); + } + + } + + @Test + void source1Null() { + assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> { + new MonoSingleOptional<>(null); + }); + } + + @Test + public void normal() { + + AssertSubscriber> ts = AssertSubscriber.create(); + + Flux.just(1).singleOptional().subscribe(ts); + + ts.assertValues(Optional.of(1)) + .assertNoError() + .assertComplete(); + } + + @Test + public void normalBackpressured() { + AssertSubscriber> ts = AssertSubscriber.create(0); + + Flux.just(1).singleOptional().subscribe(ts); + + ts.assertNoValues() + .assertNoError() + .assertNotComplete(); + + ts.request(1); + + ts.assertValues(Optional.of(1)) + .assertNoError() + .assertComplete(); + } + + @Test + public void empty() { + + AssertSubscriber> ts = AssertSubscriber.create(); + + Flux.empty().singleOptional().subscribe(ts); + + ts.assertValues(Optional.empty()) + .assertNoError() + .assertComplete(); + } + + @Test + public void error() { + StepVerifier.create(Flux.error(new RuntimeException("forced failure")) + .singleOptional()) + .verifyErrorMessage("forced failure"); + } + + @Test + public void errorHide() { + StepVerifier.create(Flux.error(new RuntimeException("forced failure")) + .hide() + .singleOptional()) + .verifyErrorMessage("forced failure"); + } + + @Test + public void multi() { + + AssertSubscriber> ts = AssertSubscriber.create(); + + Flux.range(1, 10).singleOptional().subscribe(ts); + + ts.assertNoValues() + .assertError(IndexOutOfBoundsException.class) + .assertNotComplete(); + } + + @Test + public void multiBackpressured() { + + AssertSubscriber> ts = AssertSubscriber.create(0); + + Flux.range(1, 10).singleOptional().subscribe(ts); + + ts.assertNoValues() + .assertNoError() + .assertNotComplete(); + + ts.request(1); + + ts.assertNoValues() + .assertError(IndexOutOfBoundsException.class) + .assertNotComplete(); + } + + @Test + public void singleCallable() { + StepVerifier.create(Mono.fromCallable(() -> 1) + .flux() + .singleOptional()) + .expectNext(Optional.of(1)) + .verifyComplete(); + } + + @Test + public void singleJustHide() { + StepVerifier.create(Flux.empty() + .hide() + .singleOptional()) + .expectNext(Optional.empty()) + .verifyComplete(); + } + + @Test + public void scanOperator(){ + MonoSingleOptional test = new MonoSingleOptional<>(Flux.just("foo")); + + assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); + } + + @Test + public void scanSubscriber() { + CoreSubscriber> + actual = new LambdaMonoSubscriber<>(null, e -> {}, null, null); + MonoSingleOptional.SingleOptionalSubscriber test = new MonoSingleOptional.SingleOptionalSubscriber<>( + actual); + Subscription parent = Operators.emptySubscription(); + test.onSubscribe(parent); + + assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE); + + assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent); + assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual); + assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); + + assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse(); + test.onError(new IllegalStateException("boom")); + assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue(); + + assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse(); + test.cancel(); + assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue(); + } + +} From 0c7854579bfc10f2234903d04d677bd094d103fd Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Sat, 7 Jan 2023 11:19:44 +0100 Subject: [PATCH 2/6] Remove Flux.singleOptional() --- .../java/reactor/core/publisher/Flux.java | 31 -- .../java/reactor/core/publisher/Mono.java | 4 +- .../core/publisher/MonoSingleOptional.java | 43 ++- .../publisher/MonoSingleOptionalMono.java | 47 --- .../publisher/MonoSingleOptionalMonoTest.java | 85 ---- .../publisher/MonoSingleOptionalTest.java | 364 ++++++------------ 6 files changed, 153 insertions(+), 421 deletions(-) delete mode 100644 reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalMono.java delete mode 100644 reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalMonoTest.java diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index 9ff5acddde..43cdf159cf 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -8235,37 +8235,6 @@ public final Mono singleOrEmpty() { return Mono.onAssembly(new MonoSingle<>(this, null, true)); } - /** - * Expect a single item from this {@link Flux} source, and emit it wrapped into an Optional. - * Emit an empty Optional for an empty source but signal an {@link IndexOutOfBoundsException} for a source - * with more than one element. - *

- * - * - * @return a {@link Mono} with an Optional containing the expected single item, an empty optional or an error - */ - public final Mono> singleOptional() { - if (this instanceof Callable) { - if (this instanceof Fuseable.ScalarCallable) { - @SuppressWarnings("unchecked") - Fuseable.ScalarCallable scalarCallable = (Fuseable.ScalarCallable) this; - - T v; - try { - v = scalarCallable.call(); - } - catch (Exception e) { - return Mono.error(Exceptions.unwrap(e)); - } - return Mono.just(Optional.ofNullable(v)); - } - @SuppressWarnings("unchecked") - Callable thiz = (Callable)this; - return Mono.onAssembly(new MonoSingleOptionalCallable<>(thiz)); - } - return Mono.onAssembly(new MonoSingleOptional<>(this)); - } - /** * Skip the specified number of elements from the beginning of this {@link Flux} then * emit the remaining elements. diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java index b07161dc82..c50395f194 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java @@ -4252,7 +4252,7 @@ public final Mono single() { * *

* - * @return a {@link Mono} with an Optional containing the item, an empty optional or an error + * @return a {@link Mono} with an Optional containing the item, an empty optional or an error signal */ public final Mono> singleOptional() { if (this instanceof Callable) { @@ -4273,7 +4273,7 @@ public final Mono> singleOptional() { Callable thiz = (Callable)this; return Mono.onAssembly(new MonoSingleOptionalCallable<>(thiz)); } - return Mono.onAssembly(new MonoSingleOptionalMono<>(this)); + return Mono.onAssembly(new MonoSingleOptional<>(this)); } /** diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java index 37773b6483..159c5d6288 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java @@ -16,11 +16,10 @@ package reactor.core.publisher; -import java.util.NoSuchElementException; -import java.util.Objects; import java.util.Optional; import org.reactivestreams.Subscription; + import reactor.core.CoreSubscriber; import reactor.util.annotation.Nullable; import reactor.util.context.Context; @@ -33,22 +32,30 @@ * @param the value type * @see Reactive-Streams-Commons */ -final class MonoSingleOptional extends MonoFromFluxOperator> { - - MonoSingleOptional(Flux source) { - super(source); - } - - @Override - public CoreSubscriber subscribeOrReturn(CoreSubscriber> actual) { - return new SingleOptionalSubscriber<>(actual); - } - - @Override - public Object scanUnsafe(Attr key) { - if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; - return super.scanUnsafe(key); - } +/** + * Wraps the item from the source into an Optional, emits + * an empty Optional instead for empty source or signals + * IndexOutOfBoundsException for a multi-item source. + * + * @param the value type + * @see Reactive-Streams-Commons + */ +final class MonoSingleOptional extends InternalMonoOperator> { + + MonoSingleOptional(Mono source) { + super(source); + } + + @Override + public CoreSubscriber subscribeOrReturn(CoreSubscriber> actual) { + return new MonoSingleOptional.SingleOptionalSubscriber<>(actual); + } + + @Override + public Object scanUnsafe(Attr key) { + if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; + return super.scanUnsafe(key); + } static final class SingleOptionalSubscriber extends Operators.MonoInnerProducerBase> implements InnerConsumer { diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalMono.java b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalMono.java deleted file mode 100644 index 6305d9ccae..0000000000 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalMono.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package reactor.core.publisher; - -import reactor.core.CoreSubscriber; - -import java.util.Optional; - -/** - * Wraps the item from the source into an Optional, emits - * an empty Optional instead for empty source or signals - * IndexOutOfBoundsException for a multi-item source. - * - * @param the value type - * @see Reactive-Streams-Commons - */ -final class MonoSingleOptionalMono extends InternalMonoOperator> { - - MonoSingleOptionalMono(Mono source) { - super(source); - } - - @Override - public CoreSubscriber subscribeOrReturn(CoreSubscriber> actual) { - return new MonoSingleOptional.SingleOptionalSubscriber<>(actual); - } - - @Override - public Object scanUnsafe(Attr key) { - if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; - return super.scanUnsafe(key); - } -} diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalMonoTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalMonoTest.java deleted file mode 100644 index df90581c84..0000000000 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalMonoTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package reactor.core.publisher; - -import org.junit.jupiter.api.Test; -import reactor.core.Fuseable; -import reactor.core.Scannable; -import reactor.test.StepVerifier; - -import java.util.NoSuchElementException; -import java.util.Optional; -import java.util.function.Function; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatCode; - -public class MonoSingleOptionalMonoTest { - - @Test - public void callableEmpty() { - StepVerifier.create(Mono.empty().singleOptional()) - .expectNext(Optional.empty()) - .verifyComplete(); - } - - @Test - public void callableValued() { - StepVerifier.create(Mono.just("foo").singleOptional()) - .expectNext(Optional.of("foo")) - .verifyComplete(); - } - - @Test - public void normalEmpty() { - StepVerifier.create(Mono.empty().hide().singleOptional()) - .expectNext(Optional.empty()) - .verifyComplete(); - } - - @Test - public void normalValued() { - StepVerifier.create(Mono.just("foo").hide().singleOptional()) - .expectNext(Optional.of("foo")) - .verifyComplete(); - } - - // see https://github.com/reactor/reactor-core/issues/2663 - @Test - void fusionMonoSingleMonoDoesntTriggerFusion() { - Mono> fusedCase = Mono - .just(1) - .map(Function.identity()) - .singleOptional(); - - assertThat(fusedCase) - .as("fusedCase assembly check") - .isInstanceOf(MonoSingleOptionalMono.class) - .isNotInstanceOf(Fuseable.class); - - assertThatCode(() -> fusedCase.filter(v -> true).block()) - .as("fusedCase fused") - .doesNotThrowAnyException(); - } - - @Test - public void scanOperator(){ - MonoSingleOptionalMono test = new MonoSingleOptionalMono<>(Mono.just("foo")); - - assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); - } -} diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java index 52c92f082e..0451d1a0e0 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,275 +16,163 @@ package reactor.core.publisher; -import java.util.NoSuchElementException; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + import java.util.Optional; import java.util.concurrent.Callable; +import java.util.function.Function; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscription; + import reactor.core.CoreSubscriber; import reactor.core.Fuseable; import reactor.core.Scannable; import reactor.test.StepVerifier; -import reactor.test.subscriber.AssertSubscriber; - -import static org.assertj.core.api.Assertions.*; public class MonoSingleOptionalTest { - @Nested - class ConcreteClassConsistency { - //tests Flux.singleOptional, and Mono.singleOptional API consistency over returned classes - - @Test - void monoWithScalarEmpty() { - Mono source = Mono.empty(); - Mono> singleOptional = source.singleOptional(); - - assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); - assertThat(singleOptional).as("singleOptional") - .isInstanceOf(MonoJust.class) - .isInstanceOf(Fuseable.ScalarCallable.class); - } - - @Test - void monoWithScalarError() { - Mono source = Mono.error(new IllegalStateException("test")); - Mono> singleOptional = source.singleOptional(); - - assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); - assertThat(singleOptional).as("singleOptional") - .isInstanceOf(MonoError.class) - .isInstanceOf(Fuseable.ScalarCallable.class); - } - - @Test - void monoWithScalarValue() { - Mono source = Mono.just(1); - Mono> single = source.singleOptional(); - - assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); - assertThat(single).as("single") - .isInstanceOf(MonoJust.class) - .isInstanceOf(Fuseable.ScalarCallable.class); - } - - @Test - void monoWithCallable() { - Mono source = Mono.fromSupplier(() -> 1); - Mono> single = source.singleOptional(); - - assertThat(source).as("source") - .isInstanceOf(Callable.class) - .isNotInstanceOf(Fuseable.ScalarCallable.class); - assertThat(single).as("single").isInstanceOf(MonoSingleOptionalCallable.class); - } - - @Test - void monoWithNormal() { - Mono source = Mono.just(1).hide(); - Mono> single = source.singleOptional(); - - assertThat(source).as("source").isNotInstanceOf(Callable.class); //excludes ScalarCallable too - assertThat(single).as("single").isInstanceOf(MonoSingleOptionalMono.class); - } - - @Test - void fluxWithScalarEmpty() { - Flux source = Flux.empty(); - Mono> single = source.singleOptional(); - - assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); - assertThat(single).as("single") - .isInstanceOf(MonoJust.class) - .isInstanceOf(Fuseable.ScalarCallable.class); - } - - @Test - void fluxWithScalarError() { - Flux source = Flux.error(new IllegalStateException("test")); - Mono> single = source.singleOptional(); - - assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); - assertThat(single).as("single") - .isInstanceOf(MonoError.class) - .isInstanceOf(Fuseable.ScalarCallable.class); - } - - @Test - void fluxWithScalarValue() { - Flux source = Flux.just(1); - Mono> single = source.singleOptional(); - - assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); - assertThat(single).as("single") - .isInstanceOf(MonoJust.class) - .isInstanceOf(Fuseable.ScalarCallable.class); - } - - @Test - void fluxWithCallable() { - Flux source = Mono.fromSupplier(() -> 1).flux(); - Mono> single = source.singleOptional(); - - assertThat(source).as("source") - .isInstanceOf(Callable.class) - .isNotInstanceOf(Fuseable.ScalarCallable.class); - assertThat(single).as("single").isInstanceOf(MonoSingleOptionalCallable.class); - } - - @Test - void fluxWithNormal() { - Flux source = Flux.range(1, 10); - Mono> single = source.singleOptional(); - - assertThat(source).as("source").isNotInstanceOf(Callable.class); //excludes ScalarCallable too - assertThat(single).as("single").isInstanceOf(MonoSingleOptional.class); - } - - } - - @Test - void source1Null() { - assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> { - new MonoSingleOptional<>(null); - }); - } - - @Test - public void normal() { - - AssertSubscriber> ts = AssertSubscriber.create(); - - Flux.just(1).singleOptional().subscribe(ts); - - ts.assertValues(Optional.of(1)) - .assertNoError() - .assertComplete(); - } - - @Test - public void normalBackpressured() { - AssertSubscriber> ts = AssertSubscriber.create(0); - - Flux.just(1).singleOptional().subscribe(ts); - - ts.assertNoValues() - .assertNoError() - .assertNotComplete(); - - ts.request(1); - - ts.assertValues(Optional.of(1)) - .assertNoError() - .assertComplete(); - } - - @Test - public void empty() { - - AssertSubscriber> ts = AssertSubscriber.create(); - - Flux.empty().singleOptional().subscribe(ts); - - ts.assertValues(Optional.empty()) - .assertNoError() - .assertComplete(); - } - - @Test - public void error() { - StepVerifier.create(Flux.error(new RuntimeException("forced failure")) - .singleOptional()) - .verifyErrorMessage("forced failure"); - } - - @Test - public void errorHide() { - StepVerifier.create(Flux.error(new RuntimeException("forced failure")) - .hide() - .singleOptional()) - .verifyErrorMessage("forced failure"); + @Nested + class ConcreteClassConsistency { + // tests Mono.singleOptional returned classes + + @Test + void monoWithScalarEmpty() { + Mono source = Mono.empty(); + Mono> singleOptional = source.singleOptional(); + + assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); + assertThat(singleOptional).as("singleOptional") + .isInstanceOf(MonoJust.class) + .isInstanceOf(Fuseable.ScalarCallable.class); + } + + @Test + void monoWithScalarError() { + Mono source = Mono.error(new IllegalStateException("test")); + Mono> singleOptional = source.singleOptional(); + + assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); + assertThat(singleOptional).as("singleOptional") + .isInstanceOf(MonoError.class) + .isInstanceOf(Fuseable.ScalarCallable.class); + } + + @Test + void monoWithScalarValue() { + Mono source = Mono.just(1); + Mono> single = source.singleOptional(); + + assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); + assertThat(single).as("singleOptional") + .isInstanceOf(MonoJust.class) + .isInstanceOf(Fuseable.ScalarCallable.class); + } + + @Test + void monoWithCallable() { + Mono source = Mono.fromSupplier(() -> 1); + Mono> single = source.singleOptional(); + + assertThat(source).as("source") + .isInstanceOf(Callable.class) + .isNotInstanceOf(Fuseable.ScalarCallable.class); + assertThat(single).as("singleOptional").isInstanceOf(MonoSingleOptionalCallable.class); + } + + @Test + void monoWithNormal() { + Mono source = Mono.just(1).hide(); + Mono> single = source.singleOptional(); + + assertThat(source).as("source").isNotInstanceOf(Callable.class); //excludes ScalarCallable too + assertThat(single).as("singleOptional").isInstanceOf(MonoSingleOptional.class); + } + } + + @Test + void source1Null() { + assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> { + new MonoSingleOptional<>(null); + }); + } + + @Test + public void callableEmpty() { + StepVerifier.create(Mono.empty().singleOptional()) + .expectNext(Optional.empty()) + .verifyComplete(); } @Test - public void multi() { - - AssertSubscriber> ts = AssertSubscriber.create(); - - Flux.range(1, 10).singleOptional().subscribe(ts); - - ts.assertNoValues() - .assertError(IndexOutOfBoundsException.class) - .assertNotComplete(); + public void callableValued() { + StepVerifier.create(Mono.just("foo").singleOptional()) + .expectNext(Optional.of("foo")) + .verifyComplete(); } @Test - public void multiBackpressured() { - - AssertSubscriber> ts = AssertSubscriber.create(0); - - Flux.range(1, 10).singleOptional().subscribe(ts); - - ts.assertNoValues() - .assertNoError() - .assertNotComplete(); - - ts.request(1); - - ts.assertNoValues() - .assertError(IndexOutOfBoundsException.class) - .assertNotComplete(); + public void normalEmpty() { + StepVerifier.create(Mono.empty().hide().singleOptional()) + .expectNext(Optional.empty()) + .verifyComplete(); } @Test - public void singleCallable() { - StepVerifier.create(Mono.fromCallable(() -> 1) - .flux() - .singleOptional()) - .expectNext(Optional.of(1)) + public void normalValued() { + StepVerifier.create(Mono.just("foo").hide().singleOptional()) + .expectNext(Optional.of("foo")) .verifyComplete(); } @Test - public void singleJustHide() { - StepVerifier.create(Flux.empty() - .hide() - .singleOptional()) - .expectNext(Optional.empty()) - .verifyComplete(); + void fusionMonoSingleFusion() { + Mono> fusedCase = Mono + .just(1) + .map(Function.identity()) + .singleOptional(); + + assertThat(fusedCase) + .as("fusedCase assembly check") + .isInstanceOf(MonoSingleOptional.class) + .isNotInstanceOf(Fuseable.class); + + assertThatCode(() -> fusedCase.filter(v -> true).block()) + .as("fusedCase fused") + .doesNotThrowAnyException(); } @Test public void scanOperator(){ - MonoSingleOptional test = new MonoSingleOptional<>(Flux.just("foo")); + MonoSingleOptional test = new MonoSingleOptional<>(Mono.just("foo")); assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); } - @Test - public void scanSubscriber() { - CoreSubscriber> - actual = new LambdaMonoSubscriber<>(null, e -> {}, null, null); - MonoSingleOptional.SingleOptionalSubscriber test = new MonoSingleOptional.SingleOptionalSubscriber<>( - actual); - Subscription parent = Operators.emptySubscription(); - test.onSubscribe(parent); - - assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE); - - assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent); - assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual); - assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); - - assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse(); - test.onError(new IllegalStateException("boom")); - assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue(); - - assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse(); - test.cancel(); - assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue(); - } - + @Test + public void scanSubscriber() { + CoreSubscriber> + actual = new LambdaMonoSubscriber<>(null, e -> {}, null, null); + MonoSingleOptional.SingleOptionalSubscriber test = new MonoSingleOptional.SingleOptionalSubscriber<>( + actual); + Subscription parent = Operators.emptySubscription(); + test.onSubscribe(parent); + + assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE); + + assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent); + assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual); + assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); + + assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse(); + test.onError(new IllegalStateException("boom")); + assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue(); + + assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse(); + test.cancel(); + assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue(); + } } From 3c61b6093c5f567a2969bcb9874bbba994c018bc Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Tue, 10 Jan 2023 14:16:16 +0100 Subject: [PATCH 3/6] Updates after review comments --- .../java/reactor/core/publisher/Mono.java | 4 +- .../core/publisher/MonoSingleOptional.java | 11 +- .../publisher/MonoSingleOptionalCallable.java | 2 +- .../core/publisher/MonoSingleMonoTest.java | 15 +- .../MonoSingleOptionalCallableTest.java | 141 ++++++++++ .../publisher/MonoSingleOptionalTest.java | 246 +++++++++--------- 6 files changed, 284 insertions(+), 135 deletions(-) create mode 100644 reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalCallableTest.java diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java index c50395f194..2501d8d497 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -4249,8 +4249,6 @@ public final Mono single() { * Wrap the item produced by this {@link Mono} source into an Optional * or emit an empty Optional for an empty source. *

- * - *

* * @return a {@link Mono} with an Optional containing the item, an empty optional or an error signal */ diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java index 159c5d6288..0b9c03f304 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,14 +32,6 @@ * @param the value type * @see Reactive-Streams-Commons */ -/** - * Wraps the item from the source into an Optional, emits - * an empty Optional instead for empty source or signals - * IndexOutOfBoundsException for a multi-item source. - * - * @param the value type - * @see Reactive-Streams-Commons - */ final class MonoSingleOptional extends InternalMonoOperator> { MonoSingleOptional(Mono source) { @@ -145,7 +137,6 @@ public void onComplete() { int c = count; if (c == 0) { - complete(Optional.empty()); } else if (c == 1) { diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java index 26d1ed649a..1a3e07bfcb 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoSingleMonoTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleMonoTest.java index d443972a6a..f9787c8264 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoSingleMonoTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleMonoTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,12 @@ public void callableValued() { .expectNext("foo") .verifyComplete(); } + + @Test + public void callableError() { + StepVerifier.create(Mono.error(new IllegalStateException("failed")).single()) + .expectErrorMessage("failed"); + } @Test public void normalEmpty() { @@ -59,7 +65,12 @@ public void normalValued() { .expectNext("foo") .verifyComplete(); } - + + @Test + public void normalError() { + StepVerifier.create(Mono.error(new IllegalStateException("failed")).hide().single()) + .expectErrorMessage("failed"); + } // see https://github.com/reactor/reactor-core/issues/2663 @Test void fusionMonoSingleMonoDoesntTriggerFusion() { diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalCallableTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalCallableTest.java new file mode 100644 index 0000000000..73c4ac1d40 --- /dev/null +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalCallableTest.java @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2021-2023 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Callable; + +import org.junit.jupiter.api.Test; + +import reactor.core.Fuseable; +import reactor.core.Scannable; +import reactor.test.StepVerifier; + +class MonoSingleOptionalCallableTest { + + @Test + void testCallableFusedEmptySource() { + Mono> mono = Mono + .fromSupplier(() -> null) + .singleOptional(); + + StepVerifier.create(mono) + .expectNext(Optional.empty()) + .verifyComplete(); + } + + @Test + void testCallableFusedSingleEmptySourceOnBlock() { + Mono> mono = Mono + .fromSupplier(() -> null) + .singleOptional(); + + assertEquals(Optional.empty(), mono.block()); + } + + @Test + void testCallableFusedSingleEmptySourceOnCall() throws Exception { + Mono> mono = Mono + .fromSupplier(() -> null) + .singleOptional(); + + assertThat(mono).isInstanceOf(MonoSingleOptionalCallable.class); + + assertEquals(Optional.empty(), ((Callable) mono).call()); + } + + @Test + void sourceNull() { + assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> { + new MonoSingleOptionalCallable<>(null); + }); + } + + @Test + void normal() { + StepVerifier.create(new MonoSingleOptionalCallable<>(() -> 1)) + .expectNext(Optional.of(1)) + .verifyComplete(); + } + + @Test + void normalBackpressured() { + StepVerifier.create(new MonoSingleOptionalCallable<>(() -> 1), 0) + .expectSubscription() + .expectNoEvent(Duration.ofMillis(50)) + .thenRequest(1) + .expectNext(Optional.of(1)) + .verifyComplete(); + } + + //scalarCallable empty/error/just are not instantiating MonoSingleOptionalCallable and are covered in MonoSingleTest + //we still cover the case where a callable source throws + + @Test + void failingCallable() { + StepVerifier.create(new MonoSingleOptionalCallable<>(() -> { throw new IllegalStateException("test"); } )) + .verifyErrorMessage("test"); + } + + @Test + void emptyCallable() { + StepVerifier.create(new MonoSingleOptionalCallable<>(() -> null)) + .expectNext(Optional.empty()) + .verifyComplete(); + } + + @Test + void valuedCallable() { + @SuppressWarnings("unchecked") + Callable fluxCallable = (Callable) Mono.fromCallable(() -> 1).flux(); + + + StepVerifier.create(new MonoSingleOptionalCallable<>(fluxCallable)) + .expectNext(Optional.of(1)) + .verifyComplete(); + } + + @Test + void fusionMonoSingleOptionalCallableDoesntTriggerFusion() { + Mono> fusedCase = Mono + .fromCallable(() -> 1) + .singleOptional(); + + assertThat(fusedCase) + .as("fusedCase assembly check") + .isInstanceOf(MonoSingleOptionalCallable.class) + .isNotInstanceOf(Fuseable.class); + + assertThatCode(() -> fusedCase.filter(v -> true).block()) + .as("fusedCase fused") + .doesNotThrowAnyException(); + } + + @Test + void scanOperator(){ + MonoSingleOptionalCallable test = new MonoSingleOptionalCallable<>(() -> "foo"); + + assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); + } + +} diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java index 0451d1a0e0..44b5b93116 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,145 +34,153 @@ import reactor.test.StepVerifier; public class MonoSingleOptionalTest { - - @Nested - class ConcreteClassConsistency { - // tests Mono.singleOptional returned classes - - @Test - void monoWithScalarEmpty() { - Mono source = Mono.empty(); - Mono> singleOptional = source.singleOptional(); - - assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); - assertThat(singleOptional).as("singleOptional") - .isInstanceOf(MonoJust.class) - .isInstanceOf(Fuseable.ScalarCallable.class); - } - - @Test - void monoWithScalarError() { - Mono source = Mono.error(new IllegalStateException("test")); - Mono> singleOptional = source.singleOptional(); - - assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); - assertThat(singleOptional).as("singleOptional") - .isInstanceOf(MonoError.class) - .isInstanceOf(Fuseable.ScalarCallable.class); - } - - @Test - void monoWithScalarValue() { - Mono source = Mono.just(1); - Mono> single = source.singleOptional(); - - assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); - assertThat(single).as("singleOptional") - .isInstanceOf(MonoJust.class) - .isInstanceOf(Fuseable.ScalarCallable.class); - } - - @Test - void monoWithCallable() { - Mono source = Mono.fromSupplier(() -> 1); - Mono> single = source.singleOptional(); - - assertThat(source).as("source") - .isInstanceOf(Callable.class) - .isNotInstanceOf(Fuseable.ScalarCallable.class); - assertThat(single).as("singleOptional").isInstanceOf(MonoSingleOptionalCallable.class); - } - - @Test - void monoWithNormal() { - Mono source = Mono.just(1).hide(); - Mono> single = source.singleOptional(); - - assertThat(source).as("source").isNotInstanceOf(Callable.class); //excludes ScalarCallable too - assertThat(single).as("singleOptional").isInstanceOf(MonoSingleOptional.class); - } - } - - @Test - void source1Null() { - assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> { - new MonoSingleOptional<>(null); - }); - } - - @Test + + @Nested + class ConcreteClassConsistency { + // tests Mono.singleOptional returned classes + + @Test + void monoWithScalarEmpty() { + Mono source = Mono.empty(); + Mono> singleOptional = source.singleOptional(); + + assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); + assertThat(singleOptional).as("singleOptional") + .isInstanceOf(MonoJust.class) + .isInstanceOf(Fuseable.ScalarCallable.class); + } + + @Test + void monoWithScalarError() { + Mono source = Mono.error(new IllegalStateException("test")); + Mono> singleOptional = source.singleOptional(); + + assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); + assertThat(singleOptional).as("singleOptional") + .isInstanceOf(MonoError.class) + .isInstanceOf(Fuseable.ScalarCallable.class); + } + + @Test + void monoWithScalarValue() { + Mono source = Mono.just(1); + Mono> single = source.singleOptional(); + + assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); + assertThat(single).as("singleOptional") + .isInstanceOf(MonoJust.class) + .isInstanceOf(Fuseable.ScalarCallable.class); + } + + @Test + void monoWithCallable() { + Mono source = Mono.fromSupplier(() -> 1); + Mono> single = source.singleOptional(); + + assertThat(source).as("source") + .isInstanceOf(Callable.class) + .isNotInstanceOf(Fuseable.ScalarCallable.class); + assertThat(single).as("singleOptional").isInstanceOf(MonoSingleOptionalCallable.class); + } + + @Test + void monoWithNormal() { + Mono source = Mono.just(1).hide(); + Mono> single = source.singleOptional(); + + assertThat(source).as("source").isNotInstanceOf(Callable.class); // excludes + // ScalarCallable + // too + assertThat(single).as("singleOptional").isInstanceOf(MonoSingleOptional.class); + } + } + + @Test + void source1Null() { + assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> { + new MonoSingleOptional<>(null); + }); + } + + @Test public void callableEmpty() { StepVerifier.create(Mono.empty().singleOptional()) - .expectNext(Optional.empty()) - .verifyComplete(); + .expectNext(Optional.empty()) + .verifyComplete(); } - + @Test public void callableValued() { StepVerifier.create(Mono.just("foo").singleOptional()) - .expectNext(Optional.of("foo")) - .verifyComplete(); + .expectNext(Optional.of("foo")) + .verifyComplete(); } - + + @Test + public void callableError() { + StepVerifier.create(Mono.error(new IllegalStateException("failed")).singleOptional()) + .expectErrorMessage("failed"); + } + @Test public void normalEmpty() { StepVerifier.create(Mono.empty().hide().singleOptional()) - .expectNext(Optional.empty()) - .verifyComplete(); + .expectNext(Optional.empty()) + .verifyComplete(); } - + @Test public void normalValued() { StepVerifier.create(Mono.just("foo").hide().singleOptional()) - .expectNext(Optional.of("foo")) - .verifyComplete(); + .expectNext(Optional.of("foo")) + .verifyComplete(); } - + + @Test + public void normalError() { + StepVerifier.create(Mono.error(new IllegalStateException("failed")).hide().singleOptional()) + .expectErrorMessage("failed"); + } + @Test void fusionMonoSingleFusion() { - Mono> fusedCase = Mono - .just(1) - .map(Function.identity()) - .singleOptional(); - - assertThat(fusedCase) - .as("fusedCase assembly check") + Mono> fusedCase = Mono.just(1).map(Function.identity()).singleOptional(); + + assertThat(fusedCase).as("fusedCase assembly check") .isInstanceOf(MonoSingleOptional.class) .isNotInstanceOf(Fuseable.class); - - assertThatCode(() -> fusedCase.filter(v -> true).block()) - .as("fusedCase fused") + + assertThatCode(() -> fusedCase.filter(v -> true).block()).as("fusedCase fused") .doesNotThrowAnyException(); } - + @Test - public void scanOperator(){ - MonoSingleOptional test = new MonoSingleOptional<>(Mono.just("foo")); - - assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); + public void scanOperator() { + MonoSingleOptional test = new MonoSingleOptional<>(Mono.just("foo")); + + assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); + } + + @Test + public void scanSubscriber() { + CoreSubscriber> actual = new LambdaMonoSubscriber<>(null, e -> {}, null, null); + MonoSingleOptional.SingleOptionalSubscriber test = new MonoSingleOptional.SingleOptionalSubscriber<>( + actual); + Subscription parent = Operators.emptySubscription(); + test.onSubscribe(parent); + + assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE); + + assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent); + assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual); + assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); + + assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse(); + test.onError(new IllegalStateException("boom")); + assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue(); + + assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse(); + test.cancel(); + assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue(); } - - @Test - public void scanSubscriber() { - CoreSubscriber> - actual = new LambdaMonoSubscriber<>(null, e -> {}, null, null); - MonoSingleOptional.SingleOptionalSubscriber test = new MonoSingleOptional.SingleOptionalSubscriber<>( - actual); - Subscription parent = Operators.emptySubscription(); - test.onSubscribe(parent); - - assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE); - - assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent); - assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual); - assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); - - assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse(); - test.onError(new IllegalStateException("boom")); - assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue(); - - assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse(); - test.cancel(); - assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue(); - } } From 2fc713004a545b78fa23ff99888ab5e419c140c3 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Wed, 11 Jan 2023 00:03:32 +0100 Subject: [PATCH 4/6] Add marble file --- .../java/reactor/core/publisher/Mono.java | 2 + .../doc-files/marbles/singleOptional.svg | 1220 +++++++++++++++++ 2 files changed, 1222 insertions(+) create mode 100644 reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/singleOptional.svg diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java index 2501d8d497..b715cef9de 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java @@ -4249,6 +4249,8 @@ public final Mono single() { * Wrap the item produced by this {@link Mono} source into an Optional * or emit an empty Optional for an empty source. *

+ * + *

* * @return a {@link Mono} with an Optional containing the item, an empty optional or an error signal */ diff --git a/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/singleOptional.svg b/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/singleOptional.svg new file mode 100644 index 0000000000..bf5aaa56c4 --- /dev/null +++ b/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/singleOptional.svg @@ -0,0 +1,1220 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + singleOptional + + + + + + + + + + + + + + + + + + + + + Optional.of(     ) + + Optional.empty() + From e0c5803fcea486542949f1db092538768d40ae8c Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Tue, 17 Jan 2023 06:58:48 +0100 Subject: [PATCH 5/6] Remove count field and complete on first onNext call. This simplifies the MonoSingleOptional class. In case onNext is called multiple times (which should not occur for Mono anyway) extranoues elements are simply dropped rather than throwing IndexOutOfBoundsException. --- .../core/publisher/MonoSingleOptional.java | 32 +++---------------- 1 file changed, 5 insertions(+), 27 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java index 0b9c03f304..ce4225a003 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java @@ -25,9 +25,8 @@ import reactor.util.context.Context; /** - * Expects and emits a single item from the source wrapped into an Optional, emits - * an empty Optional instead for empty source or signals - * IndexOutOfBoundsException for a multi-item source. + * Emits a single item from the source wrapped into an Optional, emits + * an empty Optional instead for empty source. * * @param the value type * @see Reactive-Streams-Commons @@ -53,8 +52,6 @@ static final class SingleOptionalSubscriber extends Operators.MonoInnerProduc Subscription s; - int count; - boolean done; @Override @@ -96,24 +93,12 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - if (isCancelled()) { - //this helps differentiating a duplicate malformed signal "done" from a count > 1 "done" - Operators.onDiscard(t, actual().currentContext()); - return; - } if (done) { Operators.onNextDropped(t, actual().currentContext()); return; } - if (++count > 1) { - Operators.onDiscard(t, actual().currentContext()); - //mark as both cancelled and done - cancel(); - onError(new IndexOutOfBoundsException("Source emitted more than one item")); - } - else { - setValue(Optional.of(t)); - } + done = true; + complete(Optional.of(t)); } @Override @@ -134,14 +119,7 @@ public void onComplete() { return; } done = true; - - int c = count; - if (c == 0) { - complete(Optional.empty()); - } - else if (c == 1) { - complete(); - } + complete(Optional.empty()); } } From 455202e9129babf4726f2f3532de590bf95505ed Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Tue, 17 Jan 2023 22:27:41 +0100 Subject: [PATCH 6/6] Remove useless discardTheValue() call and align javadoc --- .../main/java/reactor/core/publisher/MonoSingleOptional.java | 2 -- .../reactor/core/publisher/MonoSingleOptionalCallable.java | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java index ce4225a003..dd0d43331a 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java @@ -108,8 +108,6 @@ public void onError(Throwable t) { return; } done = true; - discardTheValue(); - actual().onError(t); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java index 1a3e07bfcb..360e53ce45 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java @@ -27,8 +27,8 @@ import java.util.concurrent.Callable; /** - * Expects and emits a single item from the source Callable and warps it into an Optional, - * emits an empty Optional for empty source. + * Emits a single item from the source wrapped into an Optional, emits + * an empty Optional instead for empty source. * * @param the value type * @see Reactive-Streams-Commons