Skip to content

Commit

Permalink
Merge pull request #246 from jmhofer/schedulePeriodically
Browse files Browse the repository at this point in the history
Scheduling actions periodically
  • Loading branch information
benjchristensen committed May 1, 2013
2 parents f2c53cc + 58456da commit cec81cb
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 36 deletions.
177 changes: 171 additions & 6 deletions rxjava-core/src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,18 @@
*/
package rx;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;

import rx.concurrency.TestScheduler;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;
Expand Down Expand Up @@ -71,6 +80,56 @@ public abstract class Scheduler {
*/
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit);

/**
* Schedules a cancelable action to be executed periodically.
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
*
* @param state
* State to pass into the action.
* @param action
* The action to execute periodically.
* @param initialDelay
* Time to wait before executing the action for the first time.
* @param period
* The time interval to wait each time in between executing the action.
* @param unit
* The time unit the interval above is given in.
* @return A subscription to be able to unsubscribe from action.
*/
public <T> Subscription schedulePeriodically(T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
final long periodInNanos = unit.toNanos(period);
final AtomicBoolean complete = new AtomicBoolean();

final Func2<Scheduler, T, Subscription> recursiveAction = new Func2<Scheduler, T, Subscription>() {
@Override
public Subscription call(Scheduler scheduler, T state0) {
if (!complete.get()) {
long startedAt = now();
final Subscription sub1 = action.call(scheduler, state0);
long timeTakenByActionInNanos = TimeUnit.MILLISECONDS.toNanos(now() - startedAt);
final Subscription sub2 = schedule(state0, this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS);
return Subscriptions.create(new Action0() {
@Override
public void call() {
sub1.unsubscribe();
sub2.unsubscribe();
}
});
}
return Subscriptions.empty();
}
};
final Subscription sub = schedule(state, recursiveAction, initialDelay, unit);
return Subscriptions.create(new Action0() {
@Override
public void call() {
complete.set(true);
sub.unsubscribe();
}
});
}

/**
* Schedules a cancelable action to be executed at dueTime.
*
Expand Down Expand Up @@ -103,7 +162,7 @@ public Subscription schedule(final Func1<Scheduler, Subscription> action) {
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) {
return action.call(scheduler);
}
});
Expand All @@ -120,7 +179,7 @@ public Subscription schedule(final Func0<Subscription> action) {
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
return action.call();
}
});
Expand All @@ -137,7 +196,7 @@ public Subscription schedule(final Action0 action) {
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
action.call();
return Subscriptions.empty();
}
Expand All @@ -159,7 +218,7 @@ public Subscription schedule(final Func1<Scheduler, Subscription> action, long d
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) {
return action.call(scheduler);
}
}, delayTime, unit);
Expand All @@ -176,7 +235,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
action.call();
return Subscriptions.empty();
}
Expand All @@ -194,17 +253,123 @@ public Subscription schedule(final Func0<Subscription> action, long delayTime, T
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
return action.call();
}
}, delayTime, unit);
}

/**
* Schedules a cancelable action to be executed periodically.
*
* @param action
* The action to execute periodically.
* @param initialDelay
* Time to wait before executing the action for the first time.
* @param period
* The time interval to wait each time in between executing the action.
* @param unit
* The time unit the interval above is given in.
* @return A subscription to be able to unsubscribe from action.
*/
public Subscription schedulePeriodically(final Func1<Scheduler, Subscription> action, long initialDelay, long period, TimeUnit unit) {
return schedulePeriodically(null, new Func2<Scheduler, Void, Subscription>() {
@Override
public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) {
return action.call(scheduler);
}
}, initialDelay, period, unit);
}

/**
* Schedules a cancelable action to be executed periodically.
*
* @param action
* The action to execute periodically.
* @param initialDelay
* Time to wait before executing the action for the first time.
* @param period
* The time interval to wait each time in between executing the action.
* @param unit
* The time unit the interval above is given in.
* @return A subscription to be able to unsubscribe from action.
*/
public Subscription schedulePeriodically(final Func0<Subscription> action, long initialDelay, long period, TimeUnit unit) {
return schedulePeriodically(null, new Func2<Scheduler, Void, Subscription>() {
@Override
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
return action.call();
}
}, initialDelay, period, unit);
}

/**
* Schedules an action to be executed periodically.
*
* @param action
* The action to execute periodically.
* @param initialDelay
* Time to wait before executing the action for the first time.
* @param period
* The time interval to wait each time in between executing the action.
* @param unit
* The time unit the interval above is given in.
* @return A subscription to be able to unsubscribe from action.
*/
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
return schedulePeriodically(null, new Func2<Scheduler, Void, Subscription>() {
@Override
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
action.call();
return Subscriptions.empty();
}
}, initialDelay, period, unit);
}

/**
* Returns the scheduler's notion of current absolute time in milliseconds.
*/
public long now() {
return System.currentTimeMillis();
}

