diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 078950b404..f9dd5b9428 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2253,6 +2253,31 @@ public Observable delay(long delay, TimeUnit unit, Scheduler scheduler) { return OperationDelay.delay(this, delay, unit, scheduler); } + /** + * Return an Observable which delays the subscription to this Observable sequence + * by the given amount. + * @param delay the time to delay the subscription + * @param unit the time unit + * @return an Observable which delays the subscription to this Observable sequence + * by the given amount. + */ + public Observable delaySubscription(long delay, TimeUnit unit) { + return delaySubscription(delay, unit, Schedulers.threadPoolForComputation()); + } + + /** + * Return an Observable which delays the subscription to this Observable sequence + * by the given amount, waiting and subscribing on the given scheduler. + * @param delay the time to delay the subscription + * @param unit the time unit + * @param scheduler the scheduler where the waiting and subscription will happen + * @return an Observable which delays the subscription to this Observable sequence + * by the given amount, waiting and subscribing on the given scheduler + */ + public Observable delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) { + return create(OperationDelay.delaySubscription(this, delay, unit, scheduler)); + } + /** * Drops items emitted by an Observable that are followed by newer items * before a timeout value expires. The timer resets on each emission. @@ -5489,6 +5514,148 @@ public Observable takeLast(final int count) { return create(OperationTakeLast.takeLast(this, count)); } + /** + * Return an Observable which contains the items from this observable which + * were emitted not before this completed minus a time window. + * + * @param time the length of the time window, relative to the completion of this + * observable. + * @param unit the time unit + * @return an Observable which contains the items from this observable which + * were emitted not before this completed minus a time window. + */ + public Observable takeLast(long time, TimeUnit unit) { + return takeLast(time, unit, Schedulers.threadPoolForComputation()); + } + + /** + * Return an Observable which contains the items from this observable which + * were emitted not before this completed minus a time window, where the timing + * information is provided by the given scheduler. + * + * @param time the length of the time window, relative to the completion of this + * observable. + * @param unit the time unit + * @param scheduler the scheduler which provides the timestamps for the observed + * elements + * @return an Observable which contains the items from this observable which + * were emitted not before this completed minus a time window, where the timing + * information is provided by the given scheduler + */ + public Observable takeLast(long time, TimeUnit unit, Scheduler scheduler) { + return create(OperationTakeLast.takeLast(this, time, unit, scheduler)); + } + + /** + * Return an Observable which contains at most count items from this Observable + * which were emitted not before this completed minus a time window. + * + * @param count the maximum number of items to return + * @param time the length of the time window, relative to the completion of this + * observable. + * @param unit the time unit + * @return Return an Observable which contains at most count items from this Observable + * which were emitted not before this completed minus a time window. + */ + public Observable takeLast(int count, long time, TimeUnit unit) { + return takeLast(count, time, unit, Schedulers.threadPoolForComputation()); + } + + /** + * Return an Observable which contains at most count items from this Observable + * which were emitted not before this completed minus a time window, where the timing + * information is provided by the given scheduler. + * + * @param count the maximum number of items to return + * @param time the length of the time window, relative to the completion of this + * observable. + * @param unit the time unit + * @param scheduler the scheduler which provides the timestamps for the observed + * elements + * @return Return an Observable which contains at most count items from this Observable + * which were emitted not before this completed minus a time window, where the timing + * information is provided by the given scheduler + */ + public Observable takeLast(int count, long time, TimeUnit unit, Scheduler scheduler) { + if (count < 0) { + throw new IllegalArgumentException("count >= 0 required"); + } + return create(OperationTakeLast.takeLast(this, count, time, unit, scheduler)); + } + + /** + * Return an Observable which emits single List containing the last count + * elements from this Observable. + * + * @param count the number of items to take last + * @return an Observable which emits single list containing the last count + * elements from this Observable. + */ + public Observable> takeLastBuffer(int count) { + return takeLast(count).toList(); + } + + /** + * Return an Observable which emits single List containing items which + * were emitted not before this completed minus a time window. + * @param time the length of the time window, relative to the completion of this + * observable. + * @param unit the time unit + * @return an Observable which emits single list containing items which + * were emitted not before this completed minus a time window + */ + public Observable> takeLastBuffer(long time, TimeUnit unit) { + return takeLast(time, unit).toList(); + } + + /** + * Return an Observable which emits single List containing items which + * were emitted not before this completed minus a time window, where the timing + * information is provided by the given scheduler. + * @param time the length of the time window, relative to the completion of this + * observable. + * @param unit the time unit + * @param scheduler the scheduler which provides the timestamps for the observed + * elements + * @return an Observable which emits single list containing items which + * were emitted not before this completed minus a time window, where the timing + * information is provided by the given scheduler + */ + public Observable> takeLastBuffer(long time, TimeUnit unit, Scheduler scheduler) { + return takeLast(time, unit, scheduler).toList(); + } + + /** + * Return an Observable which emits a single List containing at most count items + * from this Observable which were emitted not before this completed minus a time window. + * @param count the number of items to take last + * @param time the length of the time window, relative to the completion of this + * observable. + * @param unit the time unit + * @return an Observable which emits a single List containing at most count items + * from this Observable which were emitted not before this completed minus a time window. + */ + public Observable> takeLastBuffer(int count, long time, TimeUnit unit) { + return takeLast(count, time, unit).toList(); + } + + /** + * Return an Observable which emits a single List containing at most count items + * from this Observable which were emitted not before this completed minus a time window. + * @param count the number of items to take last + * @param time the length of the time window, relative to the completion of this + * observable. + * @param unit the time unit + * @param scheduler the scheduler which provides the timestamps for the observed + * elements + * @return an Observable which emits a single List containing at most count items + * from this Observable which were emitted not before this completed minus a time window. + */ + public Observable> takeLastBuffer(int count, long time, TimeUnit unit, Scheduler scheduler) { + return takeLast(count, time, unit, scheduler).toList(); + } + + /** * Returns an Observable that emits the items from the source Observable * only until the other Observable emits an item. diff --git a/rxjava-core/src/main/java/rx/operators/OperationDelay.java b/rxjava-core/src/main/java/rx/operators/OperationDelay.java index b1f7b088da..4594adda76 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDelay.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDelay.java @@ -18,8 +18,13 @@ import java.util.concurrent.TimeUnit; import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; import rx.Scheduler; +import rx.Subscription; import rx.observables.ConnectableObservable; +import rx.subscriptions.SerialSubscription; +import rx.util.functions.Action0; import rx.util.functions.Func1; public final class OperationDelay { @@ -40,4 +45,41 @@ public T call(Long ignored) { }); return Observable.concat(seqs); } + + /** + * Delays the subscription to the source by the given amount, running on the given scheduler. + */ + public static OnSubscribeFunc delaySubscription(Observable source, long time, TimeUnit unit, Scheduler scheduler) { + return new DelaySubscribeFunc(source, time, unit, scheduler); + } + + /** Subscribe function which schedules the actual subscription to source on a scheduler at a later time. */ + private static final class DelaySubscribeFunc implements OnSubscribeFunc { + final Observable source; + final Scheduler scheduler; + final long time; + final TimeUnit unit; + + public DelaySubscribeFunc(Observable source, long time, TimeUnit unit, Scheduler scheduler) { + this.source = source; + this.scheduler = scheduler; + this.time = time; + this.unit = unit; + } + @Override + public Subscription onSubscribe(final Observer t1) { + final SerialSubscription ssub = new SerialSubscription(); + + ssub.setSubscription(scheduler.schedule(new Action0() { + @Override + public void call() { + if (!ssub.isUnsubscribed()) { + ssub.setSubscription(source.subscribe(t1)); + } + } + }, time, unit)); + + return ssub; + } + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java index d324f7ce42..c0558d43e1 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java @@ -17,12 +17,16 @@ import java.util.Deque; import java.util.LinkedList; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; +import rx.Scheduler; import rx.Subscription; +import rx.subscriptions.SingleAssignmentSubscription; +import rx.util.Timestamped; /** * Returns an Observable that emits the last count items emitted by the source @@ -119,4 +123,121 @@ public void onNext(T value) { } } + + /** + * Returns the items emitted by source whose arrived in the time window + * before the source completed. + */ + public static OnSubscribeFunc takeLast(Observable source, long time, TimeUnit unit, Scheduler scheduler) { + return new TakeLastTimed(source, -1, time, unit, scheduler); + } + + /** + * Returns the items emitted by source whose arrived in the time window + * before the source completed and at most count values. + */ + public static OnSubscribeFunc takeLast(Observable source, int count, long time, TimeUnit unit, Scheduler scheduler) { + return new TakeLastTimed(source, count, time, unit, scheduler); + } + + /** Take only the values which appeared some time before the completion. */ + static final class TakeLastTimed implements OnSubscribeFunc { + final Observable source; + final long ageMillis; + final Scheduler scheduler; + final int count; + + public TakeLastTimed(Observable source, int count, long time, TimeUnit unit, Scheduler scheduler) { + this.source = source; + this.ageMillis = unit.toMillis(time); + this.scheduler = scheduler; + this.count = count; + } + + @Override + public Subscription onSubscribe(Observer t1) { + SingleAssignmentSubscription sas = new SingleAssignmentSubscription(); + sas.set(source.subscribe(new TakeLastTimedObserver(t1, sas, count, ageMillis, scheduler))); + return sas; + } + } + /** Observes source values and keeps the most recent items. */ + static final class TakeLastTimedObserver implements Observer { + final Observer observer; + final Subscription cancel; + final long ageMillis; + final Scheduler scheduler; + /** -1 indicates unlimited buffer. */ + final int count; + + final Deque> buffer = new LinkedList>(); + + public TakeLastTimedObserver(Observer observer, Subscription cancel, + int count, long ageMillis, Scheduler scheduler) { + this.observer = observer; + this.cancel = cancel; + this.ageMillis = ageMillis; + this.scheduler = scheduler; + this.count = count; + } + + protected void runEvictionPolicy(long now) { + // trim size + while (count >= 0 && buffer.size() > count) { + buffer.pollFirst(); + } + // remove old entries + while (!buffer.isEmpty()) { + Timestamped v = buffer.peekFirst(); + if (v.getTimestampMillis() < now - ageMillis) { + buffer.pollFirst(); + } else { + break; + } + } + } + + @Override + public void onNext(T args) { + long t = scheduler.now(); + buffer.add(new Timestamped(t, args)); + runEvictionPolicy(t); + } + + @Override + public void onError(Throwable e) { + buffer.clear(); + observer.onError(e); + cancel.unsubscribe(); + } + + /** + * Emit the contents of the buffer. + * @return true if no exception was raised in the process + */ + protected boolean emitBuffer() { + for (Timestamped v : buffer) { + try { + observer.onNext(v.getValue()); + } catch (Throwable t) { + buffer.clear(); + observer.onError(t); + return false; + } + } + buffer.clear(); + return true; + } + + @Override + public void onCompleted() { + runEvictionPolicy(scheduler.now()); + + if (emitBuffer()) { + observer.onCompleted(); + } + cancel.unsubscribe(); + } + + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java b/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java index 538131731d..10fdd459bd 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java @@ -30,9 +30,11 @@ import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mock; +import static org.mockito.Mockito.mock; import rx.Observable; import rx.Observer; +import rx.Subscription; import rx.schedulers.TestScheduler; import rx.util.functions.Func1; @@ -193,4 +195,44 @@ public void testDelayWithMultipleSubscriptions() { verify(observer, never()).onError(any(Throwable.class)); verify(observer2, never()).onError(any(Throwable.class)); } + + @Test + public void testDelaySubscription() { + TestScheduler scheduler = new TestScheduler(); + + Observable result = Observable.from(1, 2, 3).delaySubscription(100, TimeUnit.MILLISECONDS, scheduler); + + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + inOrder.verify(o, never()).onNext(any()); + inOrder.verify(o, never()).onCompleted(); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + inOrder.verify(o, times(1)).onNext(1); + inOrder.verify(o, times(1)).onNext(2); + inOrder.verify(o, times(1)).onNext(3); + inOrder.verify(o, times(1)).onCompleted(); + + verify(o, never()).onError(any(Throwable.class)); + } + @Test + public void testDelaySubscriptionCancelBeforeTime() { + TestScheduler scheduler = new TestScheduler(); + + Observable result = Observable.from(1, 2, 3).delaySubscription(100, TimeUnit.MILLISECONDS, scheduler); + + Observer o = mock(Observer.class); + + Subscription s = result.subscribe(o); + s.unsubscribe(); + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationTakeLastTest.java b/rxjava-core/src/test/java/rx/operators/OperationTakeLastTest.java index 5ec876368b..763e3f4c1e 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationTakeLastTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationTakeLastTest.java @@ -15,7 +15,7 @@ */ package rx.operators; -import static org.mockito.Matchers.*; +import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.*; import static rx.operators.OperationTakeLast.*; @@ -24,6 +24,8 @@ import rx.Observable; import rx.Observer; +import rx.schedulers.TestScheduler; +import rx.subjects.PublishSubject; public class OperationTakeLastTest { @@ -110,4 +112,168 @@ public void testTakeLastWithNegativeCount() { any(IndexOutOfBoundsException.class)); verify(aObserver, never()).onCompleted(); } + + @Test + public void takeLastTimed() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable result = source.takeLast(1, TimeUnit.SECONDS, scheduler); + + Observer o = mock(Observer.class); + + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + source.onNext(1); // T: 0ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(2); // T: 250ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(3); // T: 500ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(4); // T: 750ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(5); // T: 1000ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onCompleted(); // T: 1250ms + + inOrder.verify(o, times(1)).onNext(2); + inOrder.verify(o, times(1)).onNext(3); + inOrder.verify(o, times(1)).onNext(4); + inOrder.verify(o, times(1)).onNext(5); + inOrder.verify(o, times(1)).onCompleted(); + + verify(o, never()).onError(any(Throwable.class)); + } + @Test + public void takeLastTimedDelayCompletion() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable result = source.takeLast(1, TimeUnit.SECONDS, scheduler); + + Observer o = mock(Observer.class); + + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + source.onNext(1); // T: 0ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(2); // T: 250ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(3); // T: 500ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(4); // T: 750ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(5); // T: 1000ms + scheduler.advanceTimeBy(1250, TimeUnit.MILLISECONDS); + source.onCompleted(); // T: 2250ms + + inOrder.verify(o, times(1)).onCompleted(); + + verify(o, never()).onNext(any()); + verify(o, never()).onError(any(Throwable.class)); + } + @Test + public void takeLastTimedWithCapacity() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable result = source.takeLast(2, 1, TimeUnit.SECONDS, scheduler); + + Observer o = mock(Observer.class); + + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + source.onNext(1); // T: 0ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(2); // T: 250ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(3); // T: 500ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(4); // T: 750ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(5); // T: 1000ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onCompleted(); // T: 1250ms + + inOrder.verify(o, times(1)).onNext(4); + inOrder.verify(o, times(1)).onNext(5); + inOrder.verify(o, times(1)).onCompleted(); + + verify(o, never()).onError(any(Throwable.class)); + } + static final class CustomException extends RuntimeException { + + } + @Test + public void takeLastTimedThrowingSource() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable result = source.takeLast(1, TimeUnit.SECONDS, scheduler); + + Observer o = mock(Observer.class); + + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + source.onNext(1); // T: 0ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(2); // T: 250ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(3); // T: 500ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(4); // T: 750ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(5); // T: 1000ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onError(new CustomException()); // T: 1250ms + + inOrder.verify(o, times(1)).onError(any(CustomException.class)); + + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + } + + @Test + public void takeLastTimedWithZeroCapacity() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable result = source.takeLast(0, 1, TimeUnit.SECONDS, scheduler); + + Observer o = mock(Observer.class); + + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + source.onNext(1); // T: 0ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(2); // T: 250ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(3); // T: 500ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(4); // T: 750ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onNext(5); // T: 1000ms + scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS); + source.onCompleted(); // T: 1250ms + + inOrder.verify(o, times(1)).onCompleted(); + + verify(o, never()).onNext(any()); + verify(o, never()).onError(any(Throwable.class)); + } }