Skip to content

Commit

Permalink
Merge pull request #2901 from akarnokd/ToSortedListBackpressure
Browse files Browse the repository at this point in the history
Operators toList and toSortedList now support backpressure
  • Loading branch information
benjchristensen committed Apr 29, 2015
2 parents d3d15b9 + 615db6a commit aeee037
Show file tree
Hide file tree
Showing 6 changed files with 390 additions and 77 deletions.
63 changes: 58 additions & 5 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8596,7 +8596,7 @@ public final BlockingObservable<T> toBlocking() {
* you do not have the option to unsubscribe.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure as by intent it is requesting and buffering everything.</dd>
* <dd>The operator buffers everything from its upstream but it only emits the aggregated list when the downstream requests at least one item.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toList} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
Expand Down Expand Up @@ -8797,7 +8797,7 @@ public final <K, V> Observable<Map<K, Collection<V>>> toMultimap(Func1<? super T
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toSortedList.png" alt="">
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure as by intent it is requesting and buffering everything.</dd>
* <dd>The operator buffers everything from its upstream but it only emits the sorted list when the downstream requests at least one item.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toSortedList} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
Expand All @@ -8810,7 +8810,7 @@ public final <K, V> Observable<Map<K, Collection<V>>> toMultimap(Func1<? super T
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
*/
public final Observable<List<T>> toSortedList() {
return lift(new OperatorToObservableSortedList<T>());
return lift(new OperatorToObservableSortedList<T>(10));
}

/**
Expand All @@ -8820,7 +8820,7 @@ public final Observable<List<T>> toSortedList() {
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toSortedList.f.png" alt="">
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure as by intent it is requesting and buffering everything.</dd>
* <dd>The operator buffers everything from its upstream but it only emits the sorted list when the downstream requests at least one item.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toSortedList} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
Expand All @@ -8833,7 +8833,60 @@ public final Observable<List<T>> toSortedList() {
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
*/
public final Observable<List<T>> toSortedList(Func2<? super T, ? super T, Integer> sortFunction) {
return lift(new OperatorToObservableSortedList<T>(sortFunction));
return lift(new OperatorToObservableSortedList<T>(sortFunction, 10));
}

/**
* Returns an Observable that emits a list that contains the items emitted by the source Observable, in a
* sorted order. Each item emitted by the Observable must implement {@link Comparable} with respect to all
* other items in the sequence.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toSortedList.png" alt="">
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>The operator buffers everything from its upstream but it only emits the sorted list when the downstream requests at least one item.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toSortedList} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @throws ClassCastException
* if any item emitted by the Observable does not implement {@link Comparable} with respect to
* all other items emitted by the Observable
* @param initialCapacity
* the initial capacity of the ArrayList used to accumulate items before sorting
* @return an Observable that emits a list that contains the items emitted by the source Observable in
* sorted order
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
*/
@Experimental
public final Observable<List<T>> toSortedList(int initialCapacity) {
return lift(new OperatorToObservableSortedList<T>(initialCapacity));
}

/**
* Returns an Observable that emits a list that contains the items emitted by the source Observable, in a
* sorted order based on a specified comparison function.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toSortedList.f.png" alt="">
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>The operator buffers everything from its upstream but it only emits the sorted list when the downstream requests at least one item.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toSortedList} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param sortFunction
* a function that compares two items emitted by the source Observable and returns an Integer
* that indicates their sort order
* @param initialCapacity
* the initial capacity of the ArrayList used to accumulate items before sorting
* @return an Observable that emits a list that contains the items emitted by the source Observable in
* sorted order
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
*/
@Experimental
public final Observable<List<T>> toSortedList(Func2<? super T, ? super T, Integer> sortFunction, int initialCapacity) {
return lift(new OperatorToObservableSortedList<T>(sortFunction, initialCapacity));
}

/**
Expand Down
55 changes: 32 additions & 23 deletions src/main/java/rx/internal/operators/OperatorToObservableList.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ public static <T> OperatorToObservableList<T> instance() {
private OperatorToObservableList() { }
@Override
public Subscriber<? super T> call(final Subscriber<? super List<T>> o) {
return new Subscriber<T>(o) {
final SingleDelayedProducer<List<T>> producer = new SingleDelayedProducer<List<T>>(o);
Subscriber<T> result = new Subscriber<T>() {

private boolean completed = false;
final List<T> list = new LinkedList<T>();
boolean completed = false;
List<T> list = new LinkedList<T>();

@Override
public void onStart() {
Expand All @@ -64,27 +65,32 @@ public void onStart() {

@Override
public void onCompleted() {
try {
if (!completed) {
completed = true;
/*
* Ideally this should just return Collections.unmodifiableList(list) and not copy it,
* but, it ends up being a breaking change if we make that modification.
*
* Here is an example of is being done with these lists that breaks if we make it immutable:
*
* Caused by: java.lang.UnsupportedOperationException
* at java.util.Collections$UnmodifiableList$1.set(Collections.java:1244)
* at java.util.Collections.sort(Collections.java:221)
* ...
* Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: UnmodifiableList.class
* at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:98)
* at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:56)
* ... 419 more
*/
o.onNext(new ArrayList<T>(list));
o.onCompleted();
} catch (Throwable e) {
onError(e);
List<T> result;
try {
/*
* Ideally this should just return Collections.unmodifiableList(list) and not copy it,
* but, it ends up being a breaking change if we make that modification.
*
* Here is an example of is being done with these lists that breaks if we make it immutable:
*
* Caused by: java.lang.UnsupportedOperationException
* at java.util.Collections$UnmodifiableList$1.set(Collections.java:1244)
* at java.util.Collections.sort(Collections.java:221)
* ...
* Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: UnmodifiableList.class
* at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:98)
* at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:56)
* ... 419 more
*/
result = new ArrayList<T>(list);
} catch (Throwable t) {
onError(t);
return;
}
list = null;
producer.set(result);
}
}

