Skip to content

Commit

Permalink
Merge pull request #909 from benjchristensen/scheduler-plugin
Browse files Browse the repository at this point in the history
Scheduler Plugin Refactor
  • Loading branch information
benjchristensen committed Feb 20, 2014
2 parents bccac64 + 07ce114 commit 9cb6d81
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 55 deletions.
19 changes: 12 additions & 7 deletions rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package rx.plugins;

import rx.Scheduler;
import rx.functions.Func0;

/**
* Define alternate Scheduler implementations to be returned by the `Schedulers` factory methods.
Expand All @@ -27,17 +26,23 @@
public abstract class RxJavaDefaultSchedulers {

/**
* Factory of Scheduler to return from {@link Schedulers.computation()} or null if default should be used.
* Scheduler to return from {@link Schedulers.computation()} or null if default should be used.
*
* This instance should be or behave like a stateless singleton;
*/
public abstract Func0<Scheduler> getComputationSchedulerFactory();
public abstract Scheduler getComputationScheduler();

/**
* Factory of Scheduler to return from {@link Schedulers.io()} or null if default should be used.
* Scheduler to return from {@link Schedulers.io()} or null if default should be used.
*
* This instance should be or behave like a stateless singleton;
*/
public abstract Func0<Scheduler> getIOSchedulerFactory();
public abstract Scheduler getIOScheduler();

/**
* Factory of Scheduler to return from {@link Schedulers.newThread()} or null if default should be used.
* Scheduler to return from {@link Schedulers.newThread()} or null if default should be used.
*
* This instance should be or behave like a stateless singleton;
*/
public abstract Func0<Scheduler> getNewThreadSchedulerFactory();
public abstract Scheduler getNewThreadScheduler();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package rx.plugins;

import rx.Scheduler;
import rx.functions.Func0;

/**
* Default implementation of {@link RxJavaErrorHandler} that does nothing.
Expand All @@ -27,20 +26,23 @@ public class RxJavaDefaultSchedulersDefault extends RxJavaDefaultSchedulers {

private static RxJavaDefaultSchedulersDefault INSTANCE = new RxJavaDefaultSchedulersDefault();

public Func0<Scheduler> getComputationSchedulerFactory() {
return null;
public static RxJavaDefaultSchedulers getInstance() {
return INSTANCE;
}

public Func0<Scheduler> getIOSchedulerFactory() {
@Override
public Scheduler getComputationScheduler() {
return null;
}

public Func0<Scheduler> getNewThreadSchedulerFactory() {
@Override
public Scheduler getIOScheduler() {
return null;
}

public static RxJavaDefaultSchedulers getInstance() {
return INSTANCE;
@Override
public Scheduler getNewThreadScheduler() {
return null;
}

}
11 changes: 10 additions & 1 deletion rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,20 @@
public class ExecutorScheduler extends Scheduler {
private final Executor executor;

/**
* @deprecated Use Schedulers.executor();
* @return
*/
@Deprecated
public ExecutorScheduler(Executor executor) {
this.executor = executor;
}

/**
* @deprecated Use Schedulers.executor();
* @return
*/
@Deprecated
public ExecutorScheduler(ScheduledExecutorService executor) {
this.executor = executor;
}
Expand All @@ -50,7 +60,6 @@ public Subscription schedule(Action1<Scheduler.Inner> action) {
inner.schedule(action);
return inner.innerSubscription;
}


@Override
public Subscription schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
Expand Down
10 changes: 9 additions & 1 deletion rxjava-core/src/main/java/rx/schedulers/ImmediateScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,19 @@
public final class ImmediateScheduler extends Scheduler {
private static final ImmediateScheduler INSTANCE = new ImmediateScheduler();

/**
* @deprecated Use Schedulers.immediate();
* @return
*/
@Deprecated
public static ImmediateScheduler getInstance() {
return INSTANCE;
}

/* package */static ImmediateScheduler instance() {
return INSTANCE;
}

/* package accessible for unit tests */ImmediateScheduler() {
}

Expand All @@ -49,7 +58,6 @@ public Subscription schedule(Action1<Inner> action, long delayTime, TimeUnit uni
return inner.innerSubscription;
}


private class InnerImmediateScheduler extends Scheduler.Inner implements Subscription {

final BooleanSubscription innerSubscription = new BooleanSubscription();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,16 @@ public Thread newThread(Runnable r) {
}
};

/**
* @deprecated Use Schedulers.newThread();
* @return
*/
@Deprecated
public static NewThreadScheduler getInstance() {
return INSTANCE;
}
/* package */ static NewThreadScheduler instance() {

/* package */static NewThreadScheduler instance() {
return INSTANCE;
}

Expand Down
52 changes: 15 additions & 37 deletions rxjava-core/src/main/java/rx/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,61 +23,39 @@
import java.util.concurrent.atomic.AtomicLong;

import rx.Scheduler;
import rx.functions.Func0;
import rx.plugins.RxJavaPlugins;

/**
* Static factory methods for creating Schedulers.
*/
public class Schedulers {

private final Func0<Scheduler> computationScheduler;
private final Func0<Scheduler> ioScheduler;
private final Func0<Scheduler> newThreadScheduler;
private final Scheduler computationScheduler;
private final Scheduler ioScheduler;
private final Scheduler newThreadScheduler;

private static final Schedulers INSTANCE = new Schedulers();

private Schedulers() {
Func0<Scheduler> c = RxJavaPlugins.getInstance().getDefaultSchedulers().getComputationSchedulerFactory();
Scheduler c = RxJavaPlugins.getInstance().getDefaultSchedulers().getComputationScheduler();
if (c != null) {
computationScheduler = c;
} else {
computationScheduler = new Func0<Scheduler>() {

@Override
public Scheduler call() {
return executor(createComputationExecutor());
}

};
computationScheduler = executor(createComputationExecutor());
}

Func0<Scheduler> io = RxJavaPlugins.getInstance().getDefaultSchedulers().getIOSchedulerFactory();
Scheduler io = RxJavaPlugins.getInstance().getDefaultSchedulers().getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = new Func0<Scheduler>() {

@Override
public Scheduler call() {
return executor(createIOExecutor());
}

};
ioScheduler = executor(createIOExecutor());
}

Func0<Scheduler> nt = RxJavaPlugins.getInstance().getDefaultSchedulers().getNewThreadSchedulerFactory();
Scheduler nt = RxJavaPlugins.getInstance().getDefaultSchedulers().getNewThreadScheduler();
if (nt != null) {
newThreadScheduler = nt;
} else {
newThreadScheduler = new Func0<Scheduler>() {

@Override
public Scheduler call() {
return NewThreadScheduler.instance();
}

};
newThreadScheduler = NewThreadScheduler.instance();
}

}
Expand All @@ -88,7 +66,7 @@ public Scheduler call() {
* @return {@link ImmediateScheduler} instance
*/
public static Scheduler immediate() {
return ImmediateScheduler.getInstance();
return ImmediateScheduler.instance();
}

/**
Expand All @@ -99,7 +77,7 @@ public static Scheduler immediate() {
*/
@Deprecated
public static Scheduler currentThread() {
return TrampolineScheduler.getInstance();
return TrampolineScheduler.instance();
}

/**
Expand All @@ -108,7 +86,7 @@ public static Scheduler currentThread() {
* @return {@link TrampolineScheduler} instance
*/
public static Scheduler trampoline() {
return TrampolineScheduler.getInstance();
return TrampolineScheduler.instance();
}

/**
Expand All @@ -117,7 +95,7 @@ public static Scheduler trampoline() {
* @return {@link NewThreadScheduler} instance
*/
public static Scheduler newThread() {
return INSTANCE.newThreadScheduler.call();
return INSTANCE.newThreadScheduler;
}

/**
Expand Down Expand Up @@ -167,7 +145,7 @@ public static Scheduler threadPoolForComputation() {
* @return {@link Scheduler} for computation-bound work.
*/
public static Scheduler computation() {
return INSTANCE.computationScheduler.call();
return INSTANCE.computationScheduler;
}

/**
Expand Down Expand Up @@ -199,7 +177,7 @@ public static Scheduler threadPoolForIO() {
* @return {@link ExecutorScheduler} for IO-bound work.
*/
public static Scheduler io() {
return INSTANCE.ioScheduler.call();
return INSTANCE.ioScheduler;
}

private static ScheduledExecutorService createComputationExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,18 @@
public class TrampolineScheduler extends Scheduler {
private static final TrampolineScheduler INSTANCE = new TrampolineScheduler();

/**
* @deprecated Use Schedulers.trampoline();
* @return
*/
@Deprecated
public static TrampolineScheduler getInstance() {
return INSTANCE;
}

/* package */ static TrampolineScheduler instance() {
return INSTANCE;
}

@Override
public Subscription schedule(Action1<Scheduler.Inner> action) {
Expand Down

0 comments on commit 9cb6d81

Please sign in to comment.