public static class UnitTest {
@SuppressWarnings("unchecked") // mocking is unchecked, unfortunately
@Test
public void testPeriodicScheduling() {
final Func1<Long, Void> calledOp = mock(Func1.class);

final TestScheduler scheduler = new TestScheduler();
Subscription subscription = scheduler.schedulePeriodically(new Action0() {
@Override public void call() {
System.out.println(scheduler.now());
calledOp.call(scheduler.now());
}
}, 1, 2, TimeUnit.SECONDS);

verify(calledOp, never()).call(anyLong());

InOrder inOrder = Mockito.inOrder(calledOp);

scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, never()).call(anyLong());

scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, times(1)).call(1000L);

scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, never()).call(3000L);

scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, times(1)).call(3000L);

scheduler.advanceTimeBy(5L, TimeUnit.SECONDS);
inOrder.verify(calledOp, times(1)).call(5000L);
inOrder.verify(calledOp, times(1)).call(7000L);

subscription.unsubscribe();
scheduler.advanceTimeBy(11L, TimeUnit.SECONDS);
inOrder.verify(calledOp, never()).call(anyLong());
}
}
}
21 changes: 21 additions & 0 deletions rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,27 @@ public ExecutorScheduler(ScheduledExecutorService executor) {
this.executor = executor;
}

@Override
public <T> Subscription schedulePeriodically(final T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
if (executor instanceof ScheduledExecutorService) {
final CompositeSubscription subscriptions = new CompositeSubscription();

ScheduledFuture<?> f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Subscription s = action.call(ExecutorScheduler.this, state);
subscriptions.add(s);
}
}, initialDelay, period, unit);

subscriptions.add(Subscriptions.create(f));
return subscriptions;

} else {
return super.schedulePeriodically(state, action, initialDelay, period, unit);
}
}

@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
Expand Down
6 changes: 4 additions & 2 deletions rxjava-core/src/main/java/rx/concurrency/TestScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ public int compare(TimedAction<?> action1, TimedAction<?> action2) {
}
}

// Storing time in nanoseconds internally.
private long time;

@Override
public long now() {
return time;
return TimeUnit.NANOSECONDS.toMillis(time);
}

public void advanceTimeBy(long delayTime, TimeUnit unit) {
Expand All @@ -79,6 +80,7 @@ private void triggerActions(long targetTimeInNanos) {
while (!queue.isEmpty()) {
TimedAction<?> current = queue.peek();
if (current.time > targetTimeInNanos) {
time = targetTimeInNanos;
break;
}
time = current.time;
Expand All @@ -95,7 +97,7 @@ public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> acti

@Override
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
queue.add(new TimedAction<T>(this, now() + unit.toNanos(delayTime), action, state));
queue.add(new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state));
return Subscriptions.empty();
}
}
37 changes: 12 additions & 25 deletions rxjava-core/src/main/java/rx/operators/OperationInterval.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -57,47 +56,35 @@ public static Func1<Observer<Long>, Subscription> interval(long interval, TimeUn
}

private static class Interval implements Func1<Observer<Long>, Subscription> {
private final long interval;
private final long period;
private final TimeUnit unit;
private final Scheduler scheduler;

private long currentValue;
private final AtomicBoolean complete = new AtomicBoolean();

private Interval(long interval, TimeUnit unit, Scheduler scheduler) {
this.interval = interval;
private Interval(long period, TimeUnit unit, Scheduler scheduler) {
this.period = period;
this.unit = unit;
this.scheduler = scheduler;
}

@Override
public Subscription call(final Observer<Long> observer) {
scheduler.schedule(new IntervalAction(observer), interval, unit);
return Subscriptions.create(new Action0() {
final Subscription wrapped = scheduler.schedulePeriodically(new Action0() {
@Override
public void call() {
complete.set(true);
observer.onNext(currentValue);
currentValue++;
}
});
}

private class IntervalAction implements Action0 {
private final Observer<Long> observer;

private IntervalAction(Observer<Long> observer) {
this.observer = observer;
}
}, period, period, unit);

@Override
public void call() {
if (complete.get()) {
return Subscriptions.create(new Action0() {
@Override
public void call() {
wrapped.unsubscribe();
observer.onCompleted();
} else {
observer.onNext(currentValue);
currentValue++;
scheduler.schedule(this, interval, unit);
}
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import rx.Observer;
import rx.Subscription;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;
import rx.util.AtomicObservableSubscription;
import rx.util.AtomicObserver;
Expand Down Expand Up @@ -175,9 +176,8 @@ public Boolean call(Integer input)

@Test
public void testTakeWhileOnSubject1() {
PublishSubject<Integer> s = PublishSubject.create();
Observable<Integer> w = (Observable<Integer>) s;
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>()
Subject<Integer, Integer> s = PublishSubject.create();
Observable<Integer> take = Observable.create(takeWhile(s, new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer input)
Expand Down
Loading

0 comments on commit cec81cb

Please sign in to comment.