Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operators: AsObservable, GroupBy w/ maxGroups, GroupByUntil w/ maxGroups #641

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Operators: AsObservable, GroupBy w/ maxGroups, GroupByUntil w/ maxGro…
…ups.
akarnokd committed Dec 19, 2013

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit a11e8d5b99ed6fa3928c179e002352fbb1f15e04
103 changes: 101 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@
import rx.operators.OperationAll;
import rx.operators.OperationAmb;
import rx.operators.OperationAny;
import rx.operators.OperationAsObservable;
import rx.operators.OperationAverage;
import rx.operators.OperationBuffer;
import rx.operators.OperationCache;
@@ -510,7 +511,15 @@ public void onNext(T args) {
public Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
}


/**
* Hides the identity of this observable.
* @return an Observable hiding the identity of this Observable.
*/
public Observable<T> asObservable() {
return create(new OperationAsObservable<T>(this));
}

/**
* Returns a {@link ConnectableObservable} that upon connection causes the
* source Observable to push results into the specified subject.
@@ -6815,9 +6824,99 @@ public <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Fun
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229433.aspx">MSDN: Observable.GroupByUntil</a>
*/
public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super T, ? extends TValue> valueSelector, Func1<? super GroupedObservable<TKey, TValue>, ? extends Observable<TDuration>> durationSelector) {
return create(new OperationGroupByUntil<T, TKey, TValue, TDuration>(this, keySelector, valueSelector, durationSelector));
return create(new OperationGroupByUntil<T, TKey, TValue, TDuration>(this, keySelector, valueSelector, durationSelector, -1));
}

/**
* Return an Observable which groups the items emitted by this Observable according to a specified key
* selector function until the duration Observable expires for the key or
* the total number of active groups exceeds the maxGroups value.
*
* @param <TKey> the group key type
* @param <TDuration> the duration element type
* @param keySelector a function to extract the key for each item
* @param durationSelector a function to signal the expiration of a group
* @param maxGroups the maximum allowed concurrent groups
* @return
*/
public <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(
Func1<? super T, ? extends TKey> keySelector,
Func1<? super GroupedObservable<TKey, T>, ? extends Observable<TDuration>> durationSelector,
int maxGroups) {
return groupByUntil(keySelector, Functions.<T>identity(), durationSelector, maxGroups);
}

/**
* Return an Observable which groups the items emitted by this Observable according to specified key and
* value selector functions until the duration Observable expires for the
* key.
* @param <TKey> the group key type
* @param <TValue> the value type within the groups
* @param <TDuration> the duration element type
* @param keySelector a function to extract the key for each item
* @param valueSelector a function to map each source item to an item
* emitted by an Observable group
* @param durationSelector a function to signal the expiration of a group
* @param maxGroups the maximum allowed concurrent groups
* @return
*/
public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(
Func1<? super T, ? extends TKey> keySelector,
Func1<? super T, ? extends TValue> valueSelector,
Func1<? super GroupedObservable<TKey, TValue>, ? extends Observable<TDuration>> durationSelector,
int maxGroups) {
if (maxGroups < 0) {
throw new IllegalArgumentException("maxGroups >= 0 required");
}
return create(new OperationGroupByUntil<T, TKey, TValue, TDuration>(this, keySelector, valueSelector, durationSelector, maxGroups));
}

/**
* Groups the items emitted by an Observable according to a specified
* criterion, and emits these grouped items as {@link GroupedObservable}s,
* one GroupedObservable per group and limits the number of active groups.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupBy.png">
*
* @param keySelector a function that extracts the key from an item
* @param elementSelector a function to map a source item to an item in a
* {@link GroupedObservable}
* @param maxGroups the maximum number of active groups.
* @param <K> the key type
* @param <R> the type of items emitted by the resulting
* {@link GroupedObservable}s
* @return an Observable that emits {@link GroupedObservable}s, each of
* which corresponds to a unique key value and emits items
* representing items from the source Observable that share that key
* value
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupBy</a>
*/
public <K, R> Observable<GroupedObservable<K, R>> groupBy(
final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector,
int maxGroups) {
return groupByUntil(keySelector, elementSelector, Functions.just1(never()), maxGroups);
}

/**
* Groups the items emitted by an Observable according to a specified
* criterion, and emits these grouped items as {@link GroupedObservable}s,
* one GroupedObservable per group and limits the number of active groups.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupBy.png">
*
* @param keySelector a function that extracts the key for each item
* @param maxGroups the maximum number of active groups.
* @param <K> the key type
* @return an Observable that emits {@link GroupedObservable}s, each of
* which corresponds to a unique key value and emits items
* representing items from the source Observable that share that key
* value
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupBy</a>
*/
public <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector, int maxGroups) {
return groupByUntil(keySelector, Functions.just1(never()), maxGroups);
}

/**
* Invokes the specified function asynchronously and returns an Observable
* that emits the result.
37 changes: 37 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationAsObservable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;

/**
* Hides the identity of another observable.
* @param <T> the return value type of the wrapped observable.
*/
public final class OperationAsObservable<T> implements OnSubscribeFunc<T> {
private final Observable<? extends T> source;

public OperationAsObservable(Observable<? extends T> source) {
this.source = source;
}
@Override
public Subscription onSubscribe(Observer<? super T> t1) {
return source.subscribe(t1);
}
}
50 changes: 30 additions & 20 deletions rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@@ -29,7 +30,6 @@
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