Expand All @@ -101,6 +107,9 @@ public void onNext(T value) {
}

};
o.add(result);
o.setProducer(producer);
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@
*/
package rx.internal.operators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.*;

import rx.Observable.Operator;
import rx.Subscriber;
import rx.*;
import rx.functions.Func2;

/**
Expand All @@ -35,72 +32,82 @@
* the type of the items emitted by the source and the resulting {@code Observable}s
*/
public final class OperatorToObservableSortedList<T> implements Operator<List<T>, T> {
private final Func2<? super T, ? super T, Integer> sortFunction;
private final Comparator<? super T> sortFunction;
private final int initialCapacity;

@SuppressWarnings("unchecked")
public OperatorToObservableSortedList() {
this.sortFunction = defaultSortFunction;
public OperatorToObservableSortedList(int initialCapacity) {
this.sortFunction = DEFAULT_SORT_FUNCTION;
this.initialCapacity = initialCapacity;
}

public OperatorToObservableSortedList(Func2<? super T, ? super T, Integer> sortFunction) {
this.sortFunction = sortFunction;
public OperatorToObservableSortedList(final Func2<? super T, ? super T, Integer> sortFunction, int initialCapacity) {
this.initialCapacity = initialCapacity;
this.sortFunction = new Comparator<T>() {
@Override
public int compare(T o1, T o2) {
return sortFunction.call(o1, o2);
}
};
}

@Override
public Subscriber<? super T> call(final Subscriber<? super List<T>> o) {
return new Subscriber<T>(o) {

final List<T> list = new ArrayList<T>();
public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
final SingleDelayedProducer<List<T>> producer = new SingleDelayedProducer<List<T>>(child);
Subscriber<T> result = new Subscriber<T>() {

List<T> list = new ArrayList<T>(initialCapacity);
boolean completed;

@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onCompleted() {
try {

// sort the list before delivery
Collections.sort(list, new Comparator<T>() {

@Override
public int compare(T o1, T o2) {
return sortFunction.call(o1, o2);
}

});

o.onNext(Collections.unmodifiableList(list));
o.onCompleted();
} catch (Throwable e) {
onError(e);
if (!completed) {
completed = true;
List<T> a = list;
list = null;
try {
// sort the list before delivery
Collections.sort(a, sortFunction);
} catch (Throwable e) {
onError(e);
return;
}
producer.set(a);
}
}

@Override
public void onError(Throwable e) {
o.onError(e);
child.onError(e);
}

@Override
public void onNext(T value) {
list.add(value);
if (!completed) {
list.add(value);
}
}

};
child.add(result);
child.setProducer(producer);
return result;
}

// raw because we want to support Object for this default
@SuppressWarnings("rawtypes")
private static Func2 defaultSortFunction = new DefaultComparableFunction();
private static Comparator DEFAULT_SORT_FUNCTION = new DefaultComparableFunction();

private static class DefaultComparableFunction implements Func2<Object, Object, Integer> {
private static class DefaultComparableFunction implements Comparator<Object> {

// unchecked because we want to support Object for this default
@SuppressWarnings("unchecked")
@Override
public Integer call(Object t1, Object t2) {
public int compare(Object t1, Object t2) {
Comparable<Object> c1 = (Comparable<Object>) t1;
Comparable<Object> c2 = (Comparable<Object>) t2;
return c1.compareTo(c2);
Expand Down
87 changes: 87 additions & 0 deletions src/main/java/rx/internal/operators/SingleDelayedProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicInteger;

import rx.*;

/**
* A producer that holds a single value until it is requested and emits it followed by an onCompleted.
*/
public final class SingleDelayedProducer<T> extends AtomicInteger implements Producer {
/** */
private static final long serialVersionUID = 4721551710164477552L;
/** The actual child. */
final Subscriber<? super T> child;
/** The value to emit, acquired and released by compareAndSet. */
T value;
/** State flag: request() called with positive value. */
static final int REQUESTED = 1;
/** State flag: set() called. */
static final int SET = 2;
/**
* Constructs a SingleDelayedProducer with the given child as output.
* @param child the subscriber to emit the value and completion events
*/
public SingleDelayedProducer(Subscriber<? super T> child) {
this.child = child;
}
@Override
public void request(long n) {
if (n > 0) {
for (;;) {
int s = get();
// if already requested
if ((s & REQUESTED) != 0) {
break;
}
int u = s | REQUESTED;
if (compareAndSet(s, u)) {
if ((s & SET) != 0) {
emit();
}
break;
}
}
}
}
/**
* Sets the value to be emitted and emits it if there was a request.
* Should be called only once and from a single thread
* @param value the value to set and possibly emit
*/
public void set(T value) {
for (;;) {
int s = get();
// if already set
if ((s & SET) != 0) {
break;
}
int u = s | SET;
this.value = value;
if (compareAndSet(s, u)) {
if ((s & REQUESTED) != 0) {
emit();
}
break;
}
}
}
/**
* Emits the set value if the child is not unsubscribed and bounces back
* exceptions caught from child.onNext.
*/
void emit() {
try {
T v = value;
value = null; // do not hold onto the value
if (child.isUnsubscribed()) {
return;
}
child.onNext(v);
} catch (Throwable t) {
child.onError(t);
return;
}
child.onCompleted();
}
}
Loading

0 comments on commit aeee037

Please sign in to comment.