Skip to content

Commit

Permalink
Operation: Replay additional overloads
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Dec 12, 2013
1 parent 9a59039 commit 0165f52
Show file tree
Hide file tree
Showing 4 changed files with 790 additions and 24 deletions.
336 changes: 331 additions & 5 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.OperationParallel;
import rx.operators.OperationParallelMerge;
import rx.operators.OperationReplay;
import rx.operators.OperationRetry;
import rx.operators.OperationSample;
import rx.operators.OperationScan;
Expand Down Expand Up @@ -526,6 +527,24 @@ public <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends R> su
return OperationMulticast.multicast(this, subject);
}

/**
* Returns an observable sequence that contains the elements of a sequence
* produced by multicasting the source sequence within a selector function.
*
* @param subjectFactory the subject factory
* @param selector The selector function which can use the multicasted
* source sequence subject to the policies enforced by the
* created subject.
* @return the Observable sequence that contains the elements of a sequence
* produced by multicasting the source sequence within a selector function.
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229708.aspx'>MSDN: Observable.Multicast</a>
*/
public <TIntermediate, TResult> Observable<TResult> multicast(
final Func0<? extends Subject<? super T, ? extends TIntermediate>> subjectFactory,
final Func1<? super Observable<TIntermediate>, ? extends Observable<TResult>> selector) {
return OperationMulticast.multicast(this, subjectFactory, selector);
}
/**
* Allow the {@link RxJavaErrorHandler} to receive the exception from
* onError.
Expand Down Expand Up @@ -4277,20 +4296,327 @@ public <R> Observable<List<T>> maxBy(Func1<T, R> selector, Comparator<? super R>
public ConnectableObservable<T> replay() {
return OperationMulticast.multicast(this, ReplaySubject.<T> create());
}

/**
* Returns a {@link ConnectableObservable} that shares a single subscription
* to the underlying Observable that will replay all of its items and
* notifications to any future {@link Observer} on the given scheduler
* notifications to any future {@link Observer} on the given scheduler.
*
* @param scheduler
* @return
* @param scheduler the scheduler where the Observers will receive the events
* @return a {@link ConnectableObservable} that shares a single subscription
* to the underlying Observable that will replay all of its items and
* notifications to any future {@link Observer} on the given scheduler
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211699.aspx'>MSDN: Observable.Replay</a>
*/
public ConnectableObservable<T> replay(Scheduler scheduler) {
return OperationMulticast.multicast(this, ReplaySubject.<T> create());
return OperationMulticast.multicast(this, OperationReplay.createScheduledSubject(ReplaySubject.<T>create(), scheduler));
}

/**
* Returns a connectable observable sequence that shares a single subscription
* to the underlying sequence replaying bufferSize notifications.
*
* @param bufferSize the buffer size
* @return a connectable observable sequence that shares a single subscription
* to the underlying sequence replaying bufferSize notifications
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211976.aspx'>MSDN: Observable.Replay</a>
*/
public ConnectableObservable<T> replay(int bufferSize) {
return OperationMulticast.multicast(this, OperationReplay.<T>replayBuffered(bufferSize));
}

/**
* Returns a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications.
*
* @param bufferSize the buffer size
* @param scheduler the scheduler where the Observers will receive the events
* @return a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229814.aspx'>MSDN: Observable.Replay</a>
*/
public ConnectableObservable<T> replay(int bufferSize, Scheduler scheduler) {
return OperationMulticast.multicast(this,
OperationReplay.createScheduledSubject(
OperationReplay.<T>replayBuffered(bufferSize), scheduler));
}

/**
* Returns a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window.
*
* @param time the window length
* @param unit the window length time unit
* @return a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229232.aspx'>MSDN: Observable.Replay</a>
*/
public ConnectableObservable<T> replay(long time, TimeUnit unit) {
return replay(time, unit, Schedulers.threadPoolForComputation());
}