/**
@@ -43,14 +43,19 @@ public class OperationGroupByUntil<TSource, TKey, TResult, TDuration> implements
final Func1<? super TSource, ? extends TKey> keySelector;
final Func1<? super TSource, ? extends TResult> valueSelector;
final Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<TDuration>> durationSelector;
/** Number of active groups at once. */
final int capacity;
public OperationGroupByUntil(Observable<TSource> source,
Func1<? super TSource, ? extends TKey> keySelector,
Func1<? super TSource, ? extends TResult> valueSelector,
Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<TDuration>> durationSelector) {
Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<TDuration>> durationSelector,
int capacity
) {
this.source = source;
this.keySelector = keySelector;
this.valueSelector = valueSelector;
this.durationSelector = durationSelector;
this.capacity = capacity;
}

@Override
@@ -61,17 +66,35 @@ public Subscription onSubscribe(Observer<? super GroupedObservable<TKey, TResult
return cancel;
}
/** The source value sink and group manager. */
class ResultSink implements Observer<TSource> {
final class ResultSink implements Observer<TSource> {
/** Guarded by gate. */
protected final Observer<? super GroupedObservable<TKey, TResult>> observer;
protected final Subscription cancel;
protected final CompositeSubscription group = new CompositeSubscription();
protected final Object gate = new Object();
/** Guarded by gate. */
protected final Map<TKey, GroupSubject<TKey, TResult>> map = new HashMap<TKey, GroupSubject<TKey, TResult>>();
protected final Map<TKey, GroupSubject<TKey, TResult>> map;
public ResultSink(Observer<? super GroupedObservable<TKey, TResult>> observer, Subscription cancel) {
this.observer = observer;
this.cancel = cancel;
Map<TKey, GroupSubject<TKey, TResult>> map0;
if (capacity < 0) {
map0 = new HashMap<TKey, GroupSubject<TKey, TResult>>();
} else {
map0 = new LinkedHashMap<TKey, GroupSubject<TKey, TResult>>() {

@Override
protected boolean removeEldestEntry(Map.Entry<TKey, GroupSubject<TKey, TResult>> eldest) {
if (size() > capacity) {
eldest.getValue().onCompleted();
return true;
}
return false;
}

};
}
this.map = map0;
}
/** Prepare the subscription tree. */
public Subscription run() {
@@ -173,7 +196,7 @@ public void expire(TKey key, Subscription handle) {
handle.unsubscribe();
}
/** Observe the completion of a group. */
class DurationObserver implements Observer<TDuration> {
final class DurationObserver implements Observer<TDuration> {
final TKey key;
final Subscription handle;
public DurationObserver(TKey key, Subscription handle) {
@@ -197,27 +220,14 @@ public void onCompleted() {

}
}
protected static <T> OnSubscribeFunc<T> neverSubscribe() {
return new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> t1) {
return Subscriptions.empty();
}
};
}
/** A grouped observable with subject-like behavior. */
public static class GroupSubject<K, V> extends GroupedObservable<K, V> implements Observer<V> {
public static final class GroupSubject<K, V> extends GroupedObservable<K, V> implements Observer<V> {
protected final Subject<V, V> publish;
public GroupSubject(K key, Subject<V, V> publish) {
super(key, OperationGroupByUntil.<V>neverSubscribe());
super(key, OperationReplay.subscriberOf(publish));
this.publish = publish;
}

@Override
public Subscription subscribe(Observer<? super V> observer) {
return publish.subscribe(observer);
}

@Override
public void onNext(V args) {
publish.onNext(args);
46 changes: 46 additions & 0 deletions rxjava-core/src/main/java/rx/util/functions/Functions.java
Original file line number Diff line number Diff line change
@@ -341,6 +341,52 @@ public T call(T o) {
};
}

/**
* Return a Func0 which returns the given constant value.
* @param <R> the result type
* @param value the constant value to return
* @return a function which returns the given constant value.
*/
public static <R> Func0<R> just0(final R value) {
return new Func0<R>() {
@Override
public R call() {
return value;
}
};
}
/**
* Return a Func1 which returns the given constant value.
* @param <T1> the first parameter type
* @param <R> the result type
* @param value the constant value to return
* @return a function which returns the given constant value.
*/
public static <T1, R> Func1<T1, R> just1(final R value) {
return new Func1<T1, R>() {
@Override
public R call(T1 t1) {
return value;
}
};
}
/**
* Return a Func2 which returns the given constant value.
* @param <T1> the first parameter type
* @param <T2> the second parameter type
* @param <R> the result type
* @param value the constant value to return
* @return a function which returns the given constant value.
*/
public static <T1, T2, R> Func2<T1, T2, R> just2(final R value) {
return new Func2<T1, T2, R>() {
@Override
public R call(T1 t1, T2 t2) {
return value;
}
};
}

private enum AlwaysTrue implements Func1<Object, Boolean> {
INSTANCE;

Loading