Skip to content

Commit

Permalink
Implemented Observable.x(ConversionFunc) to allow external extensions…
Browse files Browse the repository at this point in the history
… to Observables.
  • Loading branch information
Aaron Tull committed Jul 24, 2015
1 parent f036cd0 commit 7fb785a
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 8 deletions.
21 changes: 21 additions & 0 deletions src/main/java/rx/ConversionFunc.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package rx;

import rx.Observable.OnSubscribe;
import rx.annotations.Experimental;

/**
* Converts the values emitted by an Observable's OnSubscribe function to a value.
*
* @param <T> the type of values to be consumed
* @param <R> the return type
*/
@Experimental
public interface ConversionFunc<T, R> {
/**
* Converts the data produced by the provided {@code OnSubscribe function} to a value.
*
* @param onSubscribe a function that produces data to a Subscriber, usually wrapped by an Observable.
* @return an instance of {@code R}
*/
public R convert(OnSubscribe<T> onSubscribe);
}
35 changes: 28 additions & 7 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,24 @@ public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<
// cover for generics insanity
}

/**
* Passes all emitted values from {@code this} Observable to the provided {@link ConversionFunc} to be
* collected and returned as a single value. Note that it is legal for a {@link ConversionFunc} to
* return an Observable (enabling chaining).
*
* @param conversion a function that converts from this {@code Observable<T>} to an {@code R}
* @return an instance of R created by the provided Conversion
*/
@Experimental
public <R> R x(ConversionFunc<T, R> conversion) {
final Observable<T> self = this;
return conversion.convert(new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
subscriber.add(Observable.subscribe(subscriber, self));
}});
}

/**
* Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass
* the values of the current Observable through the Operator function.
Expand All @@ -128,17 +146,17 @@ public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param lift the Operator that implements the Observable-operating function to be applied to the source
* @param operator the Operator that implements the Observable-operating function to be applied to the source
* Observable
* @return an Observable that is the result of applying the lifted Operator to the source Observable
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
*/
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> lift) {
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(lift).call(o);
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
Expand All @@ -164,7 +182,6 @@ public void call(Subscriber<? super R> o) {
});
}


/**
* Transform an Observable by applying a particular Transformer function to it.
* <p>
Expand Down Expand Up @@ -7781,11 +7798,15 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
*/
public final Subscription subscribe(Subscriber<? super T> subscriber) {
// validate and proceed
return Observable.subscribe(subscriber, this);
}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (onSubscribe == null) {
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
Expand All @@ -7809,7 +7830,7 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
// The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Expand Down
234 changes: 234 additions & 0 deletions src/test/java/rx/ObservableConversionTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package rx;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static junit.framework.Assert.*;

import org.junit.Test;

import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.internal.operators.OperatorFilter;
import rx.internal.operators.OperatorMap;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class ObservableConversionTest {

public static class Cylon {}

public static class Jail {
Object cylon;

Jail(Object cylon) {
this.cylon = cylon;
}
}

public static class CylonDetectorObservable<T> {
protected OnSubscribe<T> onSubscribe;

public static <T> CylonDetectorObservable<T> create(OnSubscribe<T> onSubscribe) {
return new CylonDetectorObservable<T>(onSubscribe);
}

protected CylonDetectorObservable(OnSubscribe<T> onSubscribe) {
this.onSubscribe = onSubscribe;
}

public void subscribe(Subscriber<T> subscriber) {
onSubscribe.call(subscriber);
}

public <R> CylonDetectorObservable<R> lift(Operator<? extends R, ? super T> operator) {
return x(new RobotConversionFunc<T, R>(operator));
}

public <R, O> O x(ConversionFunc<T, O> operator) {
return operator.convert(onSubscribe);
}

public <R> CylonDetectorObservable<? extends R> compose(Func1<CylonDetectorObservable<? super T>, CylonDetectorObservable<? extends R>> transformer) {
return transformer.call(this);
}

public final CylonDetectorObservable<T> beep(Func1<? super T, Boolean> predicate) {
return lift(new OperatorFilter<T>(predicate));
}

public final <R> CylonDetectorObservable<R> boop(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}

public CylonDetectorObservable<String> DESTROY() {
return boop(new Func1<T, String>() {
@Override
public String call(T t) {
Object cylon = ((Jail) t).cylon;
throwOutTheAirlock(cylon);
if (t instanceof Jail) {
String name = cylon.toString();
return "Cylon '" + name + "' has been destroyed";
}
else {
return "Cylon 'anonymous' has been destroyed";
}
}});
}

private static void throwOutTheAirlock(Object cylon) {
// ...
}
}

public static class RobotConversionFunc<T, R> implements ConversionFunc<T, CylonDetectorObservable<R>> {
private Operator<? extends R, ? super T> operator;

public RobotConversionFunc(Operator<? extends R, ? super T> operator) {
this.operator = operator;
}

@Override
public CylonDetectorObservable<R> convert(final OnSubscribe<T> onSubscribe) {
return CylonDetectorObservable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = operator.call(o);
try {
st.onStart();
onSubscribe.call(st);
} catch (OnErrorNotImplementedException e) {
throw e;
} catch (Throwable e) {
st.onError(e);
}
} catch (OnErrorNotImplementedException e) {
throw e;
} catch (Throwable e) {
o.onError(e);
}

}});
}
}

