Skip to content

Commit

Permalink
Merge pull request ReactiveX#621 from Applied-Duality/CleanFixes
Browse files Browse the repository at this point in the history
SerialSubscription & From
  • Loading branch information
benjchristensen committed Dec 23, 2013
2 parents ce734f7 + b55a433 commit bbd6d45
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,15 @@ object SerialSubscription {
/**
* Represents a [[rx.lang.scala.Subscription]] that can be checked for status.
*/
class SerialSubscription private[scala] (serial: rx.subscriptions.SerialSubscription) extends Subscription {
class SerialSubscription private[scala] (override val asJavaSubscription: rx.subscriptions.SerialSubscription) extends Subscription {

/*
* As long as rx.subscriptions.SerialSubscription has no isUnsubscribed,
* we need to intercept and do it ourselves.
*/
override val asJavaSubscription: rx.subscriptions.SerialSubscription = new rx.subscriptions.SerialSubscription() {
override def unsubscribe(): Unit = {
if(unsubscribed.compareAndSet(false, true)) { serial.unsubscribe() }
}
override def setSubscription(subscription: rx.Subscription): Unit = serial.setSubscription(subscription)
override def getSubscription(): rx.Subscription = serial.getSubscription()
}
override def unsubscribe(): Unit = asJavaSubscription.unsubscribe()
override def isUnsubscribed: Boolean = asJavaSubscription.isUnsubscribed

def subscription_=(value: Subscription): this.type = { asJavaSubscription.setSubscription(value.asJavaSubscription); this }
def subscription_=(value: Subscription): this.type = {
asJavaSubscription.setSubscription(value.asJavaSubscription)
this
}
def subscription: Subscription = Subscription(asJavaSubscription.getSubscription)

}
Expand Down
50 changes: 29 additions & 21 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ public static <T> Observable<T> error(Throwable exception, Scheduler scheduler)
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#from">RxJava Wiki: from()</a>
*/
public static <T> Observable<T> from(Iterable<? extends T> iterable) {
return create(OperationToObservableIterable.toObservableIterable(iterable));
return from(iterable, Schedulers.currentThread());
}

/**
Expand All @@ -750,7 +750,7 @@ public static <T> Observable<T> from(Iterable<? extends T> iterable) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212140.aspx">MSDN: Observable.ToObservable</a>
*/
public static <T> Observable<T> from(Iterable<? extends T> iterable, Scheduler scheduler) {
return from(iterable).observeOn(scheduler);
return create(OperationToObservableIterable.toObservableIterable(iterable, scheduler));
}

/**
Expand All @@ -763,14 +763,35 @@ public static <T> Observable<T> from(Iterable<? extends T> iterable, Scheduler s
* {@link Subscription} is returned, it is not possible to unsubscribe from
* the sequence before it completes.
*
* @param items the source sequence
* @param items the source array
* @param <T> the type of items in the Array and the type of items to be
* emitted by the resulting Observable
* @return an Observable that emits each item in the source Array
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#from">RxJava Wiki: from()</a>
*/
public static <T> Observable<T> from(T[] items) {
return create(OperationToObservableIterable.toObservableIterable(Arrays.asList(items)));
return from(Arrays.asList(items));
}

/**
* Converts an Array into an Observable.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/from.png">
* <p>
* Note: the entire array is immediately emitted each time an
* {@link Observer} subscribes. Since this occurs before the
* {@link Subscription} is returned, it is not possible to unsubscribe from
* the sequence before it completes.
*
* @param items the source array
* @param scheduler the scheduler to emit the items of the array
* @param <T> the type of items in the Array and the type of items to be
* emitted by the resulting Observable
* @return an Observable that emits each item in the source Array
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#from">RxJava Wiki: from()</a>
*/
public static <T> Observable<T> from(T[] items, Scheduler scheduler) {
return from(Arrays.asList(items), scheduler);
}

/**
Expand Down Expand Up @@ -827,7 +848,7 @@ public static <T> Observable<T> from(T t1, T t2) {
* subscribes. Since this occurs before the {@link Subscription} is
* returned, it is not possible to unsubscribe from the sequence before it
* completes.
*
*
* @param t1 first item
* @param t2 second item
* @param t3 third item
Expand Down Expand Up @@ -1012,11 +1033,6 @@ public static <T> Observable<T> from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/from.png">
* <p>
* Note: the items will be immediately emitted each time an {@link Observer}
* subscribes. Since this occurs before the {@link Subscription} is
* returned, it is not possible to unsubscribe from the sequence before it
* completes.
*
* @param t1 first item
* @param t2 second item
* @param t3 third item
Expand Down Expand Up @@ -1044,11 +1060,6 @@ public static <T> Observable<T> from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/range.png">
* <p>
* Note: the entire range is immediately emitted each time an
* {@link Observer} subscribes. Since this occurs before the
* {@link Subscription} is returned, it is not possible to unsubscribe from
* the sequence before it completes.
*
* @param start the value of the first Integer in the sequence
* @param count the number of sequential Integers to generate
* @return an Observable that emits a range of sequential Integers
Expand All @@ -1073,7 +1084,7 @@ public static Observable<Integer> range(int start, int count) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211896.aspx">Observable.Range Method (Int32, Int32, IScheduler)</a>
*/
public static Observable<Integer> range(int start, int count, Scheduler scheduler) {
return range(start, count).observeOn(scheduler);
return from(Range.createWithCount(start, count), scheduler);
}

/**
Expand Down Expand Up @@ -1120,10 +1131,7 @@ public static <T> Observable<T> defer(Func0<? extends Observable<? extends T>> o
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#just">RxJava Wiki: just()</a>
*/
public static <T> Observable<T> just(T value) {
List<T> list = new ArrayList<T>();
list.add(value);

return from(list);
return from(Arrays.asList((value)));
}

/**
Expand All @@ -1142,7 +1150,7 @@ public static <T> Observable<T> just(T value) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#just">RxJava Wiki: just()</a>
*/
public static <T> Observable<T> just(T value, Scheduler scheduler) {
return just(value).observeOn(scheduler);
return from(Arrays.asList((value)), scheduler);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@

import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

import java.util.Iterator;

/**
* Converts an Iterable sequence into an Observable.
Expand All @@ -30,24 +36,42 @@
*/
public final class OperationToObservableIterable<T> {

public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
return new ToObservableIterable<T>(list, scheduler);
}

public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list) {
return new ToObservableIterable<T>(list);
return toObservableIterable(list, Schedulers.currentThread());
}

private static class ToObservableIterable<T> implements OnSubscribeFunc<T> {
public ToObservableIterable(Iterable<? extends T> list) {

public ToObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
this.iterable = list;
this.scheduler = scheduler;
}

public Iterable<? extends T> iterable;

public Subscription onSubscribe(Observer<? super T> observer) {
for (T item : iterable) {
observer.onNext(item);
}
observer.onCompleted();
Scheduler scheduler;
final Iterable<? extends T> iterable;

return Subscriptions.empty();
public Subscription onSubscribe(final Observer<? super T> observer) {
final Iterator<? extends T> iterator = iterable.iterator();
return scheduler.schedule(new Action1<Action0>() {
@Override
public void call(Action0 self) {
try {
if (iterator.hasNext()) {
T x = iterator.next();
observer.onNext(x);
self.call();
} else {
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
}
}
}
26 changes: 12 additions & 14 deletions rxjava-core/src/test/java/rx/ObservableWindowTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,21 @@ public class ObservableWindowTests {
@Test
public void testWindow() {
final ArrayList<List<Integer>> lists = new ArrayList<List<Integer>>();
Observable.from(1, 2, 3, 4, 5, 6)
.window(3).map(new Func1<Observable<Integer>, List<Integer>>() {

@Override
public List<Integer> call(Observable<Integer> o) {
return o.toList().toBlockingObservable().single();
}
Observable.concat(Observable.from(1, 2, 3, 4, 5, 6).window(3).map(new Func1<Observable<Integer>, Observable<List<Integer>>>() {
@Override
public Observable<List<Integer>> call(Observable<Integer> xs) {
return xs.toList();
}
})).toBlockingObservable().forEach(new Action1<List<Integer>>() {

}).toBlockingObservable().forEach(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> xs) {
lists.add(xs);
}
});

@Override
public void call(List<Integer> t) {
lists.add(t);
}
});

assertArrayEquals(lists.get(0).toArray(new Integer[3]), new Integer[] { 1, 2, 3 });
assertArrayEquals(lists.get(0).toArray(new Integer[3]), new Integer[]{1, 2, 3});
assertArrayEquals(lists.get(1).toArray(new Integer[3]), new Integer[] { 4, 5, 6 });
assertEquals(2, lists.size());

Expand Down
26 changes: 12 additions & 14 deletions rxjava-core/src/test/java/rx/operators/OperationWindowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
Expand All @@ -44,21 +45,18 @@ public void before() {
scheduler = new TestScheduler();
}

private static <T> List<List<T>> toLists(Observable<Observable<T>> observable) {
final List<T> list = new ArrayList<T>();
final List<List<T>> lists = new ArrayList<List<T>>();
private static <T> List<List<T>> toLists(Observable<Observable<T>> observables) {

observable.subscribe(new Action1<Observable<T>>() {
final List<List<T>> lists = new ArrayList<List<T>>();
Observable.concat(observables.map(new Func1<Observable<T>, Observable<List<T>>>() {
@Override
public void call(Observable<T> tObservable) {
tObservable.subscribe(new Action1<T>() {
@Override
public void call(T t) {
list.add(t);
}
});
lists.add(new ArrayList<T>(list));
list.clear();
public Observable<List<T>> call(Observable<T> xs) {
return xs.toList();
}
})).toBlockingObservable().forEach(new Action1<List<T>>() {
@Override
public void call(List<T> xs) {
lists.add(xs);
}
});
return lists;
Expand Down Expand Up @@ -90,7 +88,7 @@ public void testSkipAndCountGaplessEindows() {

@Test
public void testOverlappingWindows() {
Observable<String> subject = Observable.from("zero", "one", "two", "three", "four", "five");
Observable<String> subject = Observable.from(new String[]{"zero", "one", "two", "three", "four", "five"}, Schedulers.currentThread());
Observable<Observable<String>> windowed = Observable.create(window(subject, 3, 1));

List<List<String>> windows = toLists(windowed);
Expand Down

0 comments on commit bbd6d45

Please sign in to comment.