Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x Remove Function from transformer interfaces to allow a single obj… #4672

Merged
merged 2 commits into from
Oct 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,12 @@ public final Throwable blockingGet(long timeout, TimeUnit unit) {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable compose(CompletableTransformer transformer) {
return wrap(to(transformer));
try {
return wrap(transformer.apply(this));
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
}

/**
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/io/reactivex/CompletableTransformer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@

package io.reactivex;

import io.reactivex.functions.Function;

/**
* Convenience interface and callback used by the compose operator to turn a Completable into another
* Completable fluently.
*/
public interface CompletableTransformer extends Function<Completable, CompletableSource> {

public interface CompletableTransformer {
CompletableSource apply(Completable completable) throws Exception;
}
7 changes: 6 additions & 1 deletion src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6278,7 +6278,12 @@ public final <U> Single<U> collectInto(final U initialItem, BiConsumer<? super U
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Flowable<R> compose(FlowableTransformer<T, R> composer) {
return fromPublisher(to(composer));
try {
return fromPublisher(composer.apply(this));
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
}

/**
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/io/reactivex/FlowableTransformer.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@

import org.reactivestreams.Publisher;

import io.reactivex.functions.Function;

/**
* Interface to compose Flowables.
*
* @param <Upstream> the upstream value type
* @param <Downstream> the downstream value type
*/
public interface FlowableTransformer<Upstream, Downstream> extends Function<Flowable<Upstream>, Publisher<? extends Downstream>> {

public interface FlowableTransformer<Upstream, Downstream> {
Publisher<? extends Downstream> apply(Flowable<Upstream> flowable) throws Exception;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

? extends should be removed as covariant return types are usually inconvenience to the consumer.

}
7 changes: 6 additions & 1 deletion src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -1974,7 +1974,12 @@ public final <U> Maybe<U> cast(final Class<? extends U> clazz) {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Maybe<R> compose(MaybeTransformer<T, R> transformer) {
return wrap(to(transformer));
try {
return wrap(transformer.apply(this));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to try-catch it if it doesn't declare throws Exception. So either each transformer should specify apply(...) throws Exception or these are not needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does throw exception see later commit

} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
}

/**
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/io/reactivex/MaybeTransformer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@

package io.reactivex;

import io.reactivex.functions.Function;

/**
* Interface to compose Maybes.
*
* @param <Upstream> the upstream value type
* @param <Downstream> the downstream value type
*/
public interface MaybeTransformer<Upstream, Downstream> extends Function<Maybe<Upstream>, MaybeSource<Downstream>> {

public interface MaybeTransformer<Upstream, Downstream> {
MaybeSource<Downstream> apply(Maybe<Upstream> maybe) throws Exception;
}
7 changes: 6 additions & 1 deletion src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5489,7 +5489,12 @@ public final <U> Single<U> collectInto(final U initialValue, BiConsumer<? super
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> compose(ObservableTransformer<T, R> composer) {
return wrap(to(composer));
try {
return wrap(composer.apply(this));
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
}


Expand Down
6 changes: 2 additions & 4 deletions src/main/java/io/reactivex/ObservableTransformer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@

package io.reactivex;

import io.reactivex.functions.Function;

/**
* Interface to compose Observables.
*
* @param <Upstream> the upstream value type
* @param <Downstream> the downstream value type
*/
public interface ObservableTransformer<Upstream, Downstream> extends Function<Observable<Upstream>, ObservableSource<Downstream>> {

public interface ObservableTransformer<Upstream, Downstream> {
ObservableSource<Downstream> apply(Observable<Upstream> upstream) throws Exception;
}
7 changes: 6 additions & 1 deletion src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1473,7 +1473,12 @@ public final Single<T> hide() {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Single<R> compose(SingleTransformer<T, R> transformer) {
return wrap(to(transformer));
try {
return wrap(transformer.apply(this));
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
}

/**
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/io/reactivex/SingleTransformer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@

package io.reactivex;

import io.reactivex.functions.Function;

/**
* Interface to compose Singles.
*
* @param <Upstream> the upstream value type
* @param <Downstream> the downstream value type
*/
public interface SingleTransformer<Upstream, Downstream> extends Function<Single<Upstream>, SingleSource<Downstream>> {

public interface SingleTransformer<Upstream, Downstream> {
SingleSource<Downstream> apply(Single<Upstream> upstream) throws Exception;
}