public static class ConvertToCylonDetector<T> implements ConversionFunc<T, CylonDetectorObservable<T>> {
@Override
public CylonDetectorObservable<T> convert(final OnSubscribe<T> onSubscribe) {
return CylonDetectorObservable.create(onSubscribe);
}
}

public static class ConvertToObservable<T> implements ConversionFunc<T, Observable<T>> {
@Override
public Observable<T> convert(final OnSubscribe<T> onSubscribe) {
return Observable.create(onSubscribe);
}
}

@Test
public void testConversionBetweenObservableClasses() {
final TestSubscriber<String> subscriber = new TestSubscriber<String>(new Subscriber<String>(){

@Override
public void onCompleted() {
System.out.println("Complete");
}

@Override
public void onError(Throwable e) {
System.out.println("error: " + e.getMessage());
e.printStackTrace();
}

@Override
public void onNext(String t) {
System.out.println(t);
}});
List<Object> crewOfBattlestarGalactica = Arrays.asList(new Object[] {"William Adama", "Laura Roslin", "Lee Adama", new Cylon()});
Observable.from(crewOfBattlestarGalactica)
.x(new ConvertToCylonDetector<Object>())
.beep(new Func1<Object, Boolean>(){
@Override
public Boolean call(Object t) {
return t instanceof Cylon;
}})
.boop(new Func1<Object, Object>() {
@Override
public Jail call(Object cylon) {
return new Jail(cylon);
}})
.DESTROY()
.x(new ConvertToObservable<String>())
.reduce("Cylon Detector finished. Report:\n", new Func2<String, String, String>() {
@Override
public String call(String a, String n) {
return a + n + "\n";
}})
.subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertCompleted();
}

@Test
public void testConvertToConcurrentQueue() {
final AtomicReference<Throwable> thrown = new AtomicReference<Throwable>(null);
final AtomicBoolean isFinished = new AtomicBoolean(false);
ConcurrentLinkedQueue<Integer> queue = Observable.range(0,5)
.flatMap(new Func1<Integer, Observable<Integer>>(){
@Override
public Observable<Integer> call(final Integer i) {
return Observable.range(0, 5)
.observeOn(Schedulers.io())
.map(new Func1<Integer, Integer>(){
@Override
public Integer call(Integer k) {
try {
Thread.sleep(System.currentTimeMillis() % 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i + k;
}});
}})
.x(new ConversionFunc<Integer, ConcurrentLinkedQueue<Integer>>() {
@Override
public ConcurrentLinkedQueue<Integer> convert(OnSubscribe<Integer> onSubscribe) {
final ConcurrentLinkedQueue<Integer> q = new ConcurrentLinkedQueue<Integer>();
onSubscribe.call(new Subscriber<Integer>(){
@Override
public void onCompleted() {
isFinished.set(true);
}

@Override
public void onError(Throwable e) {
thrown.set(e);
}

@Override
public void onNext(Integer t) {
q.add(t);
}});
return q;
}});

int x = 0;
while(!isFinished.get()) {
Integer i = queue.poll();
if (i != null) {
x++;
System.out.println(x + " item: " + i);
}
}
assertEquals(null, thrown.get());
}
}
16 changes: 15 additions & 1 deletion src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.Functions;
import rx.observables.ConnectableObservable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;
Expand Down Expand Up @@ -1157,4 +1156,19 @@ public void testForEachWithNull() {
//
.forEach(null);
}

@Test
public void testExtend() {
final TestSubscriber<Object> subscriber = new TestSubscriber<Object>();
final Object value = new Object();
Observable.just(value).x(new ConversionFunc<Object,Object>(){
@Override
public Object convert(OnSubscribe<Object> onSubscribe) {
onSubscribe.call(subscriber);
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertValue(value);
return subscriber.getOnNextEvents().get(0);
}});
}
}

0 comments on commit 7fb785a

Please sign in to comment.