Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operators DelaySubscription, TakeLast w/ time, TakeLastBuffer #638

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2123,6 +2123,31 @@ public Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
return OperationDelay.delay(this, delay, unit, scheduler);
}

/**
* Return an Observable which delays the subscription to this Observable sequence
* by the given amount.
* @param delay the time to delay the subscription
* @param unit the time unit
* @return an Observable which delays the subscription to this Observable sequence
* by the given amount.
*/
public Observable<T> delaySubscription(long delay, TimeUnit unit) {
return delaySubscription(delay, unit, Schedulers.threadPoolForComputation());
}

/**
* Return an Observable which delays the subscription to this Observable sequence
* by the given amount, waiting and subscribing on the given scheduler.
* @param delay the time to delay the subscription
* @param unit the time unit
* @param scheduler the scheduler where the waiting and subscription will happen
* @return an Observable which delays the subscription to this Observable sequence
* by the given amount, waiting and subscribing on the given scheduler
*/
public Observable<T> delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) {
return create(OperationDelay.delaySubscription(this, delay, unit, scheduler));
}

/**
* Drops items emitted by an Observable that are followed by newer items
* before a timeout value expires. The timer resets on each emission.
Expand Down Expand Up @@ -5289,6 +5314,148 @@ public Observable<T> takeLast(final int count) {
return create(OperationTakeLast.takeLast(this, count));
}

/**
* Return an Observable which contains the items from this observable which
* were emitted not before this completed minus a time window.
*
* @param time the length of the time window, relative to the completion of this
* observable.
* @param unit the time unit
* @return an Observable which contains the items from this observable which
* were emitted not before this completed minus a time window.
*/
public Observable<T> takeLast(long time, TimeUnit unit) {
return takeLast(time, unit, Schedulers.threadPoolForComputation());
}

/**
* Return an Observable which contains the items from this observable which
* were emitted not before this completed minus a time window, where the timing
* information is provided by the given scheduler.
*
* @param time the length of the time window, relative to the completion of this
* observable.
* @param unit the time unit
* @param scheduler the scheduler which provides the timestamps for the observed
* elements
* @return an Observable which contains the items from this observable which
* were emitted not before this completed minus a time window, where the timing
* information is provided by the given scheduler
*/
public Observable<T> takeLast(long time, TimeUnit unit, Scheduler scheduler) {
return create(OperationTakeLast.takeLast(this, time, unit, scheduler));
}

/**
* Return an Observable which contains at most count items from this Observable
* which were emitted not before this completed minus a time window.
*
* @param count the maximum number of items to return
* @param time the length of the time window, relative to the completion of this
* observable.
* @param unit the time unit
* @return Return an Observable which contains at most count items from this Observable
* which were emitted not before this completed minus a time window.
*/
public Observable<T> takeLast(int count, long time, TimeUnit unit) {
return takeLast(count, time, unit, Schedulers.threadPoolForComputation());
}

/**
* Return an Observable which contains at most count items from this Observable
* which were emitted not before this completed minus a time window, where the timing
* information is provided by the given scheduler.
*
* @param count the maximum number of items to return
* @param time the length of the time window, relative to the completion of this
* observable.
* @param unit the time unit
* @param scheduler the scheduler which provides the timestamps for the observed
* elements
* @return Return an Observable which contains at most count items from this Observable
* which were emitted not before this completed minus a time window, where the timing
* information is provided by the given scheduler
*/
public Observable<T> takeLast(int count, long time, TimeUnit unit, Scheduler scheduler) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required");
}
return create(OperationTakeLast.takeLast(this, count, time, unit, scheduler));
}

/**
* Return an Observable which emits single List containing the last count
* elements from this Observable.
*
* @param count the number of items to take last
* @return an Observable which emits single list containing the last count
* elements from this Observable.
*/
public Observable<List<T>> takeLastBuffer(int count) {
return takeLast(count).toList();
}

/**
* Return an Observable which emits single List containing items which
* were emitted not before this completed minus a time window.
* @param time the length of the time window, relative to the completion of this
* observable.
* @param unit the time unit
* @return an Observable which emits single list containing items which
* were emitted not before this completed minus a time window
*/
public Observable<List<T>> takeLastBuffer(long time, TimeUnit unit) {
return takeLast(time, unit).toList();
}

/**
* Return an Observable which emits single List containing items which
* were emitted not before this completed minus a time window, where the timing
* information is provided by the given scheduler.
* @param time the length of the time window, relative to the completion of this
* observable.
* @param unit the time unit
* @param scheduler the scheduler which provides the timestamps for the observed
* elements
* @return an Observable which emits single list containing items which
* were emitted not before this completed minus a time window, where the timing
* information is provided by the given scheduler
*/
public Observable<List<T>> takeLastBuffer(long time, TimeUnit unit, Scheduler scheduler) {
return takeLast(time, unit, scheduler).toList();
}

/**
* Return an Observable which emits a single List containing at most count items
* from this Observable which were emitted not before this completed minus a time window.
* @param count the number of items to take last
* @param time the length of the time window, relative to the completion of this
* observable.
* @param unit the time unit
* @return an Observable which emits a single List containing at most count items
* from this Observable which were emitted not before this completed minus a time window.
*/
public Observable<List<T>> takeLastBuffer(int count, long time, TimeUnit unit) {
return takeLast(count, time, unit).toList();
}

