diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 746362ca78..314b1ce4dc 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -64,7 +64,6 @@ import rx.operators.OperationReplay; import rx.operators.OperationSample; import rx.operators.OperationSequenceEqual; -import rx.operators.OperationSkip; import rx.operators.OperationSkipUntil; import rx.operators.OperationSwitch; import rx.operators.OperationTakeLast; @@ -118,6 +117,7 @@ import rx.operators.OperatorSkip; import rx.operators.OperatorSkipLast; import rx.operators.OperatorSkipLastTimed; +import rx.operators.OperatorSkipTimed; import rx.operators.OperatorSkipWhile; import rx.operators.OperatorSubscribeOn; import rx.operators.OperatorTake; @@ -5547,7 +5547,7 @@ public final Observable skip(long time, TimeUnit unit) { * @see RxJava Wiki: skip() */ public final Observable skip(long time, TimeUnit unit, Scheduler scheduler) { - return create(new OperationSkip.SkipTimed(this, time, unit, scheduler)); + return lift(new OperatorSkipTimed(time, unit, scheduler)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkip.java b/rxjava-core/src/main/java/rx/operators/OperationSkip.java deleted file mode 100644 index f52b14374d..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationSkip.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Scheduler; -import rx.Scheduler.Worker; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Action0; -import rx.subscriptions.CompositeSubscription; - -/** - * Returns an Observable that skips the first num items emitted by the source - * Observable. - *

- * - *

- * You can ignore the first num items emitted by an Observable and attend only to - * those items that come after, by modifying the Observable with the skip operation. - */ -public final class OperationSkip { - - /** - * Skip the items after subscription for the given duration. - * - * @param - * the value type - */ - public static final class SkipTimed implements OnSubscribeFunc { - final Observable source; - final long time; - final TimeUnit unit; - final Scheduler scheduler; - - public SkipTimed(Observable source, long time, TimeUnit unit, Scheduler scheduler) { - this.source = source; - this.time = time; - this.unit = unit; - this.scheduler = scheduler; - } - - @Override - public Subscription onSubscribe(Observer t1) { - Worker inner = scheduler.createWorker(); - CompositeSubscription csub = new CompositeSubscription(inner); - final SourceObserver so = new SourceObserver(t1, csub); - csub.add(so); - source.unsafeSubscribe(so); - if (!so.isUnsubscribed()) { - inner.schedule(so, time, unit); - } - - return csub; - } - - /** - * Observes the source and relays its values once gate turns into true. - * - * @param - * the observed value type - */ - private static final class SourceObserver extends Subscriber implements Action0 { - final AtomicBoolean gate; - final Observer observer; - final Subscription cancel; - - public SourceObserver(Observer observer, - Subscription cancel) { - this.gate = new AtomicBoolean(); - this.observer = observer; - this.cancel = cancel; - } - - @Override - public void onNext(T args) { - if (gate.get()) { - observer.onNext(args); - } - } - - @Override - public void onError(Throwable e) { - try { - observer.onError(e); - } finally { - cancel.unsubscribe(); - } - } - - @Override - public void onCompleted() { - try { - observer.onCompleted(); - } finally { - cancel.unsubscribe(); - } - } - - @Override - public void call() { - gate.set(true); - } - - } - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSkipTimed.java b/rxjava-core/src/main/java/rx/operators/OperatorSkipTimed.java new file mode 100644 index 0000000000..5119c85aaf --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorSkipTimed.java @@ -0,0 +1,80 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import rx.Observable.Operator; +import rx.Scheduler; +import rx.Scheduler.Worker; +import rx.Subscriber; +import rx.functions.Action0; + +/** + * Skips elements until a specified time elapses. + * @param the value type + */ +public final class OperatorSkipTimed implements Operator { + final long time; + final TimeUnit unit; + final Scheduler scheduler; + + public OperatorSkipTimed(long time, TimeUnit unit, Scheduler scheduler) { + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Subscriber call(final Subscriber child) { + final Worker worker = scheduler.createWorker(); + child.add(worker); + final AtomicBoolean gate = new AtomicBoolean(); + worker.schedule(new Action0() { + @Override + public void call() { + gate.set(true); + } + }, time, unit); + return new Subscriber(child) { + + @Override + public void onNext(T t) { + if (gate.get()) { + child.onNext(t); + } + } + + @Override + public void onError(Throwable e) { + try { + child.onError(e); + } finally { + unsubscribe(); + } + } + + @Override + public void onCompleted() { + try { + child.onCompleted(); + } finally { + unsubscribe(); + } + } + }; + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationTakeTimedTest.java b/rxjava-core/src/test/java/rx/operators/OperationTakeTimedTest.java index 856aef6ea3..a31e3dbf5d 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationTakeTimedTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationTakeTimedTest.java @@ -28,7 +28,7 @@ import rx.Observable; import rx.Observer; -import rx.operators.OperationSkipTest.CustomException; +import rx.operators.OperatorSkipTimedTest.CustomException; import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSkipLastTimedTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSkipLastTimedTest.java index 6bcaf9c6ba..087462122e 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorSkipLastTimedTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorSkipLastTimedTest.java @@ -28,7 +28,7 @@ import rx.Observable; import rx.Observer; -import rx.operators.OperationSkipTest.CustomException; +import rx.operators.OperatorSkipTimedTest.CustomException; import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; diff --git a/rxjava-core/src/test/java/rx/operators/OperationSkipTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSkipTimedTest.java similarity index 96% rename from rxjava-core/src/test/java/rx/operators/OperationSkipTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorSkipTimedTest.java index 53cfaf63f3..1313b6d001 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSkipTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorSkipTimedTest.java @@ -31,7 +31,7 @@ import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; -public class OperationSkipTest { +public class OperatorSkipTimedTest { @Test public void testSkipTimed() { @@ -41,6 +41,7 @@ public void testSkipTimed() { Observable result = source.skip(1, TimeUnit.SECONDS, scheduler); + @SuppressWarnings("unchecked") Observer o = mock(Observer.class); result.subscribe(o); @@ -78,6 +79,7 @@ public void testSkipTimedFinishBeforeTime() { Observable result = source.skip(1, TimeUnit.SECONDS, scheduler); + @SuppressWarnings("unchecked") Observer o = mock(Observer.class); result.subscribe(o); @@ -108,6 +110,7 @@ public void testSkipTimedErrorBeforeTime() { Observable result = source.skip(1, TimeUnit.SECONDS, scheduler); + @SuppressWarnings("unchecked") Observer o = mock(Observer.class); result.subscribe(o); @@ -135,6 +138,7 @@ public void testSkipTimedErrorAfterTime() { Observable result = source.skip(1, TimeUnit.SECONDS, scheduler); + @SuppressWarnings("unchecked") Observer o = mock(Observer.class); result.subscribe(o);