-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
1.x: just() construction to call the onCreate execution hook #3958
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
import rx.internal.producers.SingleProducer; | ||
import rx.internal.schedulers.EventLoopsScheduler; | ||
import rx.observers.Subscribers; | ||
import rx.plugins.*; | ||
|
||
/** | ||
* An Observable that emits a single constant scalar value to Subscribers. | ||
|
@@ -33,6 +34,14 @@ | |
* @param <T> the value type | ||
*/ | ||
public final class ScalarSynchronousObservable<T> extends Observable<T> { | ||
/** | ||
* The execution hook instance. | ||
* <p> | ||
* Can't be final to allow tests overriding it in place; if the class | ||
* has been initialized, the plugin reset has no effect because | ||
* how RxJavaPlugins was designed. | ||
*/ | ||
static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); | ||
/** | ||
* Indicates that the Producer used by this Observable should be fully | ||
* threadsafe. It is possible, but unlikely that multiple concurrent | ||
|
@@ -72,14 +81,7 @@ public static <T> ScalarSynchronousObservable<T> create(T t) { | |
final T t; | ||
|
||
protected ScalarSynchronousObservable(final T t) { | ||
super(new OnSubscribe<T>() { | ||
|
||
@Override | ||
public void call(Subscriber<? super T> s) { | ||
s.setProducer(createProducer(s, t)); | ||
} | ||
|
||
}); | ||
super(hook.onCreate(new JustOnSubscribe<T>(t))); | ||
this.t = t; | ||
} | ||
|
||
|
@@ -131,6 +133,20 @@ public void call() { | |
return create(new ScalarAsyncOnSubscribe<T>(t, onSchedule)); | ||
} | ||
|
||
/** The OnSubscribe callback for the Observable constructor. */ | ||
static final class JustOnSubscribe<T> implements OnSubscribe<T> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
final T value; | ||
|
||
JustOnSubscribe(T value) { | ||
this.value = value; | ||
} | ||
|
||
@Override | ||
public void call(Subscriber<? super T> s) { | ||
s.setProducer(createProducer(s, value)); | ||
} | ||
} | ||
|
||
/** | ||
* The OnSubscribe implementation that creates the ScalarAsyncProducer for each | ||
* incoming subscriber. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,12 +16,16 @@ | |
|
||
package rx.internal.util; | ||
|
||
import org.junit.Test; | ||
import java.util.concurrent.atomic.*; | ||
|
||
import rx.Observable; | ||
import org.junit.*; | ||
|
||
import rx.*; | ||
import rx.Observable.OnSubscribe; | ||
import rx.exceptions.TestException; | ||
import rx.functions.Func1; | ||
import rx.observers.TestSubscriber; | ||
import rx.plugins.*; | ||
import rx.schedulers.Schedulers; | ||
|
||
public class ScalarSynchronousObservableTest { | ||
|
@@ -230,4 +234,66 @@ public void onNext(Integer t) { | |
ts.assertError(TestException.class); | ||
ts.assertNotCompleted(); | ||
} | ||
|
||
@Test | ||
public void hookCalled() { | ||
RxJavaObservableExecutionHook save = ScalarSynchronousObservable.hook; | ||
try { | ||
final AtomicInteger c = new AtomicInteger(); | ||
|
||
ScalarSynchronousObservable.hook = new RxJavaObservableExecutionHook() { | ||
@Override | ||
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) { | ||
c.getAndIncrement(); | ||
return f; | ||
} | ||
}; | ||
|
||
int n = 10; | ||
|
||
for (int i = 0; i < n; i++) { | ||
Observable.just(1).subscribe(); | ||
} | ||
|
||
Assert.assertEquals(n, c.get()); | ||
} finally { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess it's more important to verify that it uses There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For reviewers: this is fixed below. |
||
ScalarSynchronousObservable.hook = save; | ||
} | ||
} | ||
|
||
@Test | ||
public void hookChangesBehavior() { | ||
RxJavaObservableExecutionHook save = ScalarSynchronousObservable.hook; | ||
try { | ||
ScalarSynchronousObservable.hook = new RxJavaObservableExecutionHook() { | ||
@Override | ||
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) { | ||
if (f instanceof ScalarSynchronousObservable.JustOnSubscribe) { | ||
final T v = ((ScalarSynchronousObservable.JustOnSubscribe<T>) f).value; | ||
return new OnSubscribe<T>() { | ||
@Override | ||
public void call(Subscriber<? super T> t) { | ||
t.onNext(v); | ||
t.onNext(v); | ||
t.onCompleted(); | ||
} | ||
}; | ||
} | ||
return f; | ||
} | ||
}; | ||
|
||
TestSubscriber<Integer> ts = new TestSubscriber<Integer>(); | ||
|
||
Observable.just(1).subscribe(ts); | ||
|
||
ts.assertValues(1, 1); | ||
ts.assertNoErrors(); | ||
ts.assertCompleted(); | ||
|
||
} finally { | ||
ScalarSynchronousObservable.hook = save; | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, you override it in the test. Then it needs a comment or test should use reflection.