Skip to content

Commit

Permalink
Merge pull request #1727 from benjchristensen/groupByWithBackpressure
Browse files Browse the repository at this point in the history
Proposed groupBy/groupByUntil Changes
  • Loading branch information
benjchristensen committed Oct 6, 2014
2 parents 84c88bd + b930295 commit 0d74d98
Show file tree
Hide file tree
Showing 5 changed files with 616 additions and 868 deletions.
89 changes: 0 additions & 89 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4708,15 +4708,6 @@ public final void forEach(final Action1<? super T> onNext, final Action1<Throwab
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure as splitting a stream effectively turns it into a "hot
* observable" and blocking any one group would block the entire parent stream. If you need
* backpressure on individual groups then you should use operators such as {@link #onBackpressureDrop}
* or {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param keySelector
* a function that extracts the key for each item
Expand Down Expand Up @@ -4746,15 +4737,6 @@ public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? su
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure as splitting a stream effectively turns it into a "hot
* observable" and blocking any one group would block the entire parent stream. If you need
* backpressure on individual groups then you should use operators such as {@link #onBackpressureDrop}
* or {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param keySelector
* a function that extracts the key for each item
Expand All @@ -4770,77 +4752,6 @@ public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super
return lift(new OperatorGroupBy<T, K, T>(keySelector));
}

/**
* Groups the items emitted by an {@code Observable} according to a specified key selector function until
* the duration {@code Observable} expires for the key.
* <p>
* <img width="640" height="375" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupByUntil.png" alt="">
* <p>
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure as splitting a stream effectively turns it into a "hot
* observable" and blocking any one group would block the entire parent stream. If you need
* backpressure on individual groups then you should use operators such as {@link #onBackpressureDrop}
* or {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupByUntil} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param keySelector
* a function to extract the key for each item
* @param durationSelector
* a function to signal the expiration of a group
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a key
* value and each of which emits all items emitted by the source {@code Observable} during that
* key's duration that share that same key value
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava wiki: groupByUntil</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211932.aspx">MSDN: Observable.GroupByUntil</a>
*/
public final <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super GroupedObservable<TKey, T>, ? extends Observable<? extends TDuration>> durationSelector) {
return groupByUntil(keySelector, Functions.<T> identity(), durationSelector);
}

/**
* Groups the items emitted by an {@code Observable} (transformed by a selector) according to a specified
* key selector function until the duration Observable expires for the key.
* <p>
* <img width="640" height="375" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupByUntil.png" alt="">
* <p>
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure as splitting a stream effectively turns it into a "hot
* observable" and blocking any one group would block the entire parent stream. If you need
* backpressure on individual groups then you should use operators such as {@link #onBackpressureDrop}
* or {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupByUntil} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param keySelector
* a function to extract the key for each item
* @param valueSelector
* a function to map each item emitted by the source {@code Observable} to an item emitted by one
* of the resulting {@link GroupedObservable}s
* @param durationSelector
* a function to signal the expiration of a group
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a key
* value and each of which emits all items emitted by the source {@code Observable} during that
* key's duration that share that same key value, transformed by the value selector
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava wiki: groupByUntil</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229433.aspx">MSDN: Observable.GroupByUntil</a>
*/
public final <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super T, ? extends TValue> valueSelector, Func1<? super GroupedObservable<TKey, TValue>, ? extends Observable<? extends TDuration>> durationSelector) {
return lift(new OperatorGroupByUntil<T, TKey, TValue, TDuration>(keySelector, valueSelector, durationSelector));
}

/**
* Returns an Observable that correlates two Observables when they overlap in time and groups the results.
* <p>
Expand Down
Loading

0 comments on commit 0d74d98

Please sign in to comment.