/**
* Return an Observable which emits a single List containing at most count items
* from this Observable which were emitted not before this completed minus a time window.
* @param count the number of items to take last
* @param time the length of the time window, relative to the completion of this
* observable.
* @param unit the time unit
* @param scheduler the scheduler which provides the timestamps for the observed
* elements
* @return an Observable which emits a single List containing at most count items
* from this Observable which were emitted not before this completed minus a time window.
*/
public Observable<List<T>> takeLastBuffer(int count, long time, TimeUnit unit, Scheduler scheduler) {
return takeLast(count, time, unit, scheduler).toList();
}


/**
* Returns an Observable that emits the items from the source Observable
* only until the <code>other</code> Observable emits an item.
Expand Down
42 changes: 42 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationDelay.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.observables.ConnectableObservable;
import rx.subscriptions.SerialSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Func1;

public final class OperationDelay {
Expand All @@ -40,4 +45,41 @@ public T call(Long ignored) {
});
return Observable.concat(seqs);
}

/**
* Delays the subscription to the source by the given amount, running on the given scheduler.
*/
public static <T> OnSubscribeFunc<T> delaySubscription(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
return new DelaySubscribeFunc<T>(source, time, unit, scheduler);
}

/** Subscribe function which schedules the actual subscription to source on a scheduler at a later time. */
private static final class DelaySubscribeFunc<T> implements OnSubscribeFunc<T> {
final Observable<? extends T> source;
final Scheduler scheduler;
final long time;
final TimeUnit unit;

public DelaySubscribeFunc(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
this.time = time;
this.unit = unit;
}
@Override
public Subscription onSubscribe(final Observer<? super T> t1) {
final SerialSubscription ssub = new SerialSubscription();

ssub.setSubscription(scheduler.schedule(new Action0() {
@Override
public void call() {
if (!ssub.isUnsubscribed()) {
ssub.setSubscription(source.subscribe(t1));
}
}
}, time, unit));

return ssub;
}
}
}
121 changes: 121 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationTakeLast.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.SingleAssignmentSubscription;
import rx.util.Timestamped;

/**
* Returns an Observable that emits the last <code>count</code> items emitted by the source
Expand Down Expand Up @@ -119,4 +123,121 @@ public void onNext(T value) {
}

}

/**
* Returns the items emitted by source whose arrived in the time window
* before the source completed.
*/
public static <T> OnSubscribeFunc<T> takeLast(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
return new TakeLastTimed<T>(source, -1, time, unit, scheduler);
}

/**
* Returns the items emitted by source whose arrived in the time window
* before the source completed and at most count values.
*/
public static <T> OnSubscribeFunc<T> takeLast(Observable<? extends T> source, int count, long time, TimeUnit unit, Scheduler scheduler) {
return new TakeLastTimed<T>(source, count, time, unit, scheduler);
}

/** Take only the values which appeared some time before the completion. */
static final class TakeLastTimed<T> implements OnSubscribeFunc<T> {
final Observable<? extends T> source;
final long ageMillis;
final Scheduler scheduler;
final int count;

public TakeLastTimed(Observable<? extends T> source, int count, long time, TimeUnit unit, Scheduler scheduler) {
this.source = source;
this.ageMillis = unit.toMillis(time);
this.scheduler = scheduler;
this.count = count;
}

@Override
public Subscription onSubscribe(Observer<? super T> t1) {
SingleAssignmentSubscription sas = new SingleAssignmentSubscription();
sas.set(source.subscribe(new TakeLastTimedObserver<T>(t1, sas, count, ageMillis, scheduler)));
return sas;
}
}
/** Observes source values and keeps the most recent items. */
static final class TakeLastTimedObserver<T> implements Observer<T> {
final Observer<? super T> observer;
final Subscription cancel;
final long ageMillis;
final Scheduler scheduler;
/** -1 indicates unlimited buffer. */
final int count;

final Deque<Timestamped<T>> buffer = new LinkedList<Timestamped<T>>();

public TakeLastTimedObserver(Observer<? super T> observer, Subscription cancel,
int count, long ageMillis, Scheduler scheduler) {
this.observer = observer;
this.cancel = cancel;
this.ageMillis = ageMillis;
this.scheduler = scheduler;
this.count = count;
}

protected void runEvictionPolicy(long now) {
// trim size
while (count >= 0 && buffer.size() > count) {
buffer.pollFirst();
}
// remove old entries
while (!buffer.isEmpty()) {
Timestamped<T> v = buffer.peekFirst();
if (v.getTimestampMillis() < now - ageMillis) {
buffer.pollFirst();
} else {
break;
}
}
}

@Override
public void onNext(T args) {
long t = scheduler.now();
buffer.add(new Timestamped<T>(t, args));
runEvictionPolicy(t);
}

@Override
public void onError(Throwable e) {
buffer.clear();
observer.onError(e);
cancel.unsubscribe();
}

/**
* Emit the contents of the buffer.
* @return true if no exception was raised in the process
*/
protected boolean emitBuffer() {
for (Timestamped<T> v : buffer) {
try {
observer.onNext(v.getValue());
} catch (Throwable t) {
buffer.clear();
observer.onError(t);
return false;
}
}
buffer.clear();
return true;
}

@Override
public void onCompleted() {
runEvictionPolicy(scheduler.now());

if (emitBuffer()) {
observer.onCompleted();
}
cancel.unsubscribe();
}

}
}
Loading