Skip to content

Commit

Permalink
Add an operator to throttle data via controlling the requests going u…
Browse files Browse the repository at this point in the history
…pstream.
  • Loading branch information
abersnaze committed Jun 1, 2016
1 parent ab6dbc1 commit d86bc3b
Show file tree
Hide file tree
Showing 3 changed files with 420 additions and 33 deletions.
86 changes: 53 additions & 33 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ static final class OnSubscribeExtend<T> implements OnSubscribe<T> {
OnSubscribeExtend(Observable<T> parent) {
this.parent = parent;
}
@Override
public void call(Subscriber<? super T> subscriber) {
@Override
public void call(Subscriber<? super T> subscriber) {
subscriber.add(subscribe(subscriber, parent));
}
}
Expand Down Expand Up @@ -239,7 +239,7 @@ public void call(Subscriber<? super T> subscriber) {
*/
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
}

/**
* Transform an Observable by applying a particular Transformer function to it.
Expand Down Expand Up @@ -2807,8 +2807,8 @@ public static Observable<Integer> range(int start, int count, Scheduler schedule
*/
public static <T> Observable<Boolean> sequenceEqual(Observable<? extends T> first, Observable<? extends T> second) {
return sequenceEqual(first, second, InternalObservableUtils.OBJECT_EQUALS);
}
}

/**
* Returns an Observable that emits a Boolean value that indicates whether two Observable sequences are the
* same by comparing the items emitted by each Observable pairwise based on the results of a specified
Expand Down Expand Up @@ -3132,7 +3132,7 @@ public static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<
public static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction) {
return ws.toList().map(InternalObservableUtils.TO_ARRAY).lift(new OperatorZip<R>(zipFunction));
}

/**
* Returns an Observable that emits the results of a specified combiner function applied to combinations of
* two items emitted, in sequence, by two other Observables.
Expand Down Expand Up @@ -3994,7 +3994,7 @@ public final <R> Observable<R> cast(final Class<R> klass) {
*/
public final <R> Observable<R> collect(Func0<R> stateFactory, final Action2<R, ? super T> collector) {
Func2<R, T, R> accumulator = InternalObservableUtils.createCollectorCaller(collector);

/*
* Discussion and confirmation of implementation at
* https://github.com/ReactiveX/RxJava/issues/423#issuecomment-27642532
Expand Down Expand Up @@ -4117,7 +4117,7 @@ public final Observable<T> concatWith(Observable<? extends T> t1) {
*/
public final Observable<Boolean> contains(final Object element) {
return exists(InternalObservableUtils.equalsWith(element));
}
}

/**
* Returns an Observable that emits the count of the total number of items emitted by the source Observable.
Expand Down Expand Up @@ -4782,7 +4782,7 @@ public final Observable<T> doOnSubscribe(final Action0 subscribe) {
public final Observable<T> doOnTerminate(final Action0 onTerminate) {
Action1<T> onNext = Actions.empty();
Action1<Throwable> onError = Actions.toAction1(onTerminate);

Observer<T> observer = new ActionSubscriber<T>(onNext, onError, onTerminate);

return lift(new OperatorDoOnEach<T>(observer));
Expand Down Expand Up @@ -6698,6 +6698,26 @@ public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Ob
return OperatorPublish.create(this, selector);
}

/**
* Allow the an external signal control the amount of data being set through this Observable chain.
* When the control Observable emits false (closes the valve) requests upstream are stopped and any
* requests from downstream for more data are buffered until the control Observable emits a true
* (opens the valve). Should the control Observable error or complete while closed (last control
* emition was a false) an error is sent down the data stream. The granularity breaks up large requests
* from downstream to limit the number of onNexts that are possible after the control valve has closed.
* The smaller the number the tighter the control on the flow but the more overhead there will be in
* managing the requests.
*
* @param control
* an Observable that dictates if request signals propagate upstream
* @param granularity
* the maximum number of outstanding requests.
* @returns an Observable that mostly stops emiting after the control Observable emits a false.
*/
public final Observable<T> pressureValve(Observable<Boolean> control, long granularity) {
return lift(new OperatorValve<T>(control, granularity));
}

