diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index afc35b6a32a..22735209cdc 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -15,9 +15,18 @@ */ package rx; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + import java.util.Date; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; +import rx.concurrency.TestScheduler; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Func0; @@ -71,6 +80,56 @@ public abstract class Scheduler { */ public abstract Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit); + /** + * Schedules a cancelable action to be executed periodically. + * This default implementation schedules recursively and waits for actions to complete (instead of potentially executing + * long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this. + * + * @param state + * State to pass into the action. + * @param action + * The action to execute periodically. + * @param initialDelay + * Time to wait before executing the action for the first time. + * @param period + * The time interval to wait each time in between executing the action. + * @param unit + * The time unit the interval above is given in. + * @return A subscription to be able to unsubscribe from action. + */ + public Subscription schedulePeriodically(T state, final Func2 action, long initialDelay, long period, TimeUnit unit) { + final long periodInNanos = unit.toNanos(period); + final AtomicBoolean complete = new AtomicBoolean(); + + final Func2 recursiveAction = new Func2() { + @Override + public Subscription call(Scheduler scheduler, T state0) { + if (!complete.get()) { + long startedAt = now(); + final Subscription sub1 = action.call(scheduler, state0); + long timeTakenByActionInNanos = TimeUnit.MILLISECONDS.toNanos(now() - startedAt); + final Subscription sub2 = schedule(state0, this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS); + return Subscriptions.create(new Action0() { + @Override + public void call() { + sub1.unsubscribe(); + sub2.unsubscribe(); + } + }); + } + return Subscriptions.empty(); + } + }; + final Subscription sub = schedule(state, recursiveAction, initialDelay, unit); + return Subscriptions.create(new Action0() { + @Override + public void call() { + complete.set(true); + sub.unsubscribe(); + } + }); + } + /** * Schedules a cancelable action to be executed at dueTime. * @@ -103,7 +162,7 @@ public Subscription schedule(final Func1 action) { return schedule(null, new Func2() { @Override - public Subscription call(Scheduler scheduler, Void t2) { + public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) { return action.call(scheduler); } }); @@ -120,7 +179,7 @@ public Subscription schedule(final Func0 action) { return schedule(null, new Func2() { @Override - public Subscription call(Scheduler scheduler, Void t2) { + public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) { return action.call(); } }); @@ -137,7 +196,7 @@ public Subscription schedule(final Action0 action) { return schedule(null, new Func2() { @Override - public Subscription call(Scheduler scheduler, Void t2) { + public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) { action.call(); return Subscriptions.empty(); } @@ -159,7 +218,7 @@ public Subscription schedule(final Func1 action, long d return schedule(null, new Func2() { @Override - public Subscription call(Scheduler scheduler, Void t2) { + public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) { return action.call(scheduler); } }, delayTime, unit); @@ -176,7 +235,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit return schedule(null, new Func2() { @Override - public Subscription call(Scheduler scheduler, Void t2) { + public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) { action.call(); return Subscriptions.empty(); } @@ -194,12 +253,79 @@ public Subscription schedule(final Func0 action, long delayTime, T return schedule(null, new Func2() { @Override - public Subscription call(Scheduler scheduler, Void t2) { + public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) { return action.call(); } }, delayTime, unit); } + /** + * Schedules a cancelable action to be executed periodically. + * + * @param action + * The action to execute periodically. + * @param initialDelay + * Time to wait before executing the action for the first time. + * @param period + * The time interval to wait each time in between executing the action. + * @param unit + * The time unit the interval above is given in. + * @return A subscription to be able to unsubscribe from action. + */ + public Subscription schedulePeriodically(final Func1 action, long initialDelay, long period, TimeUnit unit) { + return schedulePeriodically(null, new Func2() { + @Override + public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) { + return action.call(scheduler); + } + }, initialDelay, period, unit); + } + + /** + * Schedules a cancelable action to be executed periodically. + * + * @param action + * The action to execute periodically. + * @param initialDelay + * Time to wait before executing the action for the first time. + * @param period + * The time interval to wait each time in between executing the action. + * @param unit + * The time unit the interval above is given in. + * @return A subscription to be able to unsubscribe from action. + */ + public Subscription schedulePeriodically(final Func0 action, long initialDelay, long period, TimeUnit unit) { + return schedulePeriodically(null, new Func2() { + @Override + public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) { + return action.call(); + } + }, initialDelay, period, unit); + } + + /** + * Schedules an action to be executed periodically. + * + * @param action + * The action to execute periodically. + * @param initialDelay + * Time to wait before executing the action for the first time. + * @param period + * The time interval to wait each time in between executing the action. + * @param unit + * The time unit the interval above is given in. + * @return A subscription to be able to unsubscribe from action. + */ + public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) { + return schedulePeriodically(null, new Func2() { + @Override + public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) { + action.call(); + return Subscriptions.empty(); + } + }, initialDelay, period, unit); + } + /** * Returns the scheduler's notion of current absolute time in milliseconds. */ @@ -207,4 +333,43 @@ public long now() { return System.currentTimeMillis(); } + public static class UnitTest { + @SuppressWarnings("unchecked") // mocking is unchecked, unfortunately + @Test + public void testPeriodicScheduling() { + final Func1 calledOp = mock(Func1.class); + + final TestScheduler scheduler = new TestScheduler(); + Subscription subscription = scheduler.schedulePeriodically(new Action0() { + @Override public void call() { + System.out.println(scheduler.now()); + calledOp.call(scheduler.now()); + } + }, 1, 2, TimeUnit.SECONDS); + + verify(calledOp, never()).call(anyLong()); + + InOrder inOrder = Mockito.inOrder(calledOp); + + scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, never()).call(anyLong()); + + scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, times(1)).call(1000L); + + scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, never()).call(3000L); + + scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, times(1)).call(3000L); + + scheduler.advanceTimeBy(5L, TimeUnit.SECONDS); + inOrder.verify(calledOp, times(1)).call(5000L); + inOrder.verify(calledOp, times(1)).call(7000L); + + subscription.unsubscribe(); + scheduler.advanceTimeBy(11L, TimeUnit.SECONDS); + inOrder.verify(calledOp, never()).call(anyLong()); + } + } } diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java index ed40bec3507..a2522a29000 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -44,6 +44,27 @@ public ExecutorScheduler(ScheduledExecutorService executor) { this.executor = executor; } + @Override + public Subscription schedulePeriodically(final T state, final Func2 action, long initialDelay, long period, TimeUnit unit) { + if (executor instanceof ScheduledExecutorService) { + final CompositeSubscription subscriptions = new CompositeSubscription(); + + ScheduledFuture f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + Subscription s = action.call(ExecutorScheduler.this, state); + subscriptions.add(s); + } + }, initialDelay, period, unit); + + subscriptions.add(Subscriptions.create(f)); + return subscriptions; + + } else { + return super.schedulePeriodically(state, action, initialDelay, period, unit); + } + } + @Override public Subscription schedule(final T state, final Func2 action, long delayTime, TimeUnit unit) { final DiscardableAction discardableAction = new DiscardableAction(state, action); diff --git a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java index c785031df26..a10ab90e216 100644 --- a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java @@ -54,11 +54,12 @@ public int compare(TimedAction action1, TimedAction action2) { } } + // Storing time in nanoseconds internally. private long time; @Override public long now() { - return time; + return TimeUnit.NANOSECONDS.toMillis(time); } public void advanceTimeBy(long delayTime, TimeUnit unit) { @@ -79,6 +80,7 @@ private void triggerActions(long targetTimeInNanos) { while (!queue.isEmpty()) { TimedAction current = queue.peek(); if (current.time > targetTimeInNanos) { + time = targetTimeInNanos; break; } time = current.time; @@ -95,7 +97,7 @@ public Subscription schedule(T state, Func2 acti @Override public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { - queue.add(new TimedAction(this, now() + unit.toNanos(delayTime), action, state)); + queue.add(new TimedAction(this, time + unit.toNanos(delayTime), action, state)); return Subscriptions.empty(); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationInterval.java b/rxjava-core/src/main/java/rx/operators/OperationInterval.java index 19a3736826d..a9a46d2bbe6 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationInterval.java +++ b/rxjava-core/src/main/java/rx/operators/OperationInterval.java @@ -20,7 +20,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Before; import org.junit.Test; @@ -57,47 +56,35 @@ public static Func1, Subscription> interval(long interval, TimeUn } private static class Interval implements Func1, Subscription> { - private final long interval; + private final long period; private final TimeUnit unit; private final Scheduler scheduler; private long currentValue; - private final AtomicBoolean complete = new AtomicBoolean(); - private Interval(long interval, TimeUnit unit, Scheduler scheduler) { - this.interval = interval; + private Interval(long period, TimeUnit unit, Scheduler scheduler) { + this.period = period; this.unit = unit; this.scheduler = scheduler; } @Override public Subscription call(final Observer observer) { - scheduler.schedule(new IntervalAction(observer), interval, unit); - return Subscriptions.create(new Action0() { + final Subscription wrapped = scheduler.schedulePeriodically(new Action0() { @Override public void call() { - complete.set(true); + observer.onNext(currentValue); + currentValue++; } - }); - } - - private class IntervalAction implements Action0 { - private final Observer observer; - - private IntervalAction(Observer observer) { - this.observer = observer; - } + }, period, period, unit); - @Override - public void call() { - if (complete.get()) { + return Subscriptions.create(new Action0() { + @Override + public void call() { + wrapped.unsubscribe(); observer.onCompleted(); - } else { - observer.onNext(currentValue); - currentValue++; - scheduler.schedule(this, interval, unit); } - } + }); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java index 618e507a441..2941766547b 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -27,6 +27,7 @@ import rx.Observer; import rx.Subscription; import rx.subjects.PublishSubject; +import rx.subjects.Subject; import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; import rx.util.AtomicObserver; @@ -175,9 +176,8 @@ public Boolean call(Integer input) @Test public void testTakeWhileOnSubject1() { - PublishSubject s = PublishSubject.create(); - Observable w = (Observable) s; - Observable take = Observable.create(takeWhile(w, new Func1() + Subject s = PublishSubject.create(); + Observable take = Observable.create(takeWhile(s, new Func1() { @Override public Boolean call(Integer input) diff --git a/rxjava-core/src/main/java/rx/operators/OperatorTester.java b/rxjava-core/src/main/java/rx/operators/OperatorTester.java index 086affa96da..6fa93c1a1d7 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorTester.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorTester.java @@ -339,6 +339,26 @@ public Subscription schedule(T state, Func2 acti return underlying.schedule(state, action, dueTime, unit); } + @Override + public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) { + return underlying.schedulePeriodically(action, initialDelay, period, unit); + } + + @Override + public Subscription schedulePeriodically(Func0 action, long initialDelay, long period, TimeUnit unit) { + return underlying.schedulePeriodically(action, initialDelay, period, unit); + } + + @Override + public Subscription schedulePeriodically(Func1 action, long initialDelay, long period, TimeUnit unit) { + return underlying.schedulePeriodically(action, initialDelay, period, unit); + } + + @Override + public Subscription schedulePeriodically(T state, Func2 action, long initialDelay, long period, TimeUnit unit) { + return underlying.schedulePeriodically(state, action, initialDelay, period, unit); + } + @Override public long now() { return underlying.now();