Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delay with subscription and item delaying observables. #734

Merged
merged 2 commits into from
Jan 14, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2167,6 +2167,41 @@ public static Observable<Long> timer(long initialDelay, long period, TimeUnit un
return create(new OperationTimer.TimerPeriodically(initialDelay, period, unit, scheduler));
}

/**
* Create an Observable which delays the events via another Observable on a per item-basis.
* <p>
* Note: onError event is immediately propagated.
*
* @param <U> the item delay value type (ignored)
* @param itemDelay function that returns an Observable for each source item which is
* then used for delaying that particular item until the Observable
* fires its first onNext event.
* @return an Observable which delays the events via another Observable on a per item-basis.
*/
public <U> Observable<T> delay(Func1<? super T, ? extends Observable<U>> itemDelay) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the following codes do the same thing?

    public <U> Observable<T> delay(
            final Func1<? super T, ? extends Observable<U>> itemDelay) {
        return flatMap(new Func1<T, Observable<T>>() {
            @Override
            public Observable<T> call(final T t1) {
                return itemDelay.call(t1).take(1).map(new Func1<U, T>() {
                    @Override
                    public T call(U ignored) {
                        return t1;
                    }
                });
            }
        });
    }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked Rx.NET and both onNext and onCompleted of the itemDelay's Observable trigger the value emission so this and mine is incorrect.

return create(OperationDelay.delay(this, itemDelay));
}
/**
* Create an Observable which delays the subscription and events via another Observables on a per item-basis.
* <p>
* Note: onError event is immediately propagated.
*
* @param <U> the subscription delay value type (ignored)
* @param <V> the item delay value type (ignored)
* @param subscriptionDelay function that returns an Observable which will trigger
* the subscription to the source observable once it fires an
* onNext event.
* @param itemDelay function that returns an Observable for each source item which is
* then used for delaying that particular item until the Observable
* fires its first onNext event.
* @return an Observable which delays the events via another Observable on a per item-basis.
*/
public <U, V> Observable<T> delay(
Func0<? extends Observable<U>> subscriptionDelay,
Func1<? super T, ? extends Observable<V>> itemDelay) {
return create(OperationDelay.delay(this, subscriptionDelay, itemDelay));
}

