-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Observable.x(ConversionFunc) to allow extensions to Observables #3082
Conversation
public void testExtend() { | ||
final TestSubscriber<Object> subscriber = new TestSubscriber<Object>(); | ||
final Object value = new Object(); | ||
Observable.just(value).x(new ConversionFunc<Object,Object>(){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is essentially the same as:
Observable.just(value).to(o -> {
o.subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertValue(value);
return subscriber.getOnNextEvents().get(0);
});
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my other comment regarding exposing the Observable inside the conversion.
The Travis CI failure is related to an rx backpreasure test
Is this known to be buggy? My changes seem unrelated. It also failed for #3060. |
It didn't fail with |
They both failed with missing backpressure. |
Found the bug in merge(), it was my oversight. Fix posted: #3093. |
Awesome! Thanks @akarnokd |
I've restarted this build and it passes now. |
I've simplified the problem this is trying to address to just adding the |
87d6dfe
to
c2ba9f5
Compare
I would have liked to have made the static method private static Subscription Observable.subscribe(Subscriber<? super T>, Observable<T>) be agnostic of the public static Subscription Observable.subscribe(Subscriber<? super T>, OnSubscribe<T>) This would allow us to reuse this hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); and then it probably wouldn't belong in |
Reviewed with @benjchristensen and he agreed to merge this and start testing it out in snapshots. |
* @param <R> the return type | ||
*/ | ||
@Experimental | ||
public interface ConversionFunc<T, R> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need another type? Would Func1<OnSubscribe<T>, R>
be okay
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Updating code now.
Latest version was rebased onto 1.x and has |
Observable.x(ConversionFunc) to allow extensions to Observables
No description provided.