/**
* Returns a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window.
*
* @param time the window length
* @param unit the window length time unit
* @param scheduler the scheduler which is used as a time source for the window
* @return a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211811.aspx'>MSDN: Observable.Replay</a>
*/
public ConnectableObservable<T> replay(long time, TimeUnit unit, Scheduler scheduler) {
return OperationMulticast.multicast(this, OperationReplay.<T>replayWindowed(time, unit, -1, scheduler));
}

/**
* Returns a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications within window.
*
* @param bufferSize the buffer size
* @param time the window length
* @param unit the window length time unit
* @return Returns a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications within window
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229874.aspx'>MSDN: Observable.Replay</a>
*/
public ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit) {
return replay(bufferSize, time, unit, Schedulers.threadPoolForComputation());
}

/**
* Returns a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications within window.
*
* @param bufferSize the buffer size
* @param time the window length
* @param unit the window length time unit
* @param scheduler the scheduler which is used as a time source for the window
* @return a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications within window
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211759.aspx'>MSDN: Observable.Replay</a>
*/
public ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler) {
if (bufferSize < 0) {
throw new IllegalArgumentException("bufferSize < 0");
}
return OperationMulticast.multicast(this, OperationReplay.<T>replayWindowed(time, unit, bufferSize, scheduler));
}

/**
* Returns an observable sequence that is the result of invoking the selector
* on a connectable observable sequence that shares a single subscription to
* the underlying sequence and starts with initial value.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @return an observable sequence that is the result of invoking the selector
* on a connectable observable sequence that shares a single subscription to
* the underlying sequence and starts with initial value
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229653.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return ReplaySubject.create();
}
}, selector);
}

/**
* Returns an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @param scheduler the scheduler where the replay is observed
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211644.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final Scheduler scheduler) {
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return OperationReplay.createScheduledSubject(ReplaySubject.<T>create(), scheduler);
}
}, selector);
}

/**
* Returns an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @param bufferSize the buffer size
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211675.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize) {
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return OperationReplay.replayBuffered(bufferSize);
}
}, selector);
}

/**
* Returns an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @param bufferSize the buffer size
* @param scheduler the scheduler where the replay is observed
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229928.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize, final Scheduler scheduler) {
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return OperationReplay.<T>createScheduledSubject(OperationReplay.<T>replayBuffered(bufferSize), scheduler);
}
}, selector);
}

/**
* Returns an observable sequence that is the result of invoking
* the selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @param time the window length
* @param unit the window length time unit
* @return an observable sequence that is the result of invoking
* the selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229526.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, long time, TimeUnit unit) {
return replay(selector, time, unit, Schedulers.threadPoolForComputation());
}

/**
* Returns an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @param time the window length
* @param unit the window length time unit
* @param scheduler the scheduler which is used as a time source for the window
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying all notifications within window
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh244327.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final long time, final TimeUnit unit, final Scheduler scheduler) {
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return OperationReplay.replayWindowed(time, unit, -1, scheduler);
}
}, selector);
}

/**
* Returns an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications
* within window.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @param bufferSize the buffer size
* @param time the window length
* @param unit the window length time unit
*
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications
* within window
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh228952.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, int bufferSize, long time, TimeUnit unit) {
return replay(selector, bufferSize, time, unit, Schedulers.threadPoolForComputation());
}


/**
* Returns an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications
* within window.
*
* @param <R> the return element type
* @param selector The selector function which can use the multicasted
* this sequence as many times as needed, without causing
* multiple subscriptions to this sequence.
* @param bufferSize the buffer size
* @param time the window length
* @param unit the window length time unit
* @param scheduler the scheduler which is used as a time source for the window
*
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence replaying bufferSize notifications
* within window
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229404.aspx'>MSDN: Observable.Replay</a>
*/
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
if (bufferSize < 0) {
throw new IllegalArgumentException("bufferSize < 0");
}
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return OperationReplay.replayWindowed(time, unit, bufferSize, scheduler);
}
}, selector);
}

/**
* Retry subscription to the source Observable when it calls
* <code>onError</code> up to a certain number of retries.
Expand Down
Loading

0 comments on commit 0165f52

Please sign in to comment.