/**
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source
* Observable, then feeds the result of that function along with the second item emitted by the source
Expand Down Expand Up @@ -6874,7 +6894,7 @@ public final Observable<T> repeat(final long count, Scheduler scheduler) {
*/
public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
return OnSubscribeRedo.repeat(this, InternalObservableUtils.createRepeatDematerializer(notificationHandler), scheduler);
}
}

/**
* Returns an Observable that emits the same values as the source Observable with the exception of an
Expand All @@ -6897,7 +6917,7 @@ public final Observable<T> repeatWhen(final Func1<? super Observable<? extends V
*/
public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler) {
return OnSubscribeRedo.repeat(this, InternalObservableUtils.createRepeatDematerializer(notificationHandler));
}
}

/**
* Returns a {@link ConnectableObservable} that shares a single subscription to the underlying Observable
Expand Down Expand Up @@ -6948,7 +6968,7 @@ public final ConnectableObservable<T> replay() {
*/
public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
return OperatorReplay.multicastSelector(InternalObservableUtils.createReplaySupplier(this), selector);
}
}

/**
* Returns an Observable that emits items that are the results of invoking a specified selector on items
Expand Down Expand Up @@ -7059,7 +7079,7 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
}
return OperatorReplay.multicastSelector(
InternalObservableUtils.createReplaySupplier(this, bufferSize, time, unit, scheduler), selector);
}
}

/**
* Returns an Observable that emits items that are the results of invoking a specified selector on items
Expand Down Expand Up @@ -7093,7 +7113,7 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
public final <R> Observable<R> replay(final Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize, final Scheduler scheduler) {
return OperatorReplay.multicastSelector(InternalObservableUtils.createReplaySupplier(this, bufferSize),
InternalObservableUtils.createReplaySelectorAndObserveOn(selector, scheduler));
}
}

/**
* Returns an Observable that emits items that are the results of invoking a specified selector on items
Expand Down Expand Up @@ -7162,7 +7182,7 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final long time, final TimeUnit unit, final Scheduler scheduler) {
return OperatorReplay.multicastSelector(
InternalObservableUtils.createReplaySupplier(this, time, unit, scheduler), selector);
}
}

/**
* Returns an Observable that emits items that are the results of invoking a specified selector on items
Expand Down Expand Up @@ -7194,7 +7214,7 @@ public final <R> Observable<R> replay(final Func1<? super Observable<T>, ? exten
return OperatorReplay.multicastSelector(
InternalObservableUtils.createReplaySupplier(this),
InternalObservableUtils.createReplaySelectorAndObserveOn(selector, scheduler));
}
}

/**
* Returns a {@link ConnectableObservable} that shares a single subscription to the source Observable that
Expand Down Expand Up @@ -7530,7 +7550,7 @@ public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate) {
*/
public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler) {
return OnSubscribeRedo.<T>retry(this, InternalObservableUtils.createRetryDematerializer(notificationHandler));
}
}

/**
* Returns an Observable that emits the same values as the source observable with the exception of an
Expand All @@ -7557,7 +7577,7 @@ public final Observable<T> retryWhen(final Func1<? super Observable<? extends Th
*/
public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
return OnSubscribeRedo.<T> retry(this, InternalObservableUtils.createRetryDematerializer(notificationHandler), scheduler);
}
}

/**
* Returns an Observable that emits the most recently emitted item (if any) emitted by the source Observable
Expand Down Expand Up @@ -8361,7 +8381,7 @@ public final Subscription subscribe(final Action1<? super T> onNext) {
Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}
}

/**
* Subscribes to an Observable and provides callbacks to handle the items it emits and any error
Expand Down Expand Up @@ -8393,7 +8413,7 @@ public final Subscription subscribe(final Action1<? super T> onNext, final Actio

Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}
}

/**
* Subscribes to an Observable and provides callbacks to handle the items it emits and any error or
Expand Down Expand Up @@ -8452,7 +8472,7 @@ public final Subscription subscribe(final Observer<? super T> observer) {
return subscribe((Subscriber<? super T>)observer);
}
return subscribe(new ObserverSubscriber<T>(observer));
}
}

/**
* Subscribes to an Observable and invokes {@link OnSubscribe} function without any contract protection,
Expand Down Expand Up @@ -8579,19 +8599,19 @@ static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T
if (subscriber.isUnsubscribed()) {
RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
} else {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
}
return Subscriptions.unsubscribed();
}
Expand Down
Loading

0 comments on commit d86bc3b

Please sign in to comment.