Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimized scalar observeOn/subscribeOn #2767

Merged
merged 1 commit into from
Feb 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5175,6 +5175,9 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
* @see #subscribeOn
*/
public final Observable<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler));
}

Expand Down Expand Up @@ -7597,6 +7600,9 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
* @see #observeOn
*/
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.schedulers;
package rx.internal.schedulers;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.schedulers.NewThreadWorker;
import rx.internal.schedulers.ScheduledAction;
import rx.internal.util.RxThreadFactory;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/* package */class EventLoopsScheduler extends Scheduler {
public class EventLoopsScheduler extends Scheduler {
/** Manages a fixed number of workers. */
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-";
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);
Expand Down Expand Up @@ -76,14 +74,25 @@ public PoolWorker getEventLoop() {
* Create a scheduler with pool size equal to the available processor
* count and using least-recent worker selection policy.
*/
EventLoopsScheduler() {
public EventLoopsScheduler() {
pool = new FixedSchedulerPool();
}

@Override
public Worker createWorker() {
return new EventLoopWorker(pool.getEventLoop());
}

/**
* Schedules the action directly on one of the event loop workers
* without the additional infrastructure and checking.
* @param action the action to schedule
* @return the subscription
*/
public Subscription scheduleDirect(Action0 action) {
PoolWorker pw = pool.getEventLoop();
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS);
}

private static class EventLoopWorker extends Scheduler.Worker {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
Expand Down
71 changes: 69 additions & 2 deletions src/main/java/rx/internal/util/ScalarSynchronousObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package rx.internal.util;

import rx.Observable;
import rx.Subscriber;
import rx.*;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.internal.schedulers.EventLoopsScheduler;

public final class ScalarSynchronousObservable<T> extends Observable<T> {

Expand Down Expand Up @@ -49,5 +51,70 @@ public void call(Subscriber<? super T> s) {
public T get() {
return t;
}
/**
* Customized observeOn/subscribeOn implementation which emits the scalar
* value directly or with less overhead on the specified scheduler.
* @param scheduler the target scheduler
* @return the new observable
*/
public Observable<T> scalarScheduleOn(Scheduler scheduler) {
if (scheduler instanceof EventLoopsScheduler) {
EventLoopsScheduler es = (EventLoopsScheduler) scheduler;
return create(new DirectScheduledEmission<T>(es, t));
}
return create(new NormalScheduledEmission<T>(scheduler, t));
}

/** Optimized observeOn for scalar value observed on the EventLoopsScheduler. */
static final class DirectScheduledEmission<T> implements OnSubscribe<T> {
private final EventLoopsScheduler es;
private final T value;
DirectScheduledEmission(EventLoopsScheduler es, T value) {
this.es = es;
this.value = value;
}
@Override
public void call(final Subscriber<? super T> child) {
child.add(es.scheduleDirect(new ScalarSynchronousAction<T>(child, value)));
}
}
/** Emits a scalar value on a general scheduler. */
static final class NormalScheduledEmission<T> implements OnSubscribe<T> {
private final Scheduler scheduler;
private final T value;

NormalScheduledEmission(Scheduler scheduler, T value) {
this.scheduler = scheduler;
this.value = value;
}

@Override
public void call(final Subscriber<? super T> subscriber) {
Worker worker = scheduler.createWorker();
subscriber.add(worker);
worker.schedule(new ScalarSynchronousAction<T>(subscriber, value));
}
}
/** Action that emits a single value when called. */
static final class ScalarSynchronousAction<T> implements Action0 {
private final Subscriber<? super T> subscriber;
private final T value;

private ScalarSynchronousAction(Subscriber<? super T> subscriber,
T value) {
this.subscriber = subscriber;
this.value = value;
}

@Override
public void call() {
try {
subscriber.onNext(value);
} catch (Throwable t) {
subscriber.onError(t);
return;
}
subscriber.onCompleted();
}
}
}
1 change: 1 addition & 0 deletions src/main/java/rx/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.schedulers;

import rx.Scheduler;
import rx.internal.schedulers.EventLoopsScheduler;
import rx.plugins.RxJavaPlugins;

import java.util.concurrent.Executor;
Expand Down