From ea73bd1f51aaff22215b839441aafbe3757a76d8 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 12 Mar 2014 15:01:35 +0100 Subject: [PATCH] OperatorSkipWhile --- rxjava-core/src/main/java/rx/Observable.java | 6 +- .../java/rx/operators/OperationSkipWhile.java | 98 ------------------- .../java/rx/operators/OperatorSkipWhile.java | 78 +++++++++++++++ .../rx/operators/OperationSkipWhileTest.java | 33 +++++-- 4 files changed, 106 insertions(+), 109 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationSkipWhile.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorSkipWhile.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5bfb5ad278..0f8fa4e5c8 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -91,7 +91,7 @@ import rx.operators.OperationSkip; import rx.operators.OperationSkipLast; import rx.operators.OperationSkipUntil; -import rx.operators.OperationSkipWhile; +import rx.operators.OperatorSkipWhile; import rx.operators.OperationSum; import rx.operators.OperationSwitch; import rx.operators.OperationSynchronize; @@ -6427,7 +6427,7 @@ public final Observable skipUntil(Observable other) { * @see MSDN: Observable.SkipWhile */ public final Observable skipWhile(Func1 predicate) { - return create(OperationSkipWhile.skipWhile(this, predicate)); + return lift(new OperatorSkipWhile(OperatorSkipWhile.toPredicate2(predicate))); } /** @@ -6445,7 +6445,7 @@ public final Observable skipWhile(Func1 predicate) { * @see MSDN: Observable.SkipWhile */ public final Observable skipWhileWithIndex(Func2 predicate) { - return create(OperationSkipWhile.skipWhileWithIndex(this, predicate)); + return lift(new OperatorSkipWhile(predicate)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkipWhile.java b/rxjava-core/src/main/java/rx/operators/OperationSkipWhile.java deleted file mode 100644 index aaa2914986..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationSkipWhile.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscription; -import rx.functions.Func1; -import rx.functions.Func2; - -/** - * Skips any emitted source items as long as the specified condition holds true. Emits all further source items - * as soon as the condition becomes false. - */ -public final class OperationSkipWhile { - public static OnSubscribeFunc skipWhileWithIndex(Observable source, Func2 predicate) { - return new SkipWhile(source, predicate); - } - - public static OnSubscribeFunc skipWhile(Observable source, final Func1 predicate) { - return new SkipWhile(source, new Func2() { - @Override - public Boolean call(T value, Integer index) { - return predicate.call(value); - } - }); - } - - private static class SkipWhile implements OnSubscribeFunc { - private final Observable source; - private final Func2 predicate; - private final AtomicBoolean skipping = new AtomicBoolean(true); - private final AtomicInteger index = new AtomicInteger(0); - - SkipWhile(Observable source, Func2 pred) { - this.source = source; - this.predicate = pred; - } - - public Subscription onSubscribe(Observer observer) { - return source.subscribe(new SkipWhileObserver(observer)); - } - - private class SkipWhileObserver implements Observer { - private final Observer observer; - - public SkipWhileObserver(Observer observer) { - this.observer = observer; - } - - @Override - public void onCompleted() { - observer.onCompleted(); - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - - @Override - public void onNext(T next) { - if (!skipping.get()) { - observer.onNext(next); - } else { - try { - if (!predicate.call(next, index.getAndIncrement())) { - skipping.set(false); - observer.onNext(next); - } else { - } - } catch (Throwable t) { - observer.onError(t); - } - } - } - - } - - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSkipWhile.java b/rxjava-core/src/main/java/rx/operators/OperatorSkipWhile.java new file mode 100644 index 0000000000..70e5fc220a --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorSkipWhile.java @@ -0,0 +1,78 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observable.Operator; +import rx.Observer; +import rx.Subscriber; +import rx.Subscription; +import rx.functions.Func1; +import rx.functions.Func2; + +/** + * Skips any emitted source items as long as the specified condition holds true. Emits all further source items + * as soon as the condition becomes false. + */ +public final class OperatorSkipWhile implements Operator { + private final Func2 predicate; + + public OperatorSkipWhile(Func2 predicate) { + this.predicate = predicate; + } + @Override + public Subscriber call(final Subscriber child) { + return new Subscriber(child) { + boolean skipping = true; + int index; + @Override + public void onNext(T t) { + if (!skipping) { + child.onNext(t); + } else { + if (!predicate.call(t, index++)) { + skipping = false; + child.onNext(t); + } + } + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onCompleted() { + child.onCompleted(); + } + }; + } + /** Convert to Func2 type predicate. */ + public static Func2 toPredicate2(final Func1 predicate) { + return new Func2() { + + @Override + public Boolean call(T t1, Integer t2) { + return predicate.call(t1); + } + }; + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationSkipWhileTest.java b/rxjava-core/src/test/java/rx/operators/OperationSkipWhileTest.java index 708ac42d50..b769f4f453 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSkipWhileTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationSkipWhileTest.java @@ -15,9 +15,7 @@ */ package rx.operators; -import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; -import static rx.operators.OperationSkipWhile.*; import org.junit.Test; import org.mockito.InOrder; @@ -51,7 +49,7 @@ public Boolean call(Integer value, Integer index) { @Test public void testSkipWithIndex() { Observable src = Observable.from(1, 2, 3, 4, 5); - Observable.create(skipWhileWithIndex(src, INDEX_LESS_THAN_THREE)).subscribe(w); + src.skipWhileWithIndex(INDEX_LESS_THAN_THREE).subscribe(w); InOrder inOrder = inOrder(w); inOrder.verify(w, times(1)).onNext(4); @@ -63,7 +61,7 @@ public void testSkipWithIndex() { @Test public void testSkipEmpty() { Observable src = Observable.empty(); - Observable.create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w); + src.skipWhile(LESS_THAN_FIVE).subscribe(w); verify(w, never()).onNext(anyInt()); verify(w, never()).onError(any(Throwable.class)); verify(w, times(1)).onCompleted(); @@ -72,7 +70,7 @@ public void testSkipEmpty() { @Test public void testSkipEverything() { Observable src = Observable.from(1, 2, 3, 4, 3, 2, 1); - Observable.create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w); + src.skipWhile(LESS_THAN_FIVE).subscribe(w); verify(w, never()).onNext(anyInt()); verify(w, never()).onError(any(Throwable.class)); verify(w, times(1)).onCompleted(); @@ -81,7 +79,7 @@ public void testSkipEverything() { @Test public void testSkipNothing() { Observable src = Observable.from(5, 3, 1); - Observable.create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w); + src.skipWhile(LESS_THAN_FIVE).subscribe(w); InOrder inOrder = inOrder(w); inOrder.verify(w, times(1)).onNext(5); @@ -94,7 +92,7 @@ public void testSkipNothing() { @Test public void testSkipSome() { Observable src = Observable.from(1, 2, 3, 4, 5, 3, 1, 5); - Observable.create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w); + src.skipWhile(LESS_THAN_FIVE).subscribe(w); InOrder inOrder = inOrder(w); inOrder.verify(w, times(1)).onNext(5); @@ -108,11 +106,30 @@ public void testSkipSome() { @Test public void testSkipError() { Observable src = Observable.from(1, 2, 42, 5, 3, 1); - Observable.create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w); + src.skipWhile(LESS_THAN_FIVE).subscribe(w); InOrder inOrder = inOrder(w); inOrder.verify(w, never()).onNext(anyInt()); inOrder.verify(w, never()).onCompleted(); inOrder.verify(w, times(1)).onError(any(RuntimeException.class)); } + + @Test + public void testSkipManySubscribers() { + Observable src = Observable.range(1, 10).skipWhile(LESS_THAN_FIVE); + int n = 5; + for (int i = 0; i < n; i++) { + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + src.subscribe(o); + + for (int j = 5; j < 10; j++) { + inOrder.verify(o).onNext(j); + } + inOrder.verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + } }