Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler Plugin Refactor #909

Merged
merged 2 commits into from
Feb 20, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package rx.plugins;

import rx.Scheduler;
import rx.functions.Func0;

/**
* Define alternate Scheduler implementations to be returned by the `Schedulers` factory methods.
Expand All @@ -27,17 +26,23 @@
public abstract class RxJavaDefaultSchedulers {

/**
* Factory of Scheduler to return from {@link Schedulers.computation()} or null if default should be used.
* Scheduler to return from {@link Schedulers.computation()} or null if default should be used.
*
* This instance should be or behave like a stateless singleton;
*/
public abstract Func0<Scheduler> getComputationSchedulerFactory();
public abstract Scheduler getComputationScheduler();

/**
* Factory of Scheduler to return from {@link Schedulers.io()} or null if default should be used.
* Scheduler to return from {@link Schedulers.io()} or null if default should be used.
*
* This instance should be or behave like a stateless singleton;
*/
public abstract Func0<Scheduler> getIOSchedulerFactory();
public abstract Scheduler getIOScheduler();

/**
* Factory of Scheduler to return from {@link Schedulers.newThread()} or null if default should be used.
* Scheduler to return from {@link Schedulers.newThread()} or null if default should be used.
*
* This instance should be or behave like a stateless singleton;
*/
public abstract Func0<Scheduler> getNewThreadSchedulerFactory();
public abstract Scheduler getNewThreadScheduler();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package rx.plugins;

import rx.Scheduler;
import rx.functions.Func0;

/**
* Default implementation of {@link RxJavaErrorHandler} that does nothing.
Expand All @@ -27,20 +26,23 @@ public class RxJavaDefaultSchedulersDefault extends RxJavaDefaultSchedulers {

private static RxJavaDefaultSchedulersDefault INSTANCE = new RxJavaDefaultSchedulersDefault();

public Func0<Scheduler> getComputationSchedulerFactory() {
return null;
public static RxJavaDefaultSchedulers getInstance() {
return INSTANCE;
}

public Func0<Scheduler> getIOSchedulerFactory() {
@Override
public Scheduler getComputationScheduler() {
return null;
}

public Func0<Scheduler> getNewThreadSchedulerFactory() {
@Override
public Scheduler getIOScheduler() {
return null;
}

public static RxJavaDefaultSchedulers getInstance() {
return INSTANCE;
@Override
public Scheduler getNewThreadScheduler() {
return null;
}

}
11 changes: 10 additions & 1 deletion rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,20 @@
public class ExecutorScheduler extends Scheduler {
private final Executor executor;

/**
* @deprecated Use Schedulers.executor();
* @return
*/
@Deprecated
public ExecutorScheduler(Executor executor) {
this.executor = executor;
}

/**
* @deprecated Use Schedulers.executor();
* @return
*/
@Deprecated
public ExecutorScheduler(ScheduledExecutorService executor) {
this.executor = executor;
}
Expand All @@ -50,7 +60,6 @@ public Subscription schedule(Action1<Scheduler.Inner> action) {
inner.schedule(action);
return inner.innerSubscription;
}


