diff --git a/src/main/java/rx/internal/util/ScalarSynchronousObservable.java b/src/main/java/rx/internal/util/ScalarSynchronousObservable.java index ca6ba3f437..676527f954 100644 --- a/src/main/java/rx/internal/util/ScalarSynchronousObservable.java +++ b/src/main/java/rx/internal/util/ScalarSynchronousObservable.java @@ -23,6 +23,7 @@ import rx.internal.producers.SingleProducer; import rx.internal.schedulers.EventLoopsScheduler; import rx.observers.Subscribers; +import rx.plugins.*; /** * An Observable that emits a single constant scalar value to Subscribers. @@ -33,6 +34,14 @@ * @param the value type */ public final class ScalarSynchronousObservable extends Observable { + /** + * The execution hook instance. + *

+ * Can't be final to allow tests overriding it in place; if the class + * has been initialized, the plugin reset has no effect because + * how RxJavaPlugins was designed. + */ + static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); /** * Indicates that the Producer used by this Observable should be fully * threadsafe. It is possible, but unlikely that multiple concurrent @@ -72,14 +81,7 @@ public static ScalarSynchronousObservable create(T t) { final T t; protected ScalarSynchronousObservable(final T t) { - super(new OnSubscribe() { - - @Override - public void call(Subscriber s) { - s.setProducer(createProducer(s, t)); - } - - }); + super(hook.onCreate(new JustOnSubscribe(t))); this.t = t; } @@ -131,6 +133,20 @@ public void call() { return create(new ScalarAsyncOnSubscribe(t, onSchedule)); } + /** The OnSubscribe callback for the Observable constructor. */ + static final class JustOnSubscribe implements OnSubscribe { + final T value; + + JustOnSubscribe(T value) { + this.value = value; + } + + @Override + public void call(Subscriber s) { + s.setProducer(createProducer(s, value)); + } + } + /** * The OnSubscribe implementation that creates the ScalarAsyncProducer for each * incoming subscriber. diff --git a/src/test/java/rx/internal/util/ScalarSynchronousObservableTest.java b/src/test/java/rx/internal/util/ScalarSynchronousObservableTest.java index fee7b6f8e1..6898fbd472 100644 --- a/src/test/java/rx/internal/util/ScalarSynchronousObservableTest.java +++ b/src/test/java/rx/internal/util/ScalarSynchronousObservableTest.java @@ -16,12 +16,16 @@ package rx.internal.util; -import org.junit.Test; +import java.util.concurrent.atomic.*; -import rx.Observable; +import org.junit.*; + +import rx.*; +import rx.Observable.OnSubscribe; import rx.exceptions.TestException; import rx.functions.Func1; import rx.observers.TestSubscriber; +import rx.plugins.*; import rx.schedulers.Schedulers; public class ScalarSynchronousObservableTest { @@ -230,4 +234,66 @@ public void onNext(Integer t) { ts.assertError(TestException.class); ts.assertNotCompleted(); } + + @Test + public void hookCalled() { + RxJavaObservableExecutionHook save = ScalarSynchronousObservable.hook; + try { + final AtomicInteger c = new AtomicInteger(); + + ScalarSynchronousObservable.hook = new RxJavaObservableExecutionHook() { + @Override + public OnSubscribe onCreate(OnSubscribe f) { + c.getAndIncrement(); + return f; + } + }; + + int n = 10; + + for (int i = 0; i < n; i++) { + Observable.just(1).subscribe(); + } + + Assert.assertEquals(n, c.get()); + } finally { + ScalarSynchronousObservable.hook = save; + } + } + + @Test + public void hookChangesBehavior() { + RxJavaObservableExecutionHook save = ScalarSynchronousObservable.hook; + try { + ScalarSynchronousObservable.hook = new RxJavaObservableExecutionHook() { + @Override + public OnSubscribe onCreate(OnSubscribe f) { + if (f instanceof ScalarSynchronousObservable.JustOnSubscribe) { + final T v = ((ScalarSynchronousObservable.JustOnSubscribe) f).value; + return new OnSubscribe() { + @Override + public void call(Subscriber t) { + t.onNext(v); + t.onNext(v); + t.onCompleted(); + } + }; + } + return f; + } + }; + + TestSubscriber ts = new TestSubscriber(); + + Observable.just(1).subscribe(ts); + + ts.assertValues(1, 1); + ts.assertNoErrors(); + ts.assertCompleted(); + + } finally { + ScalarSynchronousObservable.hook = save; + } + } + } \ No newline at end of file