/**
* Returns an Observable that emits the items emitted by the source
* Observable shifted forward in time by a specified delay. Error
Expand Down
212 changes: 212 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationDelay.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import rx.Scheduler;
import rx.Subscription;
import rx.observables.ConnectableObservable;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

public final class OperationDelay {
Expand Down Expand Up @@ -82,4 +85,213 @@ public void call() {
return ssub;
}
}
/**
* Delay the emission of the source items by a per-item observable that fires its first element.
*/
public static <T, U> OnSubscribeFunc<T> delay(Observable<? extends T> source,
Func1<? super T, ? extends Observable<U>> itemDelay) {
return new DelayViaObservable<T, Object, U>(source, null, itemDelay);
}
/**
* Delay the subscription and emission of the source items by a per-item observable that fires its first element.
*/
public static <T, U, V> OnSubscribeFunc<T> delay(Observable<? extends T> source,
Func0<? extends Observable<U>> subscriptionDelay,
Func1<? super T, ? extends Observable<V>> itemDelay) {
return new DelayViaObservable<T, U, V>(source, subscriptionDelay, itemDelay);
}
/**
* Delay the emission of the source items by a per-item observable that fires its first element.
*/
private static final class DelayViaObservable<T, U, V> implements OnSubscribeFunc<T> {
final Observable<? extends T> source;
final Func0<? extends Observable<U>> subscriptionDelay;
final Func1<? super T, ? extends Observable<V>> itemDelay;

public DelayViaObservable(Observable<? extends T> source,
Func0<? extends Observable<U>> subscriptionDelay,
Func1<? super T, ? extends Observable<V>> itemDelay) {
this.source = source;
this.subscriptionDelay = subscriptionDelay;
this.itemDelay = itemDelay;
}

@Override
public Subscription onSubscribe(Observer<? super T> t1) {
CompositeSubscription csub = new CompositeSubscription();

SerialSubscription sosub = new SerialSubscription();
csub.add(sosub);
SourceObserver<T, V> so = new SourceObserver<T, V>(t1, itemDelay, csub, sosub);
if (subscriptionDelay == null) {
sosub.set(source.subscribe(so));
} else {
Observable<U> subscriptionSource;
try {
subscriptionSource = subscriptionDelay.call();
} catch (Throwable t) {
t1.onError(t);
return Subscriptions.empty();
}
SerialSubscription ssub = new SerialSubscription();
csub.add(ssub);
ssub.set(subscriptionSource.subscribe(new SubscribeDelay<T, U, V>(source, so, csub, ssub)));
}

return csub;
}
/** Subscribe delay observer. */
private static final class SubscribeDelay<T, U, V> implements Observer<U> {
final Observable<? extends T> source;
final SourceObserver<T, V> so;
final CompositeSubscription csub;
final Subscription self;
/** Prevent any onError once the first item was delivered. */
boolean subscribed;

public SubscribeDelay(
Observable<? extends T> source,
SourceObserver<T, V> so,
CompositeSubscription csub, Subscription self) {
this.source = source;
this.so = so;
this.csub = csub;
this.self = self;
}

@Override
public void onNext(U args) {
onCompleted();
}

@Override
public void onError(Throwable e) {
if (!subscribed) {
so.observer.onError(e);
csub.unsubscribe();
}
}

@Override
public void onCompleted() {
subscribed = true;
csub.remove(self);
so.self.set(source.subscribe(so));
}
}
/** The source observer. */
private static final class SourceObserver<T, U> implements Observer<T> {
final Observer<? super T> observer;
final Func1<? super T, ? extends Observable<U>> itemDelay;
final CompositeSubscription csub;
final SerialSubscription self;
/** Guard to avoid overlapping events from the various sources. */
final Object guard;
boolean done;
int wip;

public SourceObserver(Observer<? super T> observer,
Func1<? super T, ? extends Observable<U>> itemDelay,
CompositeSubscription csub,
SerialSubscription self) {
this.observer = observer;
this.itemDelay = itemDelay;
this.csub = csub;
this.guard = new Object();
this.self = self;
}

@Override
public void onNext(T args) {
Observable<U> delayer;
try {
delayer = itemDelay.call(args);
} catch (Throwable t) {
onError(t);
return;
}

synchronized (guard) {
wip++;
}

SerialSubscription ssub = new SerialSubscription();
csub.add(ssub);
ssub.set(delayer.subscribe(new DelayObserver<T, U>(args, this, ssub)));
}

@Override
public void onError(Throwable e) {
synchronized (guard) {
observer.onError(e);
}
csub.unsubscribe();
}

@Override
public void onCompleted() {
boolean b;
synchronized (guard) {
done = true;
b = checkDone();
}
if (b) {
csub.unsubscribe();
} else {
self.unsubscribe();
}
}

void emit(T value, Subscription token) {
boolean b;
synchronized (guard) {
observer.onNext(value);
wip--;
b = checkDone();
}
if (b) {
csub.unsubscribe();
} else {
csub.remove(token);
}
}
boolean checkDone() {
if (done && wip == 0) {
observer.onCompleted();
return true;
}
return false;
}
}
/**
* Delay observer.
*/
private static final class DelayObserver<T, U> implements Observer<U> {
final T value;
final SourceObserver<T, U> parent;
final Subscription token;

public DelayObserver(T value, SourceObserver<T, U> parent, Subscription token) {
this.value = value;
this.parent = parent;
this.token = token;
}

@Override
public void onNext(U args) {
parent.emit(value, token);
}

@Override
public void onError(Throwable e) {
parent.onError(e);
}

@Override
public void onCompleted() {
parent.emit(value, token);
}

}
}
}
Loading