Skip to content

Commit

Permalink
Merge pull request #1548 from stealthcode/BPRedo
Browse files Browse the repository at this point in the history
Adding backpressure to OnSubscribeRedo
  • Loading branch information
benjchristensen committed Aug 11, 2014
2 parents 68652e9 + 071aa4c commit 82a04d9
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@
import static rx.Observable.create;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;

import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
Expand Down Expand Up @@ -154,32 +158,40 @@ public static <T> Observable<T> redo(Observable<T> source, Func1<? super Observa
}

private Observable<T> source;
private final Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f;
private final Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> controlHandlerFunction;
private boolean stopOnComplete;
private boolean stopOnError;
private final Scheduler scheduler;
private final AtomicBoolean isLocked = new AtomicBoolean(true);
private final AtomicBoolean isStarted = new AtomicBoolean(false);
// incremented when requests are made, decremented when requests are fulfilled
private final AtomicLong consumerCapacity = new AtomicLong(0l);
private final AtomicReference<Producer> currentProducer = new AtomicReference<Producer>();

private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f, boolean stopOnComplete, boolean stopOnError,
Scheduler scheduler) {
this.source = source;
this.f = f;
this.controlHandlerFunction = f;
this.stopOnComplete = stopOnComplete;
this.stopOnError = stopOnError;
this.scheduler = scheduler;
}

@Override
public void call(final Subscriber<? super T> child) {
isStarted.set(false);
isLocked.set(true);
consumerCapacity.set(0l);
currentProducer.set(null);

final Scheduler.Worker inner = scheduler.createWorker();
child.add(inner);

final CompositeSubscription sourceSubscriptions = new CompositeSubscription();
child.add(sourceSubscriptions);

final PublishSubject<Notification<?>> terminals = PublishSubject.create();


final Action0 subscribeToSource = new Action0() {
@Override
public void call() {
Expand All @@ -198,8 +210,15 @@ public void onError(Throwable e) {

@Override
public void onNext(T v) {
consumerCapacity.decrementAndGet();
child.onNext(v);
}

@Override
public void setProducer(Producer producer) {
currentProducer.set(producer);
producer.request(consumerCapacity.get());
}
};
// new subscription each time so if it unsubscribes itself it does not prevent retries
// by unsubscribing the child subscription
Expand All @@ -208,8 +227,10 @@ public void onNext(T v) {
}
};

final Observable<?> restarts = f.call(
// lifting in a custom operator to kind of do a merge/map/filter thing.
// the observable received by the control handler function will receive notifications of onCompleted in the case of 'repeat'
// type operators or notifications of onError for 'retry' this is done by lifting in a custom operator to selectively divert
// the retry/repeat relevant values to the control handler
final Observable<?> restarts = controlHandlerFunction.call(
terminals.lift(new Operator<Notification<?>, Notification<?>>() {
@Override
public Subscriber<? super Notification<?>> call(final Subscriber<? super Notification<?>> filteredTerminals) {
Expand All @@ -233,6 +254,11 @@ public void onNext(Notification<?> t) {
filteredTerminals.onNext(t);
}
}

@Override
public void setProducer(Producer producer) {
producer.request(Long.MAX_VALUE);
}
};
}
}));
Expand All @@ -255,15 +281,31 @@ public void onError(Throwable e) {
@Override
public void onNext(Object t) {
if (!isLocked.get() && !child.isUnsubscribed()) {
// if (!child.isUnsubscribed()) {
child.add(inner.schedule(subscribeToSource));
}
}

@Override
public void setProducer(Producer producer) {
producer.request(Long.MAX_VALUE);
}
});
}
}));
if (!child.isUnsubscribed()) {
child.add(inner.schedule(subscribeToSource));
}

child.setProducer(new Producer() {

@Override
public void request(long n) {
if (isStarted.compareAndSet(false, true)) {
consumerCapacity.set(n);
if (!child.isUnsubscribed()) child.add(inner.schedule(subscribeToSource));
} else if (currentProducer.get() != null) {
consumerCapacity.getAndAdd(n);
currentProducer.get().request(n);
}
}
});

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import rx.Observable.OnSubscribe;
import rx.Notification;
import rx.Observer;
import rx.Scheduler.Worker;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
Expand Down

0 comments on commit 82a04d9

Please sign in to comment.