diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 1faddbb819..a09da42012 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -5175,6 +5175,9 @@ public final Observable mergeWith(Observable t1) { * @see #subscribeOn */ public final Observable observeOn(Scheduler scheduler) { + if (this instanceof ScalarSynchronousObservable) { + return ((ScalarSynchronousObservable)this).scalarScheduleOn(scheduler); + } return lift(new OperatorObserveOn(scheduler)); } @@ -7597,6 +7600,9 @@ public final Subscription subscribe(Subscriber subscriber) { * @see #observeOn */ public final Observable subscribeOn(Scheduler scheduler) { + if (this instanceof ScalarSynchronousObservable) { + return ((ScalarSynchronousObservable)this).scalarScheduleOn(scheduler); + } return nest().lift(new OperatorSubscribeOn(scheduler)); } diff --git a/src/main/java/rx/schedulers/EventLoopsScheduler.java b/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java similarity index 88% rename from src/main/java/rx/schedulers/EventLoopsScheduler.java rename to src/main/java/rx/internal/schedulers/EventLoopsScheduler.java index a674dfbc22..91a5440227 100644 --- a/src/main/java/rx/schedulers/EventLoopsScheduler.java +++ b/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java @@ -13,13 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.schedulers; +package rx.internal.schedulers; import rx.Scheduler; import rx.Subscription; import rx.functions.Action0; -import rx.internal.schedulers.NewThreadWorker; -import rx.internal.schedulers.ScheduledAction; import rx.internal.util.RxThreadFactory; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; @@ -27,7 +25,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -/* package */class EventLoopsScheduler extends Scheduler { +public class EventLoopsScheduler extends Scheduler { /** Manages a fixed number of workers. */ private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-"; private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX); @@ -76,7 +74,7 @@ public PoolWorker getEventLoop() { * Create a scheduler with pool size equal to the available processor * count and using least-recent worker selection policy. */ - EventLoopsScheduler() { + public EventLoopsScheduler() { pool = new FixedSchedulerPool(); } @@ -84,6 +82,17 @@ public PoolWorker getEventLoop() { public Worker createWorker() { return new EventLoopWorker(pool.getEventLoop()); } + + /** + * Schedules the action directly on one of the event loop workers + * without the additional infrastructure and checking. + * @param action the action to schedule + * @return the subscription + */ + public Subscription scheduleDirect(Action0 action) { + PoolWorker pw = pool.getEventLoop(); + return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS); + } private static class EventLoopWorker extends Scheduler.Worker { private final CompositeSubscription innerSubscription = new CompositeSubscription(); diff --git a/src/main/java/rx/internal/util/ScalarSynchronousObservable.java b/src/main/java/rx/internal/util/ScalarSynchronousObservable.java index e145349b42..c350c895c4 100644 --- a/src/main/java/rx/internal/util/ScalarSynchronousObservable.java +++ b/src/main/java/rx/internal/util/ScalarSynchronousObservable.java @@ -15,8 +15,10 @@ */ package rx.internal.util; -import rx.Observable; -import rx.Subscriber; +import rx.*; +import rx.Scheduler.Worker; +import rx.functions.Action0; +import rx.internal.schedulers.EventLoopsScheduler; public final class ScalarSynchronousObservable extends Observable { @@ -49,5 +51,70 @@ public void call(Subscriber s) { public T get() { return t; } + /** + * Customized observeOn/subscribeOn implementation which emits the scalar + * value directly or with less overhead on the specified scheduler. + * @param scheduler the target scheduler + * @return the new observable + */ + public Observable scalarScheduleOn(Scheduler scheduler) { + if (scheduler instanceof EventLoopsScheduler) { + EventLoopsScheduler es = (EventLoopsScheduler) scheduler; + return create(new DirectScheduledEmission(es, t)); + } + return create(new NormalScheduledEmission(scheduler, t)); + } + + /** Optimized observeOn for scalar value observed on the EventLoopsScheduler. */ + static final class DirectScheduledEmission implements OnSubscribe { + private final EventLoopsScheduler es; + private final T value; + DirectScheduledEmission(EventLoopsScheduler es, T value) { + this.es = es; + this.value = value; + } + @Override + public void call(final Subscriber child) { + child.add(es.scheduleDirect(new ScalarSynchronousAction(child, value))); + } + } + /** Emits a scalar value on a general scheduler. */ + static final class NormalScheduledEmission implements OnSubscribe { + private final Scheduler scheduler; + private final T value; + + NormalScheduledEmission(Scheduler scheduler, T value) { + this.scheduler = scheduler; + this.value = value; + } + + @Override + public void call(final Subscriber subscriber) { + Worker worker = scheduler.createWorker(); + subscriber.add(worker); + worker.schedule(new ScalarSynchronousAction(subscriber, value)); + } + } + /** Action that emits a single value when called. */ + static final class ScalarSynchronousAction implements Action0 { + private final Subscriber subscriber; + private final T value; + private ScalarSynchronousAction(Subscriber subscriber, + T value) { + this.subscriber = subscriber; + this.value = value; + } + + @Override + public void call() { + try { + subscriber.onNext(value); + } catch (Throwable t) { + subscriber.onError(t); + return; + } + subscriber.onCompleted(); + } + } } diff --git a/src/main/java/rx/schedulers/Schedulers.java b/src/main/java/rx/schedulers/Schedulers.java index 374448d695..8ded001e0e 100644 --- a/src/main/java/rx/schedulers/Schedulers.java +++ b/src/main/java/rx/schedulers/Schedulers.java @@ -16,6 +16,7 @@ package rx.schedulers; import rx.Scheduler; +import rx.internal.schedulers.EventLoopsScheduler; import rx.plugins.RxJavaPlugins; import java.util.concurrent.Executor;