@Override
public Subscription schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
Expand Down
10 changes: 9 additions & 1 deletion rxjava-core/src/main/java/rx/schedulers/ImmediateScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,19 @@
public final class ImmediateScheduler extends Scheduler {
private static final ImmediateScheduler INSTANCE = new ImmediateScheduler();

/**
* @deprecated Use Schedulers.immediate();
* @return
*/
@Deprecated
public static ImmediateScheduler getInstance() {
return INSTANCE;
}

/* package */static ImmediateScheduler instance() {
return INSTANCE;
}

/* package accessible for unit tests */ImmediateScheduler() {
}

Expand All @@ -49,7 +58,6 @@ public Subscription schedule(Action1<Inner> action, long delayTime, TimeUnit uni
return inner.innerSubscription;
}


private class InnerImmediateScheduler extends Scheduler.Inner implements Subscription {

final BooleanSubscription innerSubscription = new BooleanSubscription();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,16 @@ public Thread newThread(Runnable r) {
}
};

/**
* @deprecated Use Schedulers.newThread();
* @return
*/
@Deprecated
public static NewThreadScheduler getInstance() {
return INSTANCE;
}
/* package */ static NewThreadScheduler instance() {

/* package */static NewThreadScheduler instance() {
return INSTANCE;
}

Expand Down
52 changes: 15 additions & 37 deletions rxjava-core/src/main/java/rx/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,61 +23,39 @@
import java.util.concurrent.atomic.AtomicLong;

import rx.Scheduler;
import rx.functions.Func0;
import rx.plugins.RxJavaPlugins;

/**
* Static factory methods for creating Schedulers.
*/
public class Schedulers {

private final Func0<Scheduler> computationScheduler;
private final Func0<Scheduler> ioScheduler;
private final Func0<Scheduler> newThreadScheduler;
private final Scheduler computationScheduler;
private final Scheduler ioScheduler;
private final Scheduler newThreadScheduler;

private static final Schedulers INSTANCE = new Schedulers();

private Schedulers() {
Func0<Scheduler> c = RxJavaPlugins.getInstance().getDefaultSchedulers().getComputationSchedulerFactory();
Scheduler c = RxJavaPlugins.getInstance().getDefaultSchedulers().getComputationScheduler();
if (c != null) {
computationScheduler = c;
} else {
computationScheduler = new Func0<Scheduler>() {

@Override
public Scheduler call() {
return executor(createComputationExecutor());
}

};
computationScheduler = executor(createComputationExecutor());
}

Func0<Scheduler> io = RxJavaPlugins.getInstance().getDefaultSchedulers().getIOSchedulerFactory();
Scheduler io = RxJavaPlugins.getInstance().getDefaultSchedulers().getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = new Func0<Scheduler>() {

@Override
public Scheduler call() {
return executor(createIOExecutor());
}

};
ioScheduler = executor(createIOExecutor());
}

Func0<Scheduler> nt = RxJavaPlugins.getInstance().getDefaultSchedulers().getNewThreadSchedulerFactory();
Scheduler nt = RxJavaPlugins.getInstance().getDefaultSchedulers().getNewThreadScheduler();
if (nt != null) {
newThreadScheduler = nt;
} else {
newThreadScheduler = new Func0<Scheduler>() {

@Override
public Scheduler call() {
return NewThreadScheduler.instance();
}

};
newThreadScheduler = NewThreadScheduler.instance();
}

}
Expand All @@ -88,7 +66,7 @@ public Scheduler call() {
* @return {@link ImmediateScheduler} instance
*/
public static Scheduler immediate() {
return ImmediateScheduler.getInstance();
return ImmediateScheduler.instance();
}

/**
Expand All @@ -99,7 +77,7 @@ public static Scheduler immediate() {
*/
@Deprecated
public static Scheduler currentThread() {
return TrampolineScheduler.getInstance();
return TrampolineScheduler.instance();
}

/**
Expand All @@ -108,7 +86,7 @@ public static Scheduler currentThread() {
* @return {@link TrampolineScheduler} instance
*/
public static Scheduler trampoline() {
return TrampolineScheduler.getInstance();
return TrampolineScheduler.instance();
}

/**
Expand All @@ -117,7 +95,7 @@ public static Scheduler trampoline() {
* @return {@link NewThreadScheduler} instance
*/
public static Scheduler newThread() {
return INSTANCE.newThreadScheduler.call();
return INSTANCE.newThreadScheduler;
}

/**
Expand Down Expand Up @@ -167,7 +145,7 @@ public static Scheduler threadPoolForComputation() {
* @return {@link Scheduler} for computation-bound work.
*/
public static Scheduler computation() {
return INSTANCE.computationScheduler.call();
return INSTANCE.computationScheduler;
}

/**
Expand Down Expand Up @@ -199,7 +177,7 @@ public static Scheduler threadPoolForIO() {
* @return {@link ExecutorScheduler} for IO-bound work.
*/
public static Scheduler io() {
return INSTANCE.ioScheduler.call();
return INSTANCE.ioScheduler;
}

private static ScheduledExecutorService createComputationExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,18 @@
public class TrampolineScheduler extends Scheduler {
private static final TrampolineScheduler INSTANCE = new TrampolineScheduler();

/**
* @deprecated Use Schedulers.trampoline();
* @return
*/
@Deprecated
public static TrampolineScheduler getInstance() {
return INSTANCE;
}

/* package */ static TrampolineScheduler instance() {
return INSTANCE;
}

@Override
public Subscription schedule(Action1<Scheduler.Inner> action) {
Expand Down