diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 7fea0a1fb1..22252577eb 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -55,7 +55,6 @@ import rx.operators.OperationDefaultIfEmpty; import rx.operators.OperationDefer; import rx.operators.OperationDelay; -import rx.operators.OperationDematerialize; import rx.operators.OperationDistinct; import rx.operators.OperationDistinctUntilChanged; import rx.operators.OperationFinally; @@ -95,6 +94,7 @@ import rx.operators.OperatorAsObservable; import rx.operators.OperatorCache; import rx.operators.OperatorCast; +import rx.operators.OperatorDematerialize; import rx.operators.OperatorDoOnEach; import rx.operators.OperatorElementAt; import rx.operators.OperatorFilter; @@ -3590,9 +3590,9 @@ public final Observable delaySubscription(long delay, TimeUnit unit, Schedule * @see RxJava Wiki: dematerialize() * @see MSDN: Observable.dematerialize */ - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public final Observable dematerialize() { - return create(OperationDematerialize.dematerialize((Observable>) this)); + return lift(new OperatorDematerialize()); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationDematerialize.java b/rxjava-core/src/main/java/rx/operators/OperationDematerialize.java deleted file mode 100644 index 026bbc891d..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationDematerialize.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import rx.Notification; -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscriber; -import rx.Subscription; - -/** - * Reverses the effect of {@link OperatorMaterialize} by transforming the Notification objects - * emitted by a source Observable into the items or notifications they represent. - *

- * - *

- * See here for the - * Microsoft Rx equivalent. - */ -public final class OperationDematerialize { - - /** - * Dematerializes the explicit notification values of an observable sequence as implicit notifications. - * - * @param sequence - * An observable sequence containing explicit notification values which have to be turned into implicit notifications. - * @return An observable sequence exhibiting the behavior corresponding to the source sequence's notification values. - * @see Observable.Dematerialize(TSource) Method - */ - public static OnSubscribeFunc dematerialize(final Observable> sequence) { - return new DematerializeObservable(sequence); - } - - private static class DematerializeObservable implements OnSubscribeFunc { - - private final Observable> sequence; - - public DematerializeObservable(Observable> sequence) { - this.sequence = sequence; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - return sequence.unsafeSubscribe(new Subscriber>() { - @Override - public void onCompleted() { - observer.onCompleted(); - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - - @Override - public void onNext(Notification value) { - switch (value.getKind()) { - case OnNext: - observer.onNext(value.getValue()); - break; - case OnError: - observer.onError(value.getThrowable()); - break; - case OnCompleted: - observer.onCompleted(); - break; - } - } - }); - } - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorDematerialize.java b/rxjava-core/src/main/java/rx/operators/OperatorDematerialize.java new file mode 100644 index 0000000000..043837ac42 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorDematerialize.java @@ -0,0 +1,76 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Notification; +import rx.Observable.Operator; +import rx.Subscriber; + +/** + * Reverses the effect of {@link OperatorMaterialize} by transforming the Notification objects + * emitted by a source Observable into the items or notifications they represent. + *

+ * + *

+ * See here for the + * Microsoft Rx equivalent. + * + * @param the wrapped value type + */ +public final class OperatorDematerialize implements Operator> { + + @Override + public Subscriber> call(final Subscriber child) { + return new Subscriber>(child) { + /** Do not send two onCompleted events. */ + boolean terminated; + @Override + public void onNext(Notification t) { + switch (t.getKind()) { + case OnNext: + if (!terminated) { + child.onNext(t.getValue()); + } + break; + case OnError: + onError(t.getThrowable()); + break; + case OnCompleted: + onCompleted(); + break; + } + } + + @Override + public void onError(Throwable e) { + if (!terminated) { + terminated = true; + child.onError(e); + } + } + + @Override + public void onCompleted() { + if (!terminated) { + terminated = true; + child.onCompleted(); + } + } + + }; + } + +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationDematerializeTest.java b/rxjava-core/src/test/java/rx/operators/OperatorDematerializeTest.java similarity index 74% rename from rxjava-core/src/test/java/rx/operators/OperationDematerializeTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorDematerializeTest.java index c3f9954a48..3280ed3fd0 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationDematerializeTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorDematerializeTest.java @@ -20,16 +20,16 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static rx.operators.OperationDematerialize.dematerialize; import org.junit.Test; import rx.Notification; import rx.Observable; import rx.Observer; +import rx.observers.Subscribers; import rx.observers.TestSubscriber; -public class OperationDematerializeTest { +public class OperatorDematerializeTest { @Test @SuppressWarnings("unchecked") @@ -51,7 +51,7 @@ public void testDematerialize1() { public void testDematerialize2() { Throwable exception = new Throwable("test"); Observable observable = Observable.error(exception); - Observable dematerialize = Observable.create(dematerialize(observable.materialize())); + Observable dematerialize = observable.materialize().dematerialize(); Observer observer = mock(Observer.class); dematerialize.subscribe(observer); @@ -66,7 +66,7 @@ public void testDematerialize2() { public void testDematerialize3() { Exception exception = new Exception("test"); Observable observable = Observable.error(exception); - Observable dematerialize = Observable.create(dematerialize(observable.materialize())); + Observable dematerialize = observable.materialize().dematerialize(); Observer observer = mock(Observer.class); dematerialize.subscribe(observer); @@ -106,4 +106,33 @@ public void testCompletePassThru() { verify(observer, times(0)).onNext(any(Integer.class)); } + @Test + public void testHonorsContractWhenCompleted() { + Observable source = Observable.just(1); + + Observable result = source.materialize().dematerialize(); + + Observer o = mock(Observer.class); + + result.unsafeSubscribe(Subscribers.from(o)); + + verify(o).onNext(1); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testHonorsContractWhenThrows() { + Observable source = Observable.error(new OperationReduceTest.CustomException()); + + Observable result = source.materialize().dematerialize(); + + Observer o = mock(Observer.class); + + result.unsafeSubscribe(Subscribers.from(o)); + + verify(o, never()).onNext(any(Integer.class)); + verify(o, never()).onCompleted(); + verify(o).onError(any(OperationReduceTest.CustomException.class)); + } }