Skip to content

Commit

Permalink
Merge pull request #734 from akarnokd/DelayViaObservable
Browse files Browse the repository at this point in the history
Delay with subscription and item delaying observables.
  • Loading branch information
benjchristensen committed Jan 14, 2014
2 parents 8a29a64 + 337efeb commit 22fc397
Show file tree
Hide file tree
Showing 3 changed files with 555 additions and 0 deletions.
35 changes: 35 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2291,6 +2291,41 @@ public static Observable<Long> timer(long initialDelay, long period, TimeUnit un
return create(new OperationTimer.TimerPeriodically(initialDelay, period, unit, scheduler));
}

/**
* Create an Observable which delays the events via another Observable on a per item-basis.
* <p>
* Note: onError event is immediately propagated.
*
* @param <U> the item delay value type (ignored)
* @param itemDelay function that returns an Observable for each source item which is
* then used for delaying that particular item until the Observable
* fires its first onNext event.
* @return an Observable which delays the events via another Observable on a per item-basis.
*/
public <U> Observable<T> delay(Func1<? super T, ? extends Observable<U>> itemDelay) {
return create(OperationDelay.delay(this, itemDelay));
}
/**
* Create an Observable which delays the subscription and events via another Observables on a per item-basis.
* <p>
* Note: onError event is immediately propagated.
*
* @param <U> the subscription delay value type (ignored)
* @param <V> the item delay value type (ignored)
* @param subscriptionDelay function that returns an Observable which will trigger
* the subscription to the source observable once it fires an
* onNext event.
* @param itemDelay function that returns an Observable for each source item which is
* then used for delaying that particular item until the Observable
* fires its first onNext event.
* @return an Observable which delays the events via another Observable on a per item-basis.
*/
public <U, V> Observable<T> delay(
Func0<? extends Observable<U>> subscriptionDelay,
Func1<? super T, ? extends Observable<V>> itemDelay) {
return create(OperationDelay.delay(this, subscriptionDelay, itemDelay));
}

/**
* Returns an Observable that emits the items emitted by the source
* Observable shifted forward in time by a specified delay. Error
Expand Down
212 changes: 212 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationDelay.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import rx.Scheduler;
import rx.Subscription;
import rx.observables.ConnectableObservable;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

public final class OperationDelay {
Expand Down Expand Up @@ -82,4 +85,213 @@ public void call() {
return ssub;
}
}
/**
* Delay the emission of the source items by a per-item observable that fires its first element.
*/
public static <T, U> OnSubscribeFunc<T> delay(Observable<? extends T> source,
Func1<? super T, ? extends Observable<U>> itemDelay) {
return new DelayViaObservable<T, Object, U>(source, null, itemDelay);
}
/**
* Delay the subscription and emission of the source items by a per-item observable that fires its first element.
*/
public static <T, U, V> OnSubscribeFunc<T> delay(Observable<? extends T> source,
Func0<? extends Observable<U>> subscriptionDelay,
Func1<? super T, ? extends Observable<V>> itemDelay) {
return new DelayViaObservable<T, U, V>(source, subscriptionDelay, itemDelay);
}
/**
* Delay the emission of the source items by a per-item observable that fires its first element.
*/
private static final class DelayViaObservable<T, U, V> implements OnSubscribeFunc<T> {
final Observable<? extends T> source;
final Func0<? extends Observable<U>> subscriptionDelay;
final Func1<? super T, ? extends Observable<V>> itemDelay;

public DelayViaObservable(Observable<? extends T> source,
Func0<? extends Observable<U>> subscriptionDelay,
Func1<? super T, ? extends Observable<V>> itemDelay) {
this.source = source;
this.subscriptionDelay = subscriptionDelay;
this.itemDelay = itemDelay;
}

@Override
public Subscription onSubscribe(Observer<? super T> t1) {
CompositeSubscription csub = new CompositeSubscription();

SerialSubscription sosub = new SerialSubscription();
csub.add(sosub);
SourceObserver<T, V> so = new SourceObserver<T, V>(t1, itemDelay, csub, sosub);
if (subscriptionDelay == null) {
sosub.set(source.subscribe(so));
} else {
Observable<U> subscriptionSource;
try {
subscriptionSource = subscriptionDelay.call();
} catch (Throwable t) {
t1.onError(t);
return Subscriptions.empty();
}
SerialSubscription ssub = new SerialSubscription();
csub.add(ssub);
ssub.set(subscriptionSource.subscribe(new SubscribeDelay<T, U, V>(source, so, csub, ssub)));
}

return csub;
}
/** Subscribe delay observer. */
private static final class SubscribeDelay<T, U, V> implements Observer<U> {
final Observable<? extends T> source;
final SourceObserver<T, V> so;
final CompositeSubscription csub;
final Subscription self;
/** Prevent any onError once the first item was delivered. */
boolean subscribed;

public SubscribeDelay(
Observable<? extends T> source,
SourceObserver<T, V> so,
CompositeSubscription csub, Subscription self) {
this.source = source;
this.so = so;
this.csub = csub;
this.self = self;
}

@Override
public void onNext(U args) {
onCompleted();
}

@Override
public void onError(Throwable e) {
if (!subscribed) {
so.observer.onError(e);
csub.unsubscribe();
}
}

@Override
public void onCompleted() {
subscribed = true;
csub.remove(self);
so.self.set(source.subscribe(so));
}
}
/** The source observer. */
private static final class SourceObserver<T, U> implements Observer<T> {
final Observer<? super T> observer;
final Func1<? super T, ? extends Observable<U>> itemDelay;
final CompositeSubscription csub;
final SerialSubscription self;
/** Guard to avoid overlapping events from the various sources. */
final Object guard;
boolean done;
int wip;

public SourceObserver(Observer<? super T> observer,
Func1<? super T, ? extends Observable<U>> itemDelay,
CompositeSubscription csub,
SerialSubscription self) {
this.observer = observer;
this.itemDelay = itemDelay;
this.csub = csub;
this.guard = new Object();
this.self = self;
}

@Override
public void onNext(T args) {
Observable<U> delayer;
try {
delayer = itemDelay.call(args);
} catch (Throwable t) {
onError(t);
return;
}

synchronized (guard) {
wip++;
}

SerialSubscription ssub = new SerialSubscription();
csub.add(ssub);
ssub.set(delayer.subscribe(new DelayObserver<T, U>(args, this, ssub)));
}

@Override
public void onError(Throwable e) {
synchronized (guard) {
observer.onError(e);
}
csub.unsubscribe();
}

@Override
public void onCompleted() {
boolean b;
synchronized (guard) {
done = true;
b = checkDone();
}
if (b) {
csub.unsubscribe();
} else {
self.unsubscribe();
}
}

void emit(T value, Subscription token) {
boolean b;
synchronized (guard) {
observer.onNext(value);
wip--;
b = checkDone();
}
if (b) {
csub.unsubscribe();
} else {
csub.remove(token);
}
}
boolean checkDone() {
if (done && wip == 0) {
observer.onCompleted();
return true;
}
return false;
}
}
/**
* Delay observer.
*/
private static final class DelayObserver<T, U> implements Observer<U> {
final T value;
final SourceObserver<T, U> parent;
final Subscription token;

public DelayObserver(T value, SourceObserver<T, U> parent, Subscription token) {
this.value = value;
this.parent = parent;
this.token = token;
}

@Override
public void onNext(U args) {
parent.emit(value, token);
}

@Override
public void onError(Throwable e) {
parent.onError(e);
}

@Override
public void onCompleted() {
parent.emit(value, token);
}

}
}
}
Loading

0 comments on commit 22fc397

Please